http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java new file mode 100644 index 0000000..7e9c968 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.function.Consumer; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * An interface for the client request scheduling algorithm: it decides which rows may be submitted and counts the tasks in progress. 
+ */ [email protected] [email protected] +public interface RequestController { + + @InterfaceAudience.Public + @InterfaceStability.Evolving + public enum ReturnCode { + /** + * Accept the current row. + */ + INCLUDE, + /** + * Skip the current row. + */ + SKIP, + /** + * No more rows can be included. + */ + END + } + + /** + * Picks out the rows that are valid to submit. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public interface Checker { + /** + * Checks whether the data is valid to submit. + * @param loc the destination of data + * @param row the data to check + * @return the decision for the row + */ + ReturnCode canTakeRow(HRegionLocation loc, Row row); + + /** + * Resets the state of the scheduler once the iteration over the rows is complete. + * @throws InterruptedIOException if the wait is interrupted; some controllers may wait + * for a busy region or RS to complete its pending requests. + */ + void reset() throws InterruptedIOException ; + } + + /** + * @return A new checker for evaluating a batch of rows. + */ + Checker newChecker(); + + /** + * Increments the counters when a valid task is built. + * @param regions The destination regions of the task + * @param sn The target server + */ + void incTaskCounters(Collection<byte[]> regions, ServerName sn); + + /** + * Decrements the counters when a task is accomplished. + * @param regions The destination regions of the task + * @param sn The target server + */ + void decTaskCounters(Collection<byte[]> regions, ServerName sn); + + /** + * @return The number of running tasks. + */ + long getNumberOfTsksInProgress(); + + /** + * Waits for the running tasks to complete. + * If a threshold and a trigger are specified, the implementation should + * wake up once in a while for checking the threshold and calling trigger. + * @param max This method will return if the number of running tasks is + * less than or equal to max. + * @param id the caller's id + * @param periodToTrigger The period to invoke the trigger. This value is a + * hint. 
The real period depends on the implementation. + * @param trigger The object to call periodically. + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForMaximumCurrentTasks(long max, long id, + int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException; + + /** + * Waits until there is at least one slot for a new task. + * @param id the caller's id + * @param periodToTrigger The period to invoke the trigger. This value is a + * hint. The real period depends on the implementation. + * @param trigger The object to call periodically. + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForFreeSlot(long id, int periodToTrigger, + Consumer<Long> trigger) throws InterruptedIOException; +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java new file mode 100644 index 0000000..7ed80f0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * A factory class that constructs a {@link org.apache.hadoop.hbase.client.RequestController}. 
+ */ [email protected] [email protected] +public final class RequestControllerFactory { + public static final String REQUEST_CONTROLLER_IMPL_CONF_KEY = "hbase.client.request.controller.impl"; + /** + * Constructs a {@link org.apache.hadoop.hbase.client.RequestController}. The implementation class is looked up under {@link #REQUEST_CONTROLLER_IMPL_CONF_KEY}, with SimpleRequestController passed as the default. + * @param conf The {@link Configuration} to use. + * @return A RequestController which is built according to the configuration. + */ + public static RequestController create(Configuration conf) { + Class<? extends RequestController> clazz= conf.getClass(REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class, RequestController.class); + return ReflectionUtils.newInstance(clazz, conf); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java index 788f1a4..85fd590 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java @@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Evolving -@VisibleForTesting -interface RowAccess<T> extends Iterable<T> { +public interface RowAccess<T> extends Iterable<T> { /** * @return true if there are no elements. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java new file mode 100644 index 0000000..473f264 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java @@ -0,0 +1,519 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Holds back requests once the submitted heap size or the number of running tasks has reached the + * configured threshold. + */ [email protected] [email protected] +class SimpleRequestController implements RequestController { + private static final Log LOG = LogFactory.getLog(SimpleRequestController.class); + /** + * Configuration key for the maximum heap size of a request to a single RegionServer. + */ + public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize"; + + /** + * Default value of {@link #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE}: 4194304, i.e. 4 * 1024 * 1024. + */ + @VisibleForTesting + static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304; + + /** + * Configuration key for the maximum total heap size of the rows accepted in one submit. 
+ */ + public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize"; + /** + * Default value of {@link #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE}; same as the per-request default. + */ + @VisibleForTesting + static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE; + @VisibleForTesting + final AtomicLong tasksInProgress = new AtomicLong(0); + @VisibleForTesting + final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion + = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + @VisibleForTesting + final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>(); + /** + * The maximum number of tasks simultaneously executed on the cluster. + */ + private final int maxTotalConcurrentTasks; + + /** + * The max heap size of the rows batched into one request for a single server. + */ + private final long maxHeapSizePerRequest; + private final long maxHeapSizeSubmit; + /** + * The number of tasks we run in parallel on a single region. With 1 (the + * default), we ensure that the ordering of the queries is respected: we + * don't start a set of operations on a region before the previous one is + * done. As well, this limits the pressure we put on the region server. + */ + @VisibleForTesting + final int maxConcurrentTasksPerRegion; + + /** + * The maximum number of tasks simultaneously executed on a single region server. 
+ */ + @VisibleForTesting + final int maxConcurrentTasksPerServer; + private final int thresholdToLogUndoneTaskDetails; + public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = + "hbase.client.threshold.log.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; + public static final String THRESHOLD_TO_LOG_REGION_DETAILS = + "hbase.client.threshold.log.region.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2; + private final int thresholdToLogRegionDetails; + SimpleRequestController(final Configuration conf) { /* reads every limit from conf; the task and heap-size limits must be positive, the two log thresholds are not validated */ + this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); + this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); + this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); + this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); + this.thresholdToLogUndoneTaskDetails = + conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); + this.thresholdToLogRegionDetails = + conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS); + if (this.maxTotalConcurrentTasks <= 0) { + throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); + } + if (this.maxConcurrentTasksPerServer <= 0) { + throw new IllegalArgumentException("maxConcurrentTasksPerServer=" + + maxConcurrentTasksPerServer); + } + if (this.maxConcurrentTasksPerRegion <= 0) { + throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" + + maxConcurrentTasksPerRegion); + } + if 
(this.maxHeapSizePerRequest <= 0) { /* NOTE(review): the message below is labelled maxHeapSizePerServer although the value it reports is maxHeapSizePerRequest */ + throw new IllegalArgumentException("maxHeapSizePerServer=" + + maxHeapSizePerRequest); + } + + if (this.maxHeapSizeSubmit <= 0) { + throw new IllegalArgumentException("maxHeapSizeSubmit=" + + maxHeapSizeSubmit); + } + } + + @VisibleForTesting + static Checker newChecker(List<RowChecker> checkers) { /* combines the row checkers into one Checker; once any checker answers END, every later row gets END until reset is called */ + return new Checker() { + private boolean isEnd = false; + + @Override + public ReturnCode canTakeRow(HRegionLocation loc, Row row) { + if (isEnd) { + return ReturnCode.END; + } + long rowSize = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0; + ReturnCode code = ReturnCode.INCLUDE; + for (RowChecker checker : checkers) { + switch (checker.canTakeOperation(loc, rowSize)) { + case END: + isEnd = true; + code = ReturnCode.END; + break; + case SKIP: + code = ReturnCode.SKIP; + break; + case INCLUDE: + default: + break; + } + if (code == ReturnCode.END) { + break; + } + } + for (RowChecker checker : checkers) { /* every checker is told the final decision, including those not consulted after an END */ + checker.notifyFinal(code, loc, rowSize); + } + return code; + } + + @Override + public void reset() throws InterruptedIOException { /* resets all checkers even if one is interrupted; only the last InterruptedIOException is rethrown */ + isEnd = false; + InterruptedIOException e = null; + for (RowChecker checker : checkers) { + try { + checker.reset(); + } catch (InterruptedIOException ex) { + e = ex; + } + } + if (e != null) { + throw e; + } + } + }; + } + + @Override + public Checker newChecker() { + List<RowChecker> checkers = new ArrayList<>(3); + checkers.add(new TaskCountChecker(maxTotalConcurrentTasks, + maxConcurrentTasksPerServer, + maxConcurrentTasksPerRegion, + tasksInProgress, + taskCounterPerServer, + taskCounterPerRegion)); + checkers.add(new RequestSizeChecker(maxHeapSizePerRequest)); + checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit)); + return newChecker(checkers); + } + + @Override + public void incTaskCounters(Collection<byte[]> regions, ServerName sn) { + tasksInProgress.incrementAndGet(); + + computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); + + 
regions.forEach((regBytes) + -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet() + ); + } + + @Override + public void decTaskCounters(Collection<byte[]> regions, ServerName sn) { /* NOTE(review): assumes a counter already exists for every region and for sn, presumably created by incTaskCounters; a missing entry would throw a NullPointerException */ + regions.forEach(regBytes -> { + AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); + regionCnt.decrementAndGet(); + }); + + taskCounterPerServer.get(sn).decrementAndGet(); + tasksInProgress.decrementAndGet(); + synchronized (tasksInProgress) { /* wakes up the threads blocked in wait on tasksInProgress */ + tasksInProgress.notifyAll(); + } + } + + @Override + public long getNumberOfTsksInProgress() { + return tasksInProgress.get(); + } + + @Override + public void waitForMaximumCurrentTasks(long max, long id, + int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException { + assert max >= 0; + long lastLog = EnvironmentEdgeManager.currentTime(); + long currentInProgress, oldInProgress = Long.MAX_VALUE; + while ((currentInProgress = tasksInProgress.get()) > max) { + if (oldInProgress != currentInProgress) { // Wait for in progress to change. + long now = EnvironmentEdgeManager.currentTime(); + if (now > lastLog + periodToTrigger) { + lastLog = now; + if (trigger != null) { + trigger.accept(currentInProgress); + } + logDetailsOfUndoneTasks(currentInProgress); + } + } + oldInProgress = currentInProgress; + try { + synchronized (tasksInProgress) { + if (tasksInProgress.get() == oldInProgress) { + tasksInProgress.wait(10); + } + } + } catch (InterruptedException e) { + throw new InterruptedIOException("#" + id + ", interrupted." 
+ + " currentNumberOfTask=" + currentInProgress); + } + } + } + + private void logDetailsOfUndoneTasks(long taskInProgress) { /* logs the servers, then the regions, that still have running tasks once the remaining count is within the respective threshold */ + if (taskInProgress <= thresholdToLogUndoneTaskDetails) { + ArrayList<ServerName> servers = new ArrayList<>(); + for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) { + if (entry.getValue().get() > 0) { + servers.add(entry.getKey()); + } + } + LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); + } + + if (taskInProgress <= thresholdToLogRegionDetails) { + ArrayList<String> regions = new ArrayList<>(); + for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) { + if (entry.getValue().get() > 0) { + regions.add(Bytes.toString(entry.getKey())); + } + } + LOG.info("Regions against which left over task(s) are processed: " + regions); + } + } + + @Override + public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException { /* a slot is free once fewer than maxTotalConcurrentTasks tasks are running */ + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger); + } + + /** + * Limits the total heap size of the rows accepted in one submit. NOTE(review): the original comment also promised a reduced + * limit for submitting quickly when there is no running task; nothing in this class implements that. + */ + @VisibleForTesting + static class SubmittedSizeChecker implements RowChecker { + + private final long maxHeapSizeSubmit; + private long heapSize = 0; + + SubmittedSizeChecker(final long maxHeapSizeSubmit) { + this.maxHeapSizeSubmit = maxHeapSizeSubmit; + } + + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + if (heapSize >= maxHeapSizeSubmit) { + return ReturnCode.END; + } + return ReturnCode.INCLUDE; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + heapSize += rowSize; + } + } + + @Override + public void reset() { + heapSize = 0; + } + } + + /** + * Limits the number of concurrent tasks: in total, per server and per region. 
+ */ + @VisibleForTesting + static class TaskCountChecker implements RowChecker { + + private static final long MAX_WAITING_TIME = 1000; //ms + private final Set<HRegionInfo> regionsIncluded = new HashSet<>(); + private final Set<ServerName> serversIncluded = new HashSet<>(); + private final int maxConcurrentTasksPerRegion; + private final int maxTotalConcurrentTasks; + private final int maxConcurrentTasksPerServer; + private final Map<byte[], AtomicInteger> taskCounterPerRegion; + private final Map<ServerName, AtomicInteger> taskCounterPerServer; + private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + private final AtomicLong tasksInProgress; + + TaskCountChecker(final int maxTotalConcurrentTasks, + final int maxConcurrentTasksPerServer, + final int maxConcurrentTasksPerRegion, + final AtomicLong tasksInProgress, + final Map<ServerName, AtomicInteger> taskCounterPerServer, + final Map<byte[], AtomicInteger> taskCounterPerRegion) { + this.maxTotalConcurrentTasks = maxTotalConcurrentTasks; + this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion; + this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer; + this.taskCounterPerRegion = taskCounterPerRegion; + this.taskCounterPerServer = taskCounterPerServer; + this.tasksInProgress = tasksInProgress; + } + + @Override + public void reset() throws InterruptedIOException { + // prevent the busy-waiting + waitForRegion(); + regionsIncluded.clear(); + serversIncluded.clear(); + busyRegions.clear(); + } + + private void waitForRegion() throws InterruptedIOException { /* waits up to MAX_WAITING_TIME until any tracked region has no counter or is below the per-region limit */ + if (busyRegions.isEmpty()) { + return; + } + EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); + final long start = ee.currentTime(); + while ((ee.currentTime() - start) <= MAX_WAITING_TIME) { + for (byte[] region : busyRegions) { + AtomicInteger count = taskCounterPerRegion.get(region); + if (count == null || count.get() < maxConcurrentTasksPerRegion) { + return; + } + } + try { + synchronized 
(tasksInProgress) { + tasksInProgress.wait(10); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted." + + " tasksInProgress=" + tasksInProgress); + } + } + } + + /** + * 1) check whether the region is already included. 2) check the concurrent tasks for + * the region. 3) check the total concurrent tasks. 4) check the concurrent + * tasks for the server. + * + * @param loc the destination of the row + * @param rowSize the size of the row; not used by this checker + * @return INCLUDE if every limit allows the row, otherwise SKIP + */ + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + + HRegionInfo regionInfo = loc.getRegionInfo(); + if (regionsIncluded.contains(regionInfo)) { + // We already know what to do with this region. + return ReturnCode.INCLUDE; + } + AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName()); + if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { + // Too many tasks on this region already. + return ReturnCode.SKIP; + } + int newServers = serversIncluded.size() + + (serversIncluded.contains(loc.getServerName()) ? 0 : 1); + if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) { + // Too many tasks. + return ReturnCode.SKIP; + } + AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); + if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) { + // Too many tasks for this individual server + return ReturnCode.SKIP; + } + return ReturnCode.INCLUDE; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + regionsIncluded.add(loc.getRegionInfo()); + serversIncluded.add(loc.getServerName()); + } + busyRegions.add(loc.getRegionInfo().getRegionName()); /* every region seen is tracked, whatever the decision, so that reset can wait on it */ + } + } + + /** + * Limits the heap size of the request built for each region server. 
+ */ + @VisibleForTesting + static class RequestSizeChecker implements RowChecker { + + private final long maxHeapSizePerRequest; + private final Map<ServerName, Long> serverRequestSizes = new HashMap<>(); + + RequestSizeChecker(final long maxHeapSizePerRequest) { + this.maxHeapSizePerRequest = maxHeapSizePerRequest; + } + + @Override + public void reset() { + serverRequestSizes.clear(); + } + + @Override + public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) { + // Is it ok for limit of request size? + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) + ? serverRequestSizes.get(loc.getServerName()) : 0L; + // accept at least one request + if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) { + return ReturnCode.INCLUDE; + } + return ReturnCode.SKIP; + } + + @Override + public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) { + if (code == ReturnCode.INCLUDE) { + long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) + ? serverRequestSizes.get(loc.getServerName()) : 0L; + serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize); + } + } + } + + /** + * Provides a way to control the flow of the row iteration. + */ + @VisibleForTesting + interface RowChecker { + + /* returns the decision of this checker alone for a row of size rowSize bound for loc */ ReturnCode canTakeOperation(HRegionLocation loc, long rowSize); + + /** + * Passes the final ReturnCode to the checker. Another checker may override the ReturnCode, + * so the checker needs the final decision to update its inner state. + * + * @param code The final decision + * @param loc the destination of data + * @param rowSize the data size + */ + void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize); + + /** + * Resets the inner state. 
+ */ + void reset() throws InterruptedIOException; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index bb6cbb5..ed7202a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -33,12 +33,10 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -59,15 +57,8 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; -import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; -import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; -import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; -import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -78,60 +69,64 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; -import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; -import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; + @Category({ClientTests.class, MediumTests.class}) public class TestAsyncProcess { @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). 
withLookingForStuckThread(true).build(); - private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class); + private static final Log LOG = LogFactory.getLog(TestAsyncProcess.class); private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes(); private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes(); private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes(); private static final byte[] FAILS = "FAILS".getBytes(); - private static final Configuration conf = new Configuration(); - - private static ServerName sn = ServerName.valueOf("s1:1,1"); - private static ServerName sn2 = ServerName.valueOf("s2:2,2"); - private static ServerName sn3 = ServerName.valueOf("s3:3,3"); - private static HRegionInfo hri1 = + private static final Configuration CONF = new Configuration(); + private static final ConnectionConfiguration CONNECTION_CONFIG = new ConnectionConfiguration(CONF); + private static final ServerName sn = ServerName.valueOf("s1:1,1"); + private static final ServerName sn2 = ServerName.valueOf("s2:2,2"); + private static final ServerName sn3 = ServerName.valueOf("s3:3,3"); + private static final HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1); - private static HRegionInfo hri2 = + private static final HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2); - private static HRegionInfo hri3 = + private static final HRegionInfo hri3 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3); - private static HRegionLocation loc1 = new HRegionLocation(hri1, sn); - private static HRegionLocation loc2 = new HRegionLocation(hri2, sn); - private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2); + private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn); + private static final HRegionLocation loc2 = new 
HRegionLocation(hri2, sn); + private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2); // Replica stuff - private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1), + private static final HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1), hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2); - private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); - private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), + private static final HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1); + private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn), new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3)); - private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), + private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3)); - private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null); + private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null); private static final String success = "success"; private static Exception failure = new Exception("failure"); - private static int NB_RETRIES = 3; + private static final int NB_RETRIES = 3; + private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @BeforeClass public static void beforeClass(){ - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); + CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); } static class CountingThreadFactory implements ThreadFactory { @@ -153,20 +148,21 @@ public 
class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); public AtomicInteger callsCt = new AtomicInteger(); - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + private long previousTimeout = -1; + final ExecutorService service; @Override - protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, - List<Action> actions, long nonceGroup, ExecutorService pool, - Batch.Callback<Res> callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture( + AsyncProcessTask task, List<Action> actions, long nonceGroup) { // Test HTable has tableName of null, so pass DUMMY_TABLE + AsyncProcessTask wrap = new AsyncProcessTask(task){ + @Override + public TableName getTableName() { + return DUMMY_TABLE; + } + }; AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>( - DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, operationTimeout, rpcTimeout, this); + wrap, actions, nonceGroup, this); allReqs.add(r); return r; } @@ -176,49 +172,54 @@ public class TestAsyncProcess { } public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout, - operationTimeout); + super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, 
+ new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), - rpcTimeout, operationTimeout); + super(hc, conf, + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + service = Executors.newFixedThreadPool(5); } - public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, - @SuppressWarnings("unused") boolean dummy) { - super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) { - @Override - public void execute(Runnable command) { - throw new RejectedExecutionException("test under failure"); - } - }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), - rpcTimeout, operationTimeout); + public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName, + List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, + boolean needResults) throws InterruptedIOException { + AsyncProcessTask task = AsyncProcessTask.newBuilder(callback) + .setPool(pool == null ? service : pool) + .setTableName(tableName) + .setRowAccess(rows) + .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) + .setNeedResults(needResults) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .build(); + return submit(task); + } + + public <CResult> AsyncRequestFuture submit(TableName tableName, + final List<? 
extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback, + boolean needResults) throws InterruptedIOException { + return submit(null, tableName, rows, atLeastOne, callback, needResults); } @Override - public <Res> AsyncRequestFuture submit(TableName tableName, RowAccess<? extends Row> rows, - boolean atLeastOne, Callback<Res> callback, boolean needResults) + public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task) throws InterruptedIOException { + previousTimeout = task.getRpcTimeout(); // We use results in tests to check things, so override to always save them. - return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); + AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) { + @Override + public boolean getNeedResults() { + return true; + } + }; + return super.submit(wrap); } @Override - public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, - CancellableRegionServerCallable callable, int curTimeout) { - previousTimeout = curTimeout; - return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout); - } - @Override protected RpcRetryingCaller<AbstractResponse> createCaller( CancellableRegionServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); @@ -260,12 +261,9 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { - public MyAsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, - int rpcTimeout, AsyncProcess asyncProcess) { - super(tableName, actions, nonceGroup, pool, needResults, - results, callback, callable, operationTimeout, rpcTimeout, asyncProcess); + public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> 
actions, + long nonceGroup, AsyncProcess asyncProcess) { + super(task, actions, nonceGroup, asyncProcess); } @Override @@ -483,7 +481,7 @@ public class TestAsyncProcess { final boolean usedRegions[]; protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException { - super(conf); + super(CONF); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; } @@ -553,19 +551,7 @@ public class TestAsyncProcess { long putsHeapSize = writeBuffer; doSubmitRequest(writeBuffer, putsHeapSize); } - @Test - public void testIllegalArgument() throws IOException { - ClusterConnection conn = createHConnection(); - final long maxHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, - AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); - conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1); - try { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - fail("The maxHeapSizePerRequest must be bigger than zero"); - } catch (IllegalArgumentException e) { - } - conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); - } + @Test public void testSubmitLargeRequestWithUnlimitedSize() throws Exception { long maxHeapSizePerRequest = Long.MAX_VALUE; @@ -601,10 +587,13 @@ public class TestAsyncProcess { private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception { ClusterConnection conn = createHConnection(); - final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, - AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); - conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + final long 
defaultHeapSizePerRequest = conn.getConfiguration().getLong( + SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest); // sn has two regions long putSizeSN = 0; @@ -630,11 +619,12 @@ public class TestAsyncProcess { + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:" + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + try (HTable ht = new HTable(conn, mutator)) { + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); ht.put(puts); List<AsyncRequestFuture> reqs = ap.allReqs; @@ -680,12 +670,17 @@ public class TestAsyncProcess { assertEquals(putSizeSN2, (long) sizePerServers.get(sn2)); } // restore config. 
- conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest); + conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest); + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } + @Test public void testSubmit() throws Exception { ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); @@ -704,7 +699,7 @@ public class TestAsyncProcess { updateCalled.incrementAndGet(); } }; - AsyncProcess ap = new MyAsyncProcess(hc, conf); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); @@ -717,13 +712,16 @@ public class TestAsyncProcess { @Test public void testSubmitBusyRegion() throws Exception { - ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); - + ClusterConnection conn = createHConnection(); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); + SimpleRequestController controller = (SimpleRequestController) ap.requestController; List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); - for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) { + for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) { ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); } ap.submit(null, DUMMY_TABLE, puts, false, null, false); @@ -732,15 +730,22 @@ public class TestAsyncProcess { ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); 
ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(0, puts.size()); + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } @Test public void testSubmitBusyRegionServer() throws Exception { - ClusterConnection hc = createHConnection(); - AsyncProcess ap = new MyAsyncProcess(hc, conf); - - ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer)); + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + SimpleRequestController controller = (SimpleRequestController) ap.requestController; + controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer)); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); @@ -751,14 +756,18 @@ public class TestAsyncProcess { ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertEquals(" puts=" + puts, 1, puts.size()); - ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); + controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer - 1)); ap.submit(null, DUMMY_TABLE, puts, false, null, false); Assert.assertTrue(puts.isEmpty()); + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } @Test public void testFail() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List<Put> puts = new ArrayList<Put>(); Put p = createPut(1, false); @@ -784,10 +793,15 @@ public class 
TestAsyncProcess { @Test public void testSubmitTrue() throws IOException { - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); - ap.tasksInProgress.incrementAndGet(); - final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion); - ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); + ClusterConnection conn = createHConnection(); + final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + SimpleRequestController controller = (SimpleRequestController) ap.requestController; + controller.tasksInProgress.incrementAndGet(); + final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); + controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); final AtomicBoolean checkPoint = new AtomicBoolean(false); final AtomicBoolean checkPoint2 = new AtomicBoolean(false); @@ -798,7 +812,7 @@ public class TestAsyncProcess { Threads.sleep(1000); Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent ai.decrementAndGet(); - ap.tasksInProgress.decrementAndGet(); + controller.tasksInProgress.decrementAndGet(); checkPoint2.set(true); } }; @@ -819,11 +833,15 @@ public class TestAsyncProcess { while (!checkPoint2.get()){ Threads.sleep(1); } + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } @Test public void testFailAndSuccess() throws Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); @@ -850,7 +868,7 @@ public class TestAsyncProcess { @Test public void testFlush() throws 
Exception { - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); @@ -868,24 +886,32 @@ public class TestAsyncProcess { @Test public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { ClusterConnection hc = createHConnection(); - MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false); + MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false); testTaskCount(ap); } @Test public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { - Configuration copyConf = new Configuration(conf); + Configuration copyConf = new Configuration(CONF); copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); - ClusterConnection hc = createHConnection(); - Mockito.when(hc.getConfiguration()).thenReturn(copyConf); - Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); - Mockito.when(hc.getBackoffPolicy()).thenReturn(bp); - MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false); + ClusterConnection conn = createHConnection(); + Mockito.when(conn.getConfiguration()).thenReturn(copyConf); + Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); + Mockito.when(conn.getBackoffPolicy()).thenReturn(bp); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false); testTaskCount(ap); + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } - private void testTaskCount(AsyncProcess 
ap) throws InterruptedIOException, InterruptedException { + private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException { + SimpleRequestController controller = (SimpleRequestController) ap.requestController; List<Put> puts = new ArrayList<>(); for (int i = 0; i != 3; ++i) { puts.add(createPut(1, true)); @@ -896,18 +922,24 @@ public class TestAsyncProcess { ap.waitForMaximumCurrentTasks(0, null); // More time to wait if there are incorrect task count. TimeUnit.SECONDS.sleep(1); - assertEquals(0, ap.tasksInProgress.get()); - for (AtomicInteger count : ap.taskCounterPerRegion.values()) { + assertEquals(0, controller.tasksInProgress.get()); + for (AtomicInteger count : controller.taskCounterPerRegion.values()) { assertEquals(0, count.get()); } - for (AtomicInteger count : ap.taskCounterPerServer.values()) { + for (AtomicInteger count : controller.taskCounterPerServer.values()) { assertEquals(0, count.get()); } } @Test public void testMaxTask() throws Exception { - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); + ClusterConnection conn = createHConnection(); + final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY); + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + SimpleRequestController.class.getName()); + final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false); + SimpleRequestController controller = (SimpleRequestController) ap.requestController; + for (int i = 0; i < 1000; i++) { ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn); @@ -940,7 +972,7 @@ public class TestAsyncProcess { @Override public void run() { Threads.sleep(sleepTime); - while (ap.tasksInProgress.get() > 0) { + while (controller.tasksInProgress.get() > 0) { ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn); } } @@ -953,6 +985,10 @@ public class TestAsyncProcess { //Adds 100 to secure us against approximate 
timing. Assert.assertTrue(start + 100L + sleepTime > end); + if (defaultClazz != null) { + conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, + defaultClazz); + } } private static ClusterConnection createHConnection() throws IOException { @@ -999,38 +1035,53 @@ public class TestAsyncProcess { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); - Mockito.when(hc.getConfiguration()).thenReturn(conf); + Mockito.when(hc.getConfiguration()).thenReturn(CONF); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); return hc; } @Test public void testHTablePutSuccess() throws Exception { - BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); - ht.ap = new MyAsyncProcess(createHConnection(), conf, true); + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap); Put put = createPut(1, true); - Assert.assertEquals(0, ht.getWriteBufferSize()); + Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), ht.getWriteBufferSize()); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); ht.mutate(put); - Assert.assertEquals(0, ht.getWriteBufferSize()); + ht.flush(); + Assert.assertEquals(0, ht.getCurrentWriteBufferSize()); + } + + @Test + public void testBufferedMutatorImplWithSharedPool() throws Exception { + ClusterConnection conn = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap); + + ht.close(); + assertFalse(ap.service.isShutdown()); } private void doHTableFailedPut(boolean 
bufferOn) throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); if (bufferOn) { bufferParam.writeBufferSize(1024L * 1024L); } else { bufferParam.writeBufferSize(0L); } - - HTable ht = new HTable(conn, bufferParam); - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - ht.mutator.ap = ap; + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + HTable ht = new HTable(conn, mutator); Put put = createPut(1, false); - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); try { ht.put(put); if (bufferOn) { @@ -1039,7 +1090,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); + Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize()); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -1067,10 +1118,10 @@ public class TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { ClusterConnection conn = createHConnection(); - BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, - new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); - MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); - mutator.ap = ap; + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE) + .writeBufferSize(0); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); Put p = createPut(1, false); mutator.mutate(p); @@ -1083,202 +1134,13 @@ public class TestAsyncProcess { // 
puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. p = createPut(1, true); - Assert.assertEquals(0, mutator.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.size()); try { mutator.mutate(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); - } - - @Test - public void testTaskCheckerHost() throws IOException { - final int maxTotalConcurrentTasks = 100; - final int maxConcurrentTasksPerServer = 2; - final int maxConcurrentTasksPerRegion = 1; - final AtomicLong tasksInProgress = new AtomicLong(0); - final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); - final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); - TaskCountChecker countChecker = new TaskCountChecker( - maxTotalConcurrentTasks, - maxConcurrentTasksPerServer, - maxConcurrentTasksPerRegion, - tasksInProgress, taskCounterPerServer, taskCounterPerRegion); - final long maxHeapSizePerRequest = 2 * 1024 * 1024; - // unlimiited - RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest); - RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker)); - - ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code); - - ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest); - // rejected for size - assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2); - - ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest); - // rejected for size - assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code); - - // fill the task slots for loc3. 
- taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); - taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); - - ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L); - // rejected for count - assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code); - - // release the task slots for loc3. - taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); - taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); - - ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L); - assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2); - } - - @Test - public void testRequestSizeCheckerr() throws IOException { - final long maxHeapSizePerRequest = 2 * 1024 * 1024; - final ClusterConnection conn = createHConnection(); - RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest); - - // inner state is unchanged. - for (int i = 0; i != 10; ++i) { - ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - } - - // accept the data located on loc1 region. - ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode); - checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest); - - // the sn server reachs the limit. - for (int i = 0; i != 10; ++i) { - ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); - assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); - code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); - assertNotEquals(RowChecker.ReturnCode.INCLUDE, code); - } - - // the request to sn2 server should be accepted. 
- for (int i = 0; i != 10; ++i) { - ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - } - - checker.reset(); - for (int i = 0; i != 10; ++i) { - ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - code = checker.canTakeOperation(loc2, maxHeapSizePerRequest); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - } - } - - @Test - public void testSubmittedSizeChecker() { - final long maxHeapSizeSubmit = 2 * 1024 * 1024; - SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit); - - for (int i = 0; i != 10; ++i) { - ReturnCode include = checker.canTakeOperation(loc1, 100000); - assertEquals(ReturnCode.INCLUDE, include); - } - - for (int i = 0; i != 10; ++i) { - checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit); - } - - for (int i = 0; i != 10; ++i) { - ReturnCode include = checker.canTakeOperation(loc1, 100000); - assertEquals(ReturnCode.END, include); - } - for (int i = 0; i != 10; ++i) { - ReturnCode include = checker.canTakeOperation(loc2, 100000); - assertEquals(ReturnCode.END, include); - } - checker.reset(); - for (int i = 0; i != 10; ++i) { - ReturnCode include = checker.canTakeOperation(loc1, 100000); - assertEquals(ReturnCode.INCLUDE, include); - } - } - @Test - public void testTaskCountChecker() throws InterruptedIOException { - long rowSize = 12345; - int maxTotalConcurrentTasks = 100; - int maxConcurrentTasksPerServer = 2; - int maxConcurrentTasksPerRegion = 1; - AtomicLong tasksInProgress = new AtomicLong(0); - Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>(); - Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>(); - TaskCountChecker checker = new TaskCountChecker( - maxTotalConcurrentTasks, - maxConcurrentTasksPerServer, - maxConcurrentTasksPerRegion, - tasksInProgress, taskCounterPerServer, taskCounterPerRegion); - - // inner state is 
unchanged. - for (int i = 0; i != 10; ++i) { - ReturnCode code = checker.canTakeOperation(loc1, rowSize); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - } - // add loc1 region. - ReturnCode code = checker.canTakeOperation(loc1, rowSize); - assertEquals(RowChecker.ReturnCode.INCLUDE, code); - checker.notifyFinal(code, loc1, rowSize); - - // fill the task slots for loc1. - taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100)); - taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100)); - - // the region was previously accepted, so it must be accpted now. - for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { - ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); - assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); - checker.notifyFinal(includeCode, loc1, rowSize); - } - - // fill the task slots for loc3. - taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100)); - taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100)); - - // no task slots. - for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { - ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize); - assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode); - checker.notifyFinal(excludeCode, loc3, rowSize); - } - - // release the tasks for loc3. - taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0)); - taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0)); - - // add loc3 region. - ReturnCode code3 = checker.canTakeOperation(loc3, rowSize); - assertEquals(RowChecker.ReturnCode.INCLUDE, code3); - checker.notifyFinal(code3, loc3, rowSize); - - // the region was previously accepted, so it must be accpted now. 
- for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { - ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize); - assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode); - checker.notifyFinal(includeCode, loc3, rowSize); - } - - checker.reset(); - // the region was previously accepted, - // but checker have reseted and task slots for loc1 is full. - // So it must be rejected now. - for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) { - ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize); - assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode); - checker.notifyFinal(includeCode, loc1, rowSize); - } + Assert.assertEquals("the put should not been inserted.", 0, mutator.size()); } @Test @@ -1302,9 +1164,12 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { - ClusterConnection conn = new MyConnectionImpl(conf); - HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); - ht.multiAp = new MyAsyncProcess(conn, conf, false); + ClusterConnection conn = new MyConnectionImpl(CONF); + MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + HTable ht = new HTable(conn, mutator); + ht.multiAp = new MyAsyncProcess(conn, CONF, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); @@ -1332,18 +1197,16 @@ public class TestAsyncProcess { } @Test public void testErrorsServers() throws IOException { - Configuration configuration = new Configuration(conf); + Configuration configuration = new Configuration(CONF); ClusterConnection conn = new MyConnectionImpl(configuration); - BufferedMutatorImpl mutator = - new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); - configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - 
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); - mutator.ap = ap; + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); - mutator.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(ap.createServerErrorTracker()); + Assert.assertTrue(ap.serverTrackerTimeout > 200); + ap.serverTrackerTimeout = 1; Put p = createPut(1, false); mutator.mutate(p); @@ -1361,14 +1224,15 @@ public class TestAsyncProcess { public void testReadAndWriteTimeout() throws IOException { final long readTimeout = 10 * 1000; final long writeTimeout = 20 * 1000; - Configuration copyConf = new Configuration(conf); + Configuration copyConf = new Configuration(CONF); copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); ClusterConnection conn = createHConnection(); Mockito.when(conn.getConfiguration()).thenReturn(copyConf); - BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); - try (HTable ht = new HTable(conn, bufferParam)) { - MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + try (HTable ht = new HTable(conn, mutator)) { ht.multiAp = ap; List<Get> gets = new LinkedList<>(); gets.add(new Get(DUMMY_BYTES_1)); @@ -1399,12 +1263,12 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { - ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = 
(BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); - mutator.ap = ap; + ClusterConnection conn = new MyConnectionImpl(CONF); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test")); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1421,13 +1285,11 @@ public class TestAsyncProcess { @Test public void testCallQueueTooLarge() throws IOException { - ClusterConnection conn = new MyConnectionImpl(conf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); - mutator.ap = ap; - - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); - + ClusterConnection conn = new MyConnectionImpl(CONF); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException()); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + Assert.assertNotNull(ap.createServerErrorTracker()); Put p = createPut(1, true); mutator.mutate(p); @@ -1459,10 +1321,11 @@ public class TestAsyncProcess { } MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE)); - MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); + MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(con 
, bufferParam, ap); + HTable ht = new HTable(con, mutator); ht.multiAp = ap; - ht.batch(gets, null); Assert.assertEquals(ap.nbActions.get(), NB_REGS); @@ -1482,7 +1345,16 @@ public class TestAsyncProcess { // One region has no replica, so the main call succeeds for it. MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[3]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1492,7 +1364,16 @@ public class TestAsyncProcess { // Main call succeeds before replica calls are kicked off. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[3]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1502,7 +1383,16 @@ public class TestAsyncProcess { // Either main or replica can succeed. 
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); long replicaCalls = ap.getReplicaCallCount(); Assert.assertTrue(replicaCalls >= 0); @@ -1517,7 +1407,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); ap.setPrimaryCallDelay(sn2, 2000); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FALSE, RR.TRUE); Assert.assertEquals(1, ap.getReplicaCallCount()); } @@ -1530,7 +1429,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0); ap.addFailures(hri1, hri2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + 
AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FAILED, RR.FAILED); Assert.assertEquals(0, ap.getReplicaCallCount()); } @@ -1542,7 +1450,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0); ap.addFailures(hri1, hri1r2, hri2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.TRUE, RR.TRUE); Assert.assertEquals(2, ap.getReplicaCallCount()); } @@ -1554,7 +1471,16 @@ public class TestAsyncProcess { MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0); ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); - AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(ap.service) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(rows) + .setResults(new Object[2]) + .setSubmittedRows(SubmittedRows.ALL) + .build(); + AsyncRequestFuture ars = ap.submit(task); verifyReplicaResult(ars, RR.FAILED, RR.FALSE); // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); @@ -1583,6 +1509,13 @@ public class TestAsyncProcess { return ap; } + private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) { + return new BufferedMutatorParams(name) + .pool(ap.service) + .rpcTimeout(RPC_TIMEOUT) + .opertationTimeout(OPERATION_TIMEOUT); + 
} + private static List<Get> makeTimelineGets(byte[]... rows) { List<Get> result = new ArrayList<Get>(); for (byte[] row : rows) { @@ -1663,14 +1596,9 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, - ExecutorService pool) { - super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf), rpcTimeout, operationTimeout); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { + super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( + conf)); } } @@ -1681,56 +1609,22 @@ public class TestAsyncProcess { MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(200)); - AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool); + AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, true)); - - ap.submit(null, DUMMY_TABLE, puts, false, null, false); + AsyncProcessTask task = AsyncProcessTask.newBuilder() + .setPool(myPool) + .setRpcTimeout(RPC_TIMEOUT) + .setOperationTimeout(OPERATION_TIMEOUT) + .setTableName(DUMMY_TABLE) + .setRowAccess(puts) + .setSubmittedRows(SubmittedRows.NORMAL) + .build(); + ap.submit(task); Assert.assertTrue(puts.isEmpty()); } - @Test - public void testWaitForMaximumCurrentTasks() throws Exception { - final AtomicLong tasks = new AtomicLong(0); - final AtomicInteger max = new AtomicInteger(0); - final CyclicBarrier barrier = new CyclicBarrier(2); - final AsyncProcess ap = new MyAsyncProcess(createHConnection(), 
conf); - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - barrier.await(); - ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null); - } catch (InterruptedIOException e) { - Assert.fail(e.getMessage()); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (BrokenBarrierException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - }; - // First test that our runnable thread only exits when tasks is zero. - Thread t = new Thread(runnable); - t.start(); - barrier.await(); - t.join(); - // Now assert we stay running if max == zero and tasks is > 0. - barrier.reset(); - tasks.set(1000000); - t = new Thread(runnable); - t.start(); - barrier.await(); - while (tasks.get() > 0) { - assertTrue(t.isAlive()); - tasks.set(tasks.get() - 1); - } - t.join(); - } - /** * Test and make sure we could use a special pause setting when retry with * CallQueueTooBigException, see HBASE-17114 @@ -1738,18 +1632,18 @@ public class TestAsyncProcess { */ @Test public void testRetryPauseWithCallQueueTooBigException() throws Exception { - Configuration myConf = new Configuration(conf); + Configuration myConf = new Configuration(CONF); final long specialPause = 500L; final int retries = 1; myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); ClusterConnection conn = new MyConnectionImpl(myConf); - BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); - mutator.ap = ap; + BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); 
Put p = createPut(1, true); mutator.mutate(p); @@ -1775,8 +1669,9 @@ public class TestAsyncProcess { final long normalPause = myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); - mutator.ap = ap; - Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); + mutator = new BufferedMutatorImpl(conn, bufferParam, ap); + Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); mutator.mutate(p); startTime = System.currentTimeMillis(); try {
