This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new c473a975210 HBASE-28792: AsyncTableImpl should call coprocessor callbacks in a defined order (#6168)
c473a975210 is described below

commit c473a9752101847350fb0f453001fce6c8a860c2
Author: Charles Connell <[email protected]>
AuthorDate: Wed Sep 4 03:49:38 2024 -0400

    HBASE-28792: AsyncTableImpl should call coprocessor callbacks in a defined order (#6168)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |  27 +++-
 ...yncAggregationClientWithCallbackThreadPool.java | 156 +++++++++++++++++++++
 2 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 590ee9bc47a..3b411cea7fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
@@ -299,19 +300,39 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
     final Context context = Context.current();
     CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
 
+      private final Phaser regionCompletesInProgress = new Phaser(1);
+
       @Override
       public void onRegionComplete(RegionInfo region, R resp) {
-        pool.execute(context.wrap(() -> callback.onRegionComplete(region, 
resp)));
+        // Add a party to the phaser before the task is handed to the pool, so that the
+        // onComplete() task waits for this callback to finish before it runs the user's
+        // onComplete().
+        // NOTE(review): if pool.execute() throws (e.g. the pool rejects the task), the party
+        // registered here is never deregistered and onComplete() would wait indefinitely --
+        // confirm whether that case needs handling.
+        regionCompletesInProgress.register();
+        pool.execute(context.wrap(() -> {
+          try {
+            callback.onRegionComplete(region, resp);
+          } finally {
+            // Arrive in a finally block so the party is released even when the user
+            // callback throws.
+            regionCompletesInProgress.arriveAndDeregister();
+          }
+        }));
       }
 
       @Override
       public void onRegionError(RegionInfo region, Throwable error) {
-        pool.execute(context.wrap(() -> callback.onRegionError(region, 
error)));
+        // Error callbacks take part in the same phaser as onRegionComplete(): register a
+        // party before submitting so that the onComplete() task waits for this callback too.
+        // NOTE(review): if pool.execute() throws (e.g. the pool rejects the task), the party
+        // registered here is never deregistered and onComplete() would wait indefinitely --
+        // confirm whether that case needs handling.
+        regionCompletesInProgress.register();
+        pool.execute(context.wrap(() -> {
+          try {
+            callback.onRegionError(region, error);
+          } finally {
+            // Arrive in a finally block so the party is released even when the user
+            // callback throws.
+            regionCompletesInProgress.arriveAndDeregister();
+          }
+        }));
       }
 
       @Override
       public void onComplete() {
-        pool.execute(context.wrap(callback::onComplete));
+        pool.execute(context.wrap(() -> {
+          // Arrive as the phaser's initial party (it is constructed with one party) and block
+          // this pool thread until every registered onRegionComplete()/onRegionError() task
+          // has arrived and deregistered; only then is the user's onComplete() invoked.
+          // Guarantee that onComplete() is called after all 
onRegionComplete()'s are called
+          regionCompletesInProgress.arriveAndAwaitAdvance();
+          callback.onComplete();
+        }));
       }
 
       @Override
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClientWithCallbackThreadPool.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClientWithCallbackThreadPool.java
new file mode 100644
index 00000000000..a5c74882997
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClientWithCallbackThreadPool.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Same as TestAsyncAggregationClient, except that {@link AsyncTableImpl} is involved in addition to
+ * {@link RawAsyncTableImpl}. Exercises the code paths in {@link AsyncTableImpl#coprocessorService}.
+ * <p>
+ * The table under test is obtained with a single-threaded callback executor, and every test
+ * checks one aggregate (max, min, row count, sum, average, standard deviation) computed over the
+ * {@code CQ} column of the {@code COUNT} rows written in {@link #setUp()}.
+ */
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAsyncAggregationClientWithCallbackThreadPool {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestAsyncAggregationClientWithCallbackThreadPool.class);
+
+  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  // The table is named after TestAsyncAggregationClient rather than after this class.
+  private static TableName TABLE_NAME = 
TableName.valueOf("TestAsyncAggregationClient");
+
+  // Column family used for the table and for both qualifiers below.
+  private static byte[] CF = Bytes.toBytes("CF");
+
+  // Qualifier holding the row index l; this is the column every test aggregates over.
+  private static byte[] CQ = Bytes.toBytes("CQ");
+
+  // Qualifier holding l * l; written in setUp() but not read by any test in this class.
+  private static byte[] CQ2 = Bytes.toBytes("CQ2");
+
+  // Number of rows written in setUp(); row indices run from 0 to COUNT - 1.
+  private static long COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static AsyncTable<ScanResultConsumer> TABLE;
+
+  // Executor passed to CONN.getTable(); created in setUp() as a one-thread fixed pool.
+  private static ExecutorService EXECUTOR_SERVICE;
+
+  /**
+   * Starts the mini cluster with {@link AggregateImplementation} configured as a region
+   * coprocessor, creates the test table pre-split at "111", "222", ..., "888", and writes
+   * {@code COUNT} rows keyed by the row index formatted with {@code "%03d"}. Each row stores the
+   * index in {@code CQ} and its square in {@code CQ2}.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AggregateImplementation.class.getName());
+    UTIL.startMiniCluster(3);
+    byte[][] splitKeys = new byte[8][];
+    // i takes the values 111, 222, ..., 888, which fills all eight split-key slots.
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    UTIL.createTable(TABLE_NAME, CF, splitKeys);
+    CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+    // A single callback thread; the table is requested with this executor so that calls go
+    // through the executor-backed table implementation.
+    EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
+    TABLE = CONN.getTable(TABLE_NAME, EXECUTOR_SERVICE);
+    TABLE.putAll(LongStream.range(0, COUNT)
+      .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
+        .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, 
Bytes.toBytes(l * l)))
+      .collect(Collectors.toList())).get();
+  }
+
+  /**
+   * Closes the connection, shuts down the mini cluster, and then stops the callback executor.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    UTIL.shutdownMiniCluster();
+    EXECUTOR_SERVICE.shutdownNow();
+  }
+
+  @Test
+  public void testMax() throws InterruptedException, ExecutionException {
+    // The largest value written to CQ is COUNT - 1.
+    assertEquals(COUNT - 1, AsyncAggregationClient
+      .max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  @Test
+  public void testMin() throws InterruptedException, ExecutionException {
+    // The smallest value written to CQ is 0.
+    assertEquals(0, AsyncAggregationClient
+      .min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  @Test
+  public void testRowCount() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT,
+      AsyncAggregationClient
+        .rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get()
+        .longValue());
+
+    // Run the count twice in case some state doesn't get cleaned up inside 
AsyncTableImpl
+    // on the first time.
+    assertEquals(COUNT,
+      AsyncAggregationClient
+        .rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get()
+        .longValue());
+  }
+
+  @Test
+  public void testSum() throws InterruptedException, ExecutionException {
+    // Expected value is the sum of the integers 0 .. COUNT - 1.
+    assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient
+      .sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().longValue());
+  }
+
+  // Absolute tolerance for the floating-point comparisons in testAvg() and testStd().
+  private static final double DELTA = 1E-3;
+
+  @Test
+  public void testAvg() throws InterruptedException, ExecutionException {
+    // Expected value is the mean of the integers 0 .. COUNT - 1.
+    assertEquals(
+      (COUNT - 1) / 2.0, AsyncAggregationClient
+        .avg(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+  @Test
+  public void testStd() throws InterruptedException, ExecutionException {
+    // Expected value is sqrt(mean(x^2) - mean(x)^2) for x in 0 .. COUNT - 1, computed locally.
+    double avgSq =
+      LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + 
l2).getAsLong()
+        / (double) COUNT;
+    double avg = (COUNT - 1) / 2.0;
+    double std = Math.sqrt(avgSq - avg * avg);
+    assertEquals(
+      std, AsyncAggregationClient
+        .std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, 
CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+}

Reply via email to