http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java new file mode 100644 index 0000000..c0d3b74 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; + +/** + * A throughput controller which uses the follow schema to limit throughput + * <ul> + * <li>If compaction pressure is greater than 1.0, no limitation.</li> + * <li>In off peak hours, use a fixed throughput limitation + * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li> + * <li>In normal hours, the max throughput is tuned between + * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and + * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower + + * (higer - lower) * compactionPressure", where compactionPressure is in range [0.0, 1.0]</li> + * </ul> + * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController { + + private final static Log LOG = LogFactory + .getLog(PressureAwareCompactionThroughputController.class); + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = + "hbase.hstore.compaction.throughput.higher.bound"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = + 20L * 1024 * 1024; + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = + "hbase.hstore.compaction.throughput.lower.bound"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = + 10L * 1024 * 1024; + + public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = + "hbase.hstore.compaction.throughput.offpeak"; + + private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE; + + public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = + "hbase.hstore.compaction.throughput.tune.period"; + + private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; + + // check compaction throughput every this size + private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL = + "hbase.hstore.compaction.throughput.control.check.interval"; + + private long maxThroughputOffpeak; + + @Override + public void setup(final RegionServerServices server) { + server.getChoreService().scheduleChore( + new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) { + + @Override + protected void chore() { + tune(server.getCompactionPressure()); + } + }); + } + + private void tune(double compactionPressure) { + double maxThroughputToSet; + if (compactionPressure > 1.0) { + // set to unlimited if some stores already reach the blocking store file count + maxThroughputToSet = Double.MAX_VALUE; + } else if (offPeakHours.isOffPeakHour()) { + maxThroughputToSet = maxThroughputOffpeak; + } else { + // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to + // calculate the throughput limitation. + maxThroughputToSet = + maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound) + * compactionPressure; + } + if (LOG.isDebugEnabled()) { + LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to " + + throughputDesc(maxThroughputToSet)); + } + this.setMaxThroughput(maxThroughputToSet); + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) { + return; + } + this.maxThroughputUpperBound = + conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); + this.maxThroughputLowerBound = + conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND); + this.maxThroughputOffpeak = + conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, + DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); + this.offPeakHours = OffPeakHours.getInstance(conf); + this.controlPerSize = + conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL, + this.maxThroughputLowerBound); + this.setMaxThroughput(this.maxThroughputLowerBound); + this.tuningPeriod = + getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, + DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD); + LOG.info("Compaction throughput configurations, higher bound: " + + throughputDesc(maxThroughputUpperBound) + ", lower bound " + + throughputDesc(maxThroughputLowerBound) + ", off peak: " + + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms"); + } + + @Override + public String toString() { + return "DefaultCompactionThroughputController [maxThroughput=" + + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size() + + "]"; + } + + @Override + protected boolean skipControl(long deltaSize, long controlSize) { + if (deltaSize < controlSize) { + return true; + } else { + return false; + } + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java new file mode 100644 index 0000000..f301a27 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareFlushThroughputController.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; + +/** + * A throughput controller which uses the follow schema to limit throughput + * <ul> + * <li>If flush pressure is greater than or equal to 1.0, no limitation.</li> + * <li>In normal case, the max throughput is tuned between + * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and + * {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula "lower + + * (upper - lower) * flushPressure", where flushPressure is in range [0.0, 1.0)</li> + * </ul> + * @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure() + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class PressureAwareFlushThroughputController extends PressureAwareThroughputController { + + private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class); + + public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND = + "hbase.hstore.flush.throughput.upper.bound"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND = + 200L * 1024 * 1024; + + public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND = + "hbase.hstore.flush.throughput.lower.bound"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND = + 100L * 1024 * 1024; + + public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = + "hbase.hstore.flush.throughput.tune.period"; + + private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000; + + // check flush throughput every this size + public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL = + "hbase.hstore.flush.throughput.control.check.interval"; + + private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL = + 10L * 1024 * 1024;// 10MB + + @Override + public void setup(final RegionServerServices server) { + server.getChoreService().scheduleChore( + new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) { + + @Override + protected void chore() { + tune(server.getFlushPressure()); + } + }); + } + + private void tune(double flushPressure) { + double maxThroughputToSet; + if (flushPressure >= 1.0) { + // set to unlimited if global memstore size already exceeds lower limit + maxThroughputToSet = Double.MAX_VALUE; + } else { + // flushPressure is between 0.0 and 1.0, we use a simple linear formula to + // calculate the throughput limitation. + maxThroughputToSet = + maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound) + * flushPressure; + } + if (LOG.isDebugEnabled()) { + LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to " + + throughputDesc(maxThroughputToSet)); + } + this.setMaxThroughput(maxThroughputToSet); + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) { + return; + } + this.maxThroughputUpperBound = + conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND); + this.maxThroughputLowerBound = + conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND); + this.offPeakHours = OffPeakHours.getInstance(conf); + this.controlPerSize = + conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, + DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL); + this.setMaxThroughput(this.maxThroughputLowerBound); + this.tuningPeriod = + getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, + DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD); + LOG.info("Flush throughput configurations, upper bound: " + + throughputDesc(maxThroughputUpperBound) + ", lower bound " + + throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms"); + } + + @Override + public String toString() { + return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput()) + + ", activeFlushNumber=" + activeOperations.size() + "]"; + } + + @Override + protected boolean skipControl(long deltaSize, long controlSize) { + // for flush, we control the flow no matter whether the flush size is small + return false; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java new file mode 100644 index 0000000..c360985 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public abstract class PressureAwareThroughputController extends Configured implements + ThroughputController, Stoppable { + private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class); + + /** + * Stores the information of one controlled compaction. + */ + private static final class ActiveOperation { + + private final long startTime; + + private long lastControlTime; + + private long lastControlSize; + + private long totalSize; + + private long numberOfSleeps; + + private long totalSleepTime; + + // prevent too many debug log + private long lastLogTime; + + ActiveOperation() { + long currentTime = EnvironmentEdgeManager.currentTime(); + this.startTime = currentTime; + this.lastControlTime = currentTime; + this.lastLogTime = currentTime; + } + } + + protected long maxThroughputUpperBound; + + protected long maxThroughputLowerBound; + + protected OffPeakHours offPeakHours; + + protected long controlPerSize; + + protected int tuningPeriod; + + private volatile double maxThroughput; + + protected final ConcurrentMap<String, ActiveOperation> activeOperations = + new ConcurrentHashMap<String, ActiveOperation>(); + + @Override + public abstract void setup(final RegionServerServices server); + + protected String throughputDesc(long deltaSize, long elapsedTime) { + return throughputDesc((double) deltaSize / elapsedTime * 1000); + } + + protected String throughputDesc(double speed) { + if (speed >= 1E15) { // large enough to say it is unlimited + return "unlimited"; + } else { + return String.format("%.2f MB/sec", speed / 1024 / 1024); + } + } + + @Override + public void start(String opName) { + activeOperations.put(opName, new ActiveOperation()); + } + + @Override + public long control(String opName, long size) throws InterruptedException { + ActiveOperation operation = activeOperations.get(opName); + operation.totalSize += size; + long deltaSize = operation.totalSize - operation.lastControlSize; + if (deltaSize < controlPerSize) { + return 0; + } + long now = EnvironmentEdgeManager.currentTime(); + double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size(); + long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms + long elapsedTime = now - operation.lastControlTime; + operation.lastControlSize = operation.totalSize; + if (elapsedTime >= minTimeAllowed) { + operation.lastControlTime = EnvironmentEdgeManager.currentTime(); + return 0; + } + // too fast + long sleepTime = minTimeAllowed - elapsedTime; + if (LOG.isDebugEnabled()) { + // do not log too much + if (now - operation.lastLogTime > 5L * 1000) { + LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns"); + LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is " + + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " + + throughputDesc(maxThroughputPerCompaction) + ", already slept " + + operation.numberOfSleeps + " time(s) and total slept time is " + + operation.totalSleepTime + " ms till now."); + operation.lastLogTime = now; + } + } + Thread.sleep(sleepTime); + operation.numberOfSleeps++; + operation.totalSleepTime += sleepTime; + operation.lastControlTime = EnvironmentEdgeManager.currentTime(); + return sleepTime; + } + + /** + * Check whether to skip control given delta size and control size + * @param deltaSize Delta size since last control + * @param controlSize Size limit to perform control + * @return a boolean indicates whether to skip this control + */ + protected abstract boolean skipControl(long deltaSize, long controlSize); + + @Override + public void finish(String opName) { + ActiveOperation operation = activeOperations.remove(opName); + long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime; + LOG.info(opName + " average throughput is " + + throughputDesc(operation.totalSize, elapsedTime) + ", slept " + + operation.numberOfSleeps + " time(s) and total slept time is " + + operation.totalSleepTime + " ms. " + activeOperations.size() + + " active operations remaining, total limit is " + throughputDesc(getMaxThroughput())); + } + + private volatile boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + + public double getMaxThroughput() { + return maxThroughput; + } + + public void setMaxThroughput(double maxThroughput) { + this.maxThroughput = maxThroughput; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java new file mode 100644 index 0000000..b3c4147 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputControlUtil.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Store; + +/** + * Helper methods for throttling + */ +@InterfaceAudience.Private +public final class ThroughputControlUtil { + private ThroughputControlUtil() { + } + + private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0); + private static final String NAME_DELIMITER = "#"; + + /** + * Generate a name for throttling, to prevent name conflict when multiple IO operation running + * parallel on the same store. + * @param store the Store instance on which IO operation is happening + * @param opName Name of the IO operation, e.g. "flush", "compaction", etc. + * @return The name for throttling + */ + public static String getNameForThrottling(final Store store, final String opName) { + int counter; + for (;;) { + counter = NAME_COUNTER.get(); + int next = counter == Integer.MAX_VALUE ? 0 : counter + 1; + if (NAME_COUNTER.compareAndSet(counter, next)) { + break; + } + } + return store.getRegionInfo().getRegionNameAsString() + NAME_DELIMITER + + store.getFamily().getNameAsString() + NAME_DELIMITER + opName + NAME_DELIMITER + counter; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java new file mode 100644 index 0000000..f299f98 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/ThroughputController.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * A utility that constrains the total throughput of one or more simultaneous flows by + * sleeping when necessary. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public interface ThroughputController extends Stoppable { + + /** + * Setup controller for the given region server. + */ + void setup(RegionServerServices server); + + /** + * Start the throughput controller. + */ + void start(String name); + + /** + * Control the throughput. Will sleep if too fast. + * @return the actual sleep time. + */ + long control(String name, long size) throws InterruptedException; + + /** + * Finish the controller. Should call this method in a finally block. + */ + void finish(String name); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 0986ad7..e634327 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -317,4 +318,13 @@ public class MockRegionServerServices implements RegionServerServices { // TODO Auto-generated method stub return null; } + + public ThroughputController getFlushThroughputController() { + return null; + } + + @Override + public double getFlushPressure() { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 94a63d8..35a7403 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -44,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -122,7 +121,7 @@ public class TestIOFencing { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { try { return super.compact(compaction, store, throughputController); } finally { @@ -132,7 +131,7 @@ public class TestIOFencing { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { try { return super.compact(compaction, store, throughputController, user); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index e20c4ad..5241dbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -60,12 +60,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -235,7 +235,7 @@ public class TestRegionObserverScannerOpenHook { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { boolean ret = super.compact(compaction, store, throughputController); if (ret) compactionStateChangeLatch.countDown(); return ret; @@ -243,7 +243,7 @@ public class TestRegionObserverScannerOpenHook { @Override public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { boolean ret = super.compact(compaction, store, throughputController, user); if (ret) compactionStateChangeLatch.countDown(); return ret; http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 32f644b..4de4a5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -662,4 +663,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { // TODO Auto-generated method stub return null; } + + public ThroughputController getFlushThroughputController() { + return null; + } + + @Override + public double getFlushPressure() { + return 0; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java index 022279a..f57ade9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java @@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index b374bdc..06b4c46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -104,7 +104,7 @@ public class TestCompaction { conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - NoLimitCompactionThroughputController.class.getName()); + NoLimitThroughputController.class.getName()); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); secondRowBytes = START_KEY_BYTES.clone(); @@ -363,13 +363,13 @@ public class TestCompaction { } @Override - public List<Path> compact(CompactionThroughputController throughputController) + public List<Path> compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List<Path> compact(CompactionThroughputController throughputController, User user) + public List<Path> compact(ThroughputController throughputController, User user) throws IOException { finishCompaction(this.selectedFiles); return new ArrayList<Path>(); @@ -421,13 +421,13 @@ public class TestCompaction { } @Override - public List<Path> compact(CompactionThroughputController throughputController) + public List<Path> compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List<Path> compact(CompactionThroughputController throughputController, User user) + public List<Path> compact(ThroughputController throughputController, User user) throws IOException { try { isInCompact = true; @@ -510,10 +510,10 @@ public class TestCompaction { HRegion r = mock(HRegion.class); when( r.compact(any(CompactionContext.class), any(Store.class), - any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() { + any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() { public Boolean answer(InvocationOnMock invocation) throws Throwable { invocation.getArgumentAt(0, CompactionContext.class).compact( - invocation.getArgumentAt(2, CompactionThroughputController.class)); + invocation.getArgumentAt(2, ThroughputController.class)); return true; } }); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index ef02431..385048c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -537,7 +537,7 @@ public class TestHMobStore { // Trigger major compaction this.store.triggerMajorCompaction(); CompactionContext requestCompaction = this.store.requestCompaction(1, null); - this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE); + this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE); Assert.assertEquals(1, this.store.getStorefiles().size()); //Check encryption after compaction http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 382193b..85b2a9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -314,7 +314,7 @@ public class TestHRegionReplayEvents { // compaction from primary LOG.info("-- Compacting primary, only 1 store"); primaryRegion.compactStore(Bytes.toBytes("cf1"), - NoLimitCompactionThroughputController.INSTANCE); + NoLimitThroughputController.INSTANCE); // now replay the edits and the flush marker reader = createWALReaderForPrimary(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 41fbae6..fe620e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -85,7 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -259,7 +259,7 @@ public class TestSplitTransactionOnCluster { region.initialize(); // 2, Run Compaction cc - assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE)); + assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE)); assertTrue(fileNum > store.getStorefiles().size()); // 3, Split http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 9e846c6..354ea2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.security.User; @@ -382,7 +382,7 @@ public class TestStore { Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); // after compact; check the lowest time stamp - store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE); + store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index eb8513b..cb586f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -133,7 +133,7 @@ public class TestStripeCompactor { StripeCompactor sc = createCompactor(writers, input); List<Path> paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitThroughputController.INSTANCE); writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); @@ -170,7 +170,7 @@ public class TestStripeCompactor { StripeCompactor sc = createCompactor(writers, input); List<Path> paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitThroughputController.INSTANCE); assertEquals(output.length, paths.size()); writers.verifyKvs(output, true, true); List<byte[]> boundaries = new ArrayList<byte[]>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 1454aa8..6b641c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -77,7 +77,7 @@ public class TestStripeStoreEngine { when( mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), - any(CompactionThroughputController.class), any(User.class))) + any(ThroughputController.class), any(User.class))) .thenReturn(new ArrayList<Path>()); // Produce 3 L0 files. @@ -96,10 +96,10 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. - compaction.compact(NoLimitCompactionThroughputController.INSTANCE); + compaction.compact(NoLimitThroughputController.INSTANCE); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, - NoLimitCompactionThroughputController.INSTANCE, null); + NoLimitThroughputController.INSTANCE, null); } private static StoreFile createFile() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java deleted file mode 100644 index 4456ef2..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.List; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreEngine; -import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; -import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ RegionServerTests.class, MediumTests.class }) -public class TestCompactionWithThroughputController { - - private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class); - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static final double EPSILON = 1E-6; - - private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); - - private final byte[] family = Bytes.toBytes("f"); - - private final byte[] qualifier = Bytes.toBytes("q"); - - private Store getStoreWithName(TableName tableName) { - MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); - List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); - for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { - HRegionServer hrs = rsts.get(i).getRegionServer(); - for (Region region : hrs.getOnlineRegions(tableName)) { - return region.getStores().iterator().next(); - } - } - return null; - } - - private Store prepareData() throws IOException { - Admin admin = TEST_UTIL.getHBaseAdmin(); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - Table table = TEST_UTIL.createTable(tableName, family); - Random rand = new Random(); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < 10; j++) { - byte[] value = new byte[128 * 1024]; - rand.nextBytes(value); - table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); - } - admin.flush(tableName); - } - return getStoreWithName(tableName); - } - - private long testCompactionWithThroughputLimit() throws Exception { - long throughputLimit = 1024L * 1024; - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); - conf.setLong( - PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, - throughputLimit); - conf.setLong( - PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, - throughputLimit); - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - PressureAwareCompactionThroughputController.class.getName()); - TEST_UTIL.startMiniCluster(1); - try { - Store store = prepareData(); - assertEquals(10, store.getStorefilesCount()); - long startTime = System.currentTimeMillis(); - TEST_UTIL.getHBaseAdmin().majorCompact(tableName); - while (store.getStorefilesCount() != 1) { - Thread.sleep(20); - } - long duration = System.currentTimeMillis() - startTime; - double throughput = (double) store.getStorefilesSize() / duration * 1000; - // confirm that the speed limit work properly(not too fast, and also not too slow) - // 20% is the max acceptable error rate. - assertTrue(throughput < throughputLimit * 1.2); - assertTrue(throughput > throughputLimit * 0.8); - return System.currentTimeMillis() - startTime; - } finally { - TEST_UTIL.shutdownMiniCluster(); - } - } - - private long testCompactionWithoutThroughputLimit() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - NoLimitCompactionThroughputController.class.getName()); - TEST_UTIL.startMiniCluster(1); - try { - Store store = prepareData(); - assertEquals(10, store.getStorefilesCount()); - long startTime = System.currentTimeMillis(); - TEST_UTIL.getHBaseAdmin().majorCompact(tableName); - while (store.getStorefilesCount() != 1) { - Thread.sleep(20); - } - return System.currentTimeMillis() - startTime; - } finally { - TEST_UTIL.shutdownMiniCluster(); - } - } - - @Test - public void testCompaction() throws Exception { - long limitTime = testCompactionWithThroughputLimit(); - long noLimitTime = testCompactionWithoutThroughputLimit(); - LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " - + noLimitTime + "ms"); - // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this - // is a very weak assumption. - assertTrue(limitTime > noLimitTime * 2); - } - - /** - * Test the tuning task of {@link PressureAwareCompactionThroughputController} - */ - @Test - public void testThroughputTuning() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); - conf.setLong( - PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, - 20L * 1024 * 1024); - conf.setLong( - PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, - 10L * 1024 * 1024); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - PressureAwareCompactionThroughputController.class.getName()); - conf.setInt( - PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, - 1000); - TEST_UTIL.startMiniCluster(1); - Connection conn = ConnectionFactory.createConnection(conf); - try { - HTableDescriptor htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(family)); - htd.setCompactionEnabled(false); - TEST_UTIL.getHBaseAdmin().createTable(htd); - TEST_UTIL.waitTableAvailable(tableName); - HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); - PressureAwareCompactionThroughputController throughputController = - (PressureAwareCompactionThroughputController) regionServer.compactSplitThread - .getCompactionThroughputController(); - assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON); - Table table = conn.getTable(tableName); - for (int i = 0; i < 5; i++) { - byte[] value = new byte[0]; - table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value)); - TEST_UTIL.flush(tableName); - } - Thread.sleep(2000); - assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON); - - byte[] value1 = new byte[0]; - table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); - TEST_UTIL.flush(tableName); - Thread.sleep(2000); - assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON); - - byte[] value = new byte[0]; - table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); - TEST_UTIL.flush(tableName); - Thread.sleep(2000); - assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON); - - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - NoLimitCompactionThroughputController.class.getName()); - regionServer.compactSplitThread.onConfigurationChange(conf); - assertTrue(throughputController.isStopped()); - assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() - instanceof NoLimitCompactionThroughputController); - } finally { - conn.close(); - TEST_UTIL.shutdownMiniCluster(); - } - } - - /** - * Test the logic that we calculate compaction pressure for a striped store. - */ - @Test - public void testGetCompactionPressureForStripedStore() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); - conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); - conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); - conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); - TEST_UTIL.startMiniCluster(1); - Connection conn = ConnectionFactory.createConnection(conf); - try { - HTableDescriptor htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(family)); - htd.setCompactionEnabled(false); - TEST_UTIL.getHBaseAdmin().createTable(htd); - TEST_UTIL.waitTableAvailable(tableName); - HStore store = (HStore) getStoreWithName(tableName); - assertEquals(0, store.getStorefilesCount()); - assertEquals(0.0, store.getCompactionPressure(), EPSILON); - Table table = conn.getTable(tableName); - for (int i = 0; i < 4; i++) { - byte[] value1 = new byte[0]; - table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1)); - byte[] value = new byte[0]; - table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value)); - TEST_UTIL.flush(tableName); - } - assertEquals(8, store.getStorefilesCount()); - assertEquals(0.0, store.getCompactionPressure(), EPSILON); - - byte[] value5 = new byte[0]; - table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5)); - byte[] value4 = new byte[0]; - table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4)); - TEST_UTIL.flush(tableName); - assertEquals(10, store.getStorefilesCount()); - assertEquals(0.5, store.getCompactionPressure(), EPSILON); - - byte[] value3 = new byte[0]; - table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3)); - byte[] value2 = new byte[0]; - table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2)); - TEST_UTIL.flush(tableName); - assertEquals(12, store.getStorefilesCount()); - assertEquals(1.0, store.getCompactionPressure(), EPSILON); - - byte[] value1 = new byte[0]; - table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1)); - byte[] value = new byte[0]; - table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value)); - TEST_UTIL.flush(tableName); - assertEquals(14, store.getStorefilesCount()); - assertEquals(2.0, store.getCompactionPressure(), EPSILON); - } finally { - conn.close(); - TEST_UTIL.shutdownMiniCluster(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 56e71e8..c440a57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -216,10 +217,10 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + scr.execute(sc, NoLimitThroughputController.INSTANCE, null); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitThroughputController.class), any(User.class)); } @Test @@ -469,7 +470,7 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null); assertEquals(1, paths.size()); } @@ -528,7 +529,7 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + scr.execute(sc, NoLimitThroughputController.INSTANCE, null); verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { @Override public boolean matches(Object argument) { @@ -542,7 +543,7 @@ public class TestStripeCompactionPolicy { } }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitThroughputController.class), any(User.class)); } /** @@ -563,12 +564,12 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + scr.execute(sc, NoLimitThroughputController.INSTANCE, null); verify(sc, times(1)).compact(eq(scr.getRequest()), count == null ? anyInt() : eq(count.intValue()), size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitThroughputController.class), any(User.class)); } /** Verify arbitrary flush. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java new file mode 100644 index 0000000..41975eb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; +import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestCompactionWithThroughputController { + + private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store prepareData() throws IOException { + Admin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + Table table = TEST_UTIL.createTable(tableName, family); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[128 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + return getStoreWithName(tableName); + } + + private long testCompactionWithThroughputLimit() throws Exception { + long throughputLimit = 1024L * 1024; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + throughputLimit); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + assertEquals(10, store.getStorefilesCount()); + long startTime = System.currentTimeMillis(); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName); + while (store.getStorefilesCount() != 1) { + Thread.sleep(20); + } + long duration = System.currentTimeMillis() - startTime; + double throughput = (double) store.getStorefilesSize() / duration * 1000; + // confirm that the speed limit work properly(not too fast, and also not too slow) + // 20% is the max acceptable error rate. + assertTrue(throughput < throughputLimit * 1.2); + assertTrue(throughput > throughputLimit * 0.8); + return System.currentTimeMillis() - startTime; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + private long testCompactionWithoutThroughputLimit() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + Store store = prepareData(); + assertEquals(10, store.getStorefilesCount()); + long startTime = System.currentTimeMillis(); + TEST_UTIL.getHBaseAdmin().majorCompact(tableName); + while (store.getStorefilesCount() != 1) { + Thread.sleep(20); + } + return System.currentTimeMillis() - startTime; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testCompaction() throws Exception { + long limitTime = testCompactionWithThroughputLimit(); + long noLimitTime = testCompactionWithoutThroughputLimit(); + LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " + + noLimitTime + "ms"); + // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this + // is a very weak assumption. + assertTrue(limitTime > noLimitTime * 2); + } + + /** + * Test the tuning task of {@link PressureAwareCompactionThroughputController} + */ + @Test + public void testThroughputTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, + 20L * 1024 * 1024); + conf.setLong( + PressureAwareCompactionThroughputController + .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, + 10L * 1024 * 1024); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + PressureAwareCompactionThroughputController.class.getName()); + conf.setInt( + PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, + 1000); + TEST_UTIL.startMiniCluster(1); + Connection conn = ConnectionFactory.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + PressureAwareCompactionThroughputController throughputController = + (PressureAwareCompactionThroughputController) regionServer.compactSplitThread + .getCompactionThroughputController(); + assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); + Table table = conn.getTable(tableName); + for (int i = 0; i < 5; i++) { + byte[] value = new byte[0]; + table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value)); + TEST_UTIL.flush(tableName); + } + Thread.sleep(2000); + assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); + + byte[] value1 = new byte[0]; + table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); + + byte[] value = new byte[0]; + table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); + TEST_UTIL.flush(tableName); + Thread.sleep(2000); + assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON); + + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + regionServer.compactSplitThread.onConfigurationChange(conf); + assertTrue(throughputController.isStopped()); + assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() + instanceof NoLimitThroughputController); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } + + /** + * Test the logic that we calculate compaction pressure for a striped store. + */ + @Test + public void testGetCompactionPressureForStripedStore() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); + conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); + conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); + conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); + TEST_UTIL.startMiniCluster(1); + Connection conn = ConnectionFactory.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName); + HStore store = (HStore) getStoreWithName(tableName); + assertEquals(0, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + Table table = conn.getTable(tableName); + for (int i = 0; i < 4; i++) { + byte[] value1 = new byte[0]; + table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1)); + byte[] value = new byte[0]; + table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value)); + TEST_UTIL.flush(tableName); + } + assertEquals(8, store.getStorefilesCount()); + assertEquals(0.0, store.getCompactionPressure(), EPSILON); + + byte[] value5 = new byte[0]; + table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5)); + byte[] value4 = new byte[0]; + table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4)); + TEST_UTIL.flush(tableName); + assertEquals(10, store.getStorefilesCount()); + assertEquals(0.5, store.getCompactionPressure(), EPSILON); + + byte[] value3 = new byte[0]; + table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3)); + byte[] value2 = new byte[0]; + table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2)); + TEST_UTIL.flush(tableName); + assertEquals(12, store.getStorefilesCount()); + assertEquals(1.0, store.getCompactionPressure(), EPSILON); + + byte[] value1 = new byte[0]; + table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1)); + byte[] value = new byte[0]; + table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value)); + TEST_UTIL.flush(tableName); + assertEquals(14, store.getStorefilesCount()); + assertEquals(2.0, store.getCompactionPressure(), EPSILON); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } +}