Repository: hbase Updated Branches: refs/heads/master 3a4655019 -> 54827cf61
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 2917605..e2d23e5 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -18,6 +18,11 @@ */ package org.apache.hadoop.hbase; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.UniformReservoir; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.Constructor; @@ -61,7 +66,6 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RawAsyncTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RowMutations; @@ -80,8 +84,6 @@ import org.apache.hadoop.hbase.io.hfile.RandomDistribution; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.CompactingMemStore; -import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -105,10 +107,8 @@ import org.apache.htrace.core.Sampler; import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.UniformReservoir; -import com.fasterxml.jackson.databind.MapperFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects; +import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Script used evaluating HBase performance and scalability. Runs a HBase @@ -1302,7 +1302,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } static abstract class AsyncTableTest extends AsyncTest { - protected RawAsyncTable table; + protected AsyncTable<?> table; AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { super(con, options, status); @@ -1310,7 +1310,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void onStartup() throws IOException { - this.table = connection.getRawTable(TableName.valueOf(opts.tableName)); + this.table = connection.getTable(TableName.valueOf(opts.tableName)); } @Override @@ -1435,7 +1435,7 @@ public class PerformanceEvaluation extends Configured implements Tool { static class AsyncScanTest extends AsyncTableTest { private ResultScanner testScanner; - private AsyncTable asyncTable; + private AsyncTable<?> asyncTable; AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { super(con, options, status); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 13e0e7c..5831bfc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -63,7 +64,7 @@ public abstract class AbstractTestAsyncTableScan { TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); TEST_UTIL.waitTableAvailable(TABLE_NAME); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - ASYNC_CONN.getRawTable(TABLE_NAME).putAll(IntStream.range(0, COUNT) + ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT) .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) .collect(Collectors.toList())).get(); @@ -92,7 +93,15 @@ public abstract class AbstractTestAsyncTableScan { return new Scan().setBatch(1).setMaxResultSize(1); } - protected static List<Pair<String, Supplier<Scan>>> getScanCreater() { + protected static AsyncTable<?> getRawTable() { + return ASYNC_CONN.getTable(TABLE_NAME); + } + + protected static AsyncTable<?> getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + private static List<Pair<String, Supplier<Scan>>> getScanCreator() { return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan), Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan), Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan), @@ -100,6 +109,25 @@ public abstract class AbstractTestAsyncTableScan { AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); } + protected static List<Object[]> getScanCreatorParams() { + return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) + .collect(Collectors.toList()); + } + + private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { + return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), + Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); + } + + protected static List<Object[]> getTableAndScanCreatorParams() { + List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); + List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); + return tableCreator.stream() + .flatMap(tp -> scanCreator.stream().map( + sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() })) + .collect(Collectors.toList()); + } + protected abstract Scan createScan(); protected abstract List<Result> doScan(Scan scan) throws Exception; @@ -121,10 +149,11 @@ public abstract class AbstractTestAsyncTableScan { List<Result> results = doScan(createScan()); // make sure all scanners are closed at RS side TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .forEach(rs -> assertEquals( - "The scanner count of " + rs.getServerName() + " is " + + .forEach( + rs -> assertEquals( + "The scanner count of " + rs.getServerName() + " is " + rs.getRSRpcServices().getScannersCount(), - 0, rs.getRSRpcServices().getScannersCount())); + 0, rs.getRSRpcServices().getScannersCount())); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> { Result result = results.get(i); @@ -150,7 +179,7 @@ public abstract class AbstractTestAsyncTableScan { public void testScanNoStopKey() throws Exception { int start = 345; List<Result> results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)))); + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)))); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } @@ -169,16 +198,16 @@ public abstract class AbstractTestAsyncTableScan { try { doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily"))); } catch (Exception e) { - assertTrue(e instanceof NoSuchColumnFamilyException - || e.getCause() instanceof NoSuchColumnFamilyException); + assertTrue(e instanceof NoSuchColumnFamilyException || + e.getCause() instanceof NoSuchColumnFamilyException); } } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, int limit) throws Exception { Scan scan = - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) - .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); if (limit > 0) { scan.setLimit(limit); } @@ -195,9 +224,9 @@ public abstract class AbstractTestAsyncTableScan { private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, int limit) throws Exception { - Scan scan = createScan() - .withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) - .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); + Scan scan = + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) + .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); if (limit > 0) { scan.setLimit(limit); } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/BufferingScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/BufferingScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/BufferingScanResultConsumer.java new file mode 100644 index 0000000..036e8b3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/BufferingScanResultConsumer.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; + +/** + * A scan result consumer which buffers all the data in memory and you can call the {@link #take()} + * method below to get the result one by one. Should only be used by tests, do not write production + * code like this as the buffer is unlimited and may cause OOM. + */ +class BufferingScanResultConsumer implements AdvancedScanResultConsumer { + + private ScanMetrics scanMetrics; + + private final Queue<Result> queue = new ArrayDeque<>(); + + private boolean finished; + + private Throwable error; + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized void onNext(Result[] results, ScanController controller) { + for (Result result : results) { + queue.offer(result); + } + notifyAll(); + } + + @Override + public synchronized void onError(Throwable error) { + finished = true; + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized Result take() throws IOException, InterruptedException { + for (;;) { + if (!queue.isEmpty()) { + return queue.poll(); + } + if (finished) { + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } else { + return null; + } + } + wait(); + } + } + + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java deleted file mode 100644 index 526ae5e..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleRawScanResultConsumer.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; - -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; - -class SimpleRawScanResultConsumer implements RawScanResultConsumer { - - private ScanMetrics scanMetrics; - - private final Queue<Result> queue = new ArrayDeque<>(); - - private boolean finished; - - private Throwable error; - - @Override - public void onScanMetricsCreated(ScanMetrics scanMetrics) { - this.scanMetrics = scanMetrics; - } - - @Override - public synchronized void onNext(Result[] results, ScanController controller) { - for (Result result : results) { - queue.offer(result); - } - notifyAll(); - } - - @Override - public synchronized void onError(Throwable error) { - finished = true; - this.error = error; - notifyAll(); - } - - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); - } - - public synchronized Result take() throws IOException, InterruptedException { - for (;;) { - if (!queue.isEmpty()) { - return queue.poll(); - } - if (finished) { - if (error != null) { - Throwables.propagateIfPossible(error, IOException.class); - throw new IOException(error); - } else { - return null; - } - } - wait(); - } - } - - public ScanMetrics getScanMetrics() { - return scanMetrics; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index dca66d5..5493772 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -96,7 +96,7 @@ public class TestAsyncBufferMutator { } // mutator.close will call mutator.flush automatically so all tasks should have been done. futures.forEach(f -> f.join()); - RawAsyncTable table = CONN.getRawTable(TABLE_NAME); + AsyncTable<?> table = CONN.getTable(TABLE_NAME); IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join()) .forEach(r -> { assertArrayEquals(VALUE, r.getValue(CF, CQ)); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java index e7c439b..47268c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java @@ -212,7 +212,7 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception { createTableWithDefaultConf(tableName); - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); @@ -305,7 +305,7 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); - RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table); + AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table); List<Put> puts = new ArrayList<>(); for (byte[] row : HBaseTestingUtility.ROWS) { puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 1ee1b94..9b552b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -69,12 +69,12 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY); AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(tableName); HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")).get(); - RegionInfo region = regionLocation.getRegionInfo(); - byte[] regionName = regionLocation.getRegionInfo().getRegionName(); + RegionInfo region = regionLocation.getRegion(); + byte[] regionName = regionLocation.getRegion().getRegionName(); HRegionLocation location = rawAdmin.getRegionLocation(regionName).get(); - assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName())); + assertTrue(Bytes.equals(regionName, location.getRegion().getRegionName())); location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get(); - assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName())); + assertTrue(Bytes.equals(regionName, location.getRegion().getRegionName())); } @Test @@ -252,7 +252,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); // write a put into the specific region - ASYNC_CONN.getRawTable(tableName) + ASYNC_CONN.getTable(tableName) .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))) .join(); Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0); @@ -268,7 +268,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize(), 0); // write another put into the specific region - ASYNC_CONN.getRawTable(tableName) + ASYNC_CONN.getTable(tableName) .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))) .join(); Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreSize() > 0); @@ -288,7 +288,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { final int rows = 10000; loadData(tableName, families, rows); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regionLocations = AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); int originalCount = regionLocations.size(); @@ -319,7 +319,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { byte[][] families = { FAMILY }; loadData(tableName, families, 1000); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regionLocations = AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); int originalCount = regionLocations.size(); @@ -364,7 +364,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") }; createTableWithDefaultConf(tableName, splitRows); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regionLocations = AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); RegionInfo regionA; @@ -372,16 +372,16 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { // merge with full name assertEquals(3, regionLocations.size()); - regionA = regionLocations.get(0).getRegionInfo(); - regionB = regionLocations.get(1).getRegionInfo(); + regionA = regionLocations.get(0).getRegion(); + regionB = regionLocations.get(1).getRegion(); admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get(); regionLocations = AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); assertEquals(2, regionLocations.size()); // merge with encoded name - regionA = regionLocations.get(0).getRegionInfo(); - regionB = regionLocations.get(1).getRegionInfo(); + regionA = regionLocations.get(0).getRegion(); + regionB = regionLocations.get(1).getRegion(); admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get(); regionLocations = @@ -404,12 +404,12 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { // create table createTableWithDefaultConf(tableName); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regionLocations = AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); assertEquals(1, regionLocations.size()); - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); List<Put> puts = new ArrayList<>(); for (int i = 0; i < rowCount; i++) { Put put = new Put(Bytes.toBytes(i)); @@ -420,9 +420,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { if (isSplitRegion) { if (splitPoint == null) { - admin.splitRegion(regionLocations.get(0).getRegionInfo().getRegionName()).get(); + admin.splitRegion(regionLocations.get(0).getRegion().getRegionName()).get(); } else { - admin.splitRegion(regionLocations.get(0).getRegionInfo().getRegionName(), splitPoint).get(); + admin.splitRegion(regionLocations.get(0).getRegion().getRegionName(), splitPoint).get(); } } else { if (splitPoint == null) { @@ -585,7 +585,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { private static void loadData(final TableName tableName, final byte[][] families, final int rows, final int flushes) throws IOException { - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); List<Put> puts = new ArrayList<>(rows); byte[] qualifier = Bytes.toBytes("val"); for (int i = 0; i < flushes; i++) { http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index fa7ff41..7ea69e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -63,7 +63,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(2); - TEST_UTIL.getAdmin().setBalancerRunning(false, true); + TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); @@ -81,15 +81,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller { public void testRegionMove() throws InterruptedException, ExecutionException, IOException { // This will leave a cached entry in location cache HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); - int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName()); - TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes( + int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName()); + TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes( TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName())); - RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME) + AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME) .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); // move back - TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), + TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes(loc.getServerName().getServerName())); Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); @@ -166,7 +166,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { return mockedLocator; } }) { - RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME) + AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME) .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(errorTriggered.get()); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 5d8b116..cecae47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -78,13 +78,13 @@ public class TestAsyncTable { private byte[] row; @Parameter - public Supplier<AsyncTableBase> getTable; + public Supplier<AsyncTable<?>> getTable; - private static RawAsyncTable getRawTable() { - return ASYNC_CONN.getRawTable(TABLE_NAME); + private static AsyncTable<?> getRawTable() { + return ASYNC_CONN.getTable(TABLE_NAME); } - private static AsyncTable getTable() { + private static AsyncTable<?> getTable() { return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); } @@ -115,7 +115,7 @@ public class TestAsyncTable { @Test public void testSimple() throws Exception { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); @@ -132,7 +132,7 @@ public class TestAsyncTable { @Test public void testSimpleMultiple() throws Exception { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); int count = 100; CountDownLatch putLatch = new CountDownLatch(count); IntStream.range(0, count).forEach( @@ -176,7 +176,7 @@ public class TestAsyncTable { @Test public void testIncrement() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); int count = 100; CountDownLatch latch = new CountDownLatch(count); AtomicLong sum = new AtomicLong(0L); @@ -193,7 +193,7 @@ public class TestAsyncTable { @Test public void testAppend() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); int count = 10; CountDownLatch latch = new CountDownLatch(count); char suffix = ':'; @@ -216,7 +216,7 @@ public class TestAsyncTable { @Test public void testCheckAndPut() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); int count = 10; @@ -238,7 +238,7 @@ public class TestAsyncTable { @Test public void testCheckAndDelete() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); @@ -275,7 +275,7 @@ public class TestAsyncTable { @Test public void testMutateRow() throws InterruptedException, ExecutionException, IOException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); RowMutations mutation = new RowMutations(row); mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); table.mutateRow(mutation).get(); @@ -293,7 +293,7 @@ public class TestAsyncTable { @Test public void testCheckAndMutate() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); + AsyncTable<?> table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java index 7bbbd71..2ba126f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -39,7 +38,6 @@ import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -61,7 +59,7 @@ import org.junit.runners.Parameterized; * Class to test asynchronous table admin operations. */ @RunWith(Parameterized.class) -@Category({LargeTests.class, ClientTests.class}) +@Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableAdminApi extends TestAsyncAdminBase { @Test @@ -153,25 +151,25 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { } private TableState.State getStateFromMeta(TableName table) throws Exception { - Optional<TableState> state = AsyncMetaTableAccessor.getTableState( - ASYNC_CONN.getRawTable(TableName.META_TABLE_NAME), table).get(); + Optional<TableState> state = AsyncMetaTableAccessor + .getTableState(ASYNC_CONN.getTable(TableName.META_TABLE_NAME), table).get(); assertTrue(state.isPresent()); return state.get().getState(); } @Test public void testCreateTableNumberOfRegions() throws Exception { - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); createTableWithDefaultConf(tableName); List<HRegionLocation> regionLocations = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); assertEquals("Table should have only 1 region", 1, regionLocations.size()); final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2"); createTableWithDefaultConf(tableName2, new byte[][] { new byte[] { 42 } }); regionLocations = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get(); assertEquals("Table should have only 2 region", 2, regionLocations.size()); final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3"); @@ -179,7 +177,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); admin.createTable(builder.build(), "a".getBytes(), "z".getBytes(), 3).join(); regionLocations = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get(); assertEquals("Table should have only 3 region", 3, regionLocations.size()); final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4"); @@ -197,15 +195,15 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join(); regionLocations = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName5)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName5)).get(); assertEquals("Table should have 16 region", 16, regionLocations.size()); } @Test public void testCreateTableWithRegions() throws Exception { byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 }, - new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 }, - new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, }; + new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 }, + new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, }; int expectedRegions = splitKeys.length + 1; boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration()); createTableWithDefaultConf(tableName, splitKeys); @@ -213,9 +211,9 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get(); assertTrue("Table should be created with splitKyes + 1 rows in META", tableAvailable); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regions = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); Iterator<HRegionLocation> hris = regions.iterator(); assertEquals( @@ -223,36 +221,36 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { expectedRegions, regions.size()); System.err.println("Found " + regions.size() + " regions"); - HRegionInfo hri; + RegionInfo hri; hris = regions.iterator(); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7])); assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8])); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8])); assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0); if (tablesOnMaster) { @@ -274,41 +272,41 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { admin.createTable(builder.build(), startKey, endKey, expectedRegions).join(); regions = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get(); assertEquals( "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); System.err.println("Found " + regions.size() + " regions"); hris = regions.iterator(); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 })); assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 })); - hri = hris.next().getRegionInfo(); + hri = hris.next().getRegion(); assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 })); assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0); if (tablesOnMaster) { @@ -327,7 +325,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { admin.createTable(builder.build(), startKey, endKey, expectedRegions).join(); regions = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get(); assertEquals( "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); @@ -339,8 +337,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { // Try an invalid case where there are duplicate split keys splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, - new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } }; - final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");; + new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } }; + final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4"); try { createTableWithDefaultConf(tableName4, splitKeys); fail("Should not be able to create this table because of " + "duplicate split keys"); @@ -353,10 +351,10 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { throws IOException { int numRS = ((ClusterConnection) TEST_UTIL.getConnection()).getCurrentNrHRS(); - Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<>(); + Map<ServerName, List<RegionInfo>> server2Regions = new HashMap<>(); regions.stream().forEach((loc) -> { ServerName server = loc.getServerName(); - server2Regions.computeIfAbsent(server, (s) -> new ArrayList<>()).add(loc.getRegionInfo()); + server2Regions.computeIfAbsent(server, (s) -> new ArrayList<>()).add(loc.getRegion()); }); if (numRS >= 2) { // Ignore the master region server, @@ -424,7 +422,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { // Create & Fill the table createTableWithDefaultConf(tableName, splitKeys); - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); int expectedRows = 10; for (int i = 0; i < expectedRows; i++) { byte[] data = Bytes.toBytes(String.valueOf(i)); @@ -449,7 +447,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { @Test public void testDisableAndEnableTable() throws Exception { createTableWithDefaultConf(tableName); - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); final byte[] row = Bytes.toBytes("row"); final byte[] qualifier = Bytes.toBytes("qualifier"); final byte[] value = Bytes.toBytes("value"); @@ -502,8 +500,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2"); createTableWithDefaultConf(tableName1); createTableWithDefaultConf(tableName2); - RawAsyncTable table1 = ASYNC_CONN.getRawTable(tableName1); - RawAsyncTable table2 = ASYNC_CONN.getRawTable(tableName1); + AsyncTable<?> table1 = ASYNC_CONN.getTable(tableName1); + AsyncTable<?> table2 = ASYNC_CONN.getTable(tableName1); final byte[] row = Bytes.toBytes("row"); final byte[] qualifier = Bytes.toBytes("qualifier"); @@ -517,8 +515,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { table1.get(get).get(); table2.get(get).get(); - admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false) - .get().forEach(t -> admin.disableTable(t).join()); + admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get() + .forEach(t -> admin.disableTable(t).join()); // Test that tables are disabled get = new Get(row); @@ -541,8 +539,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName1)); assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName2)); - admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false) - .get().forEach(t -> admin.enableTable(t).join()); + admin.listTableNames(Pattern.compile(tableName.getNameAsString() + ".*"), false).get() + .forEach(t -> admin.enableTable(t).join()); // Test that tables are enabled try { @@ -562,16 +560,15 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { @Test public void testEnableTableRetainAssignment() throws Exception { - byte[][] splitKeys = - { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 }, - new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 }, - new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } }; + byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 }, + new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 }, + new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } }; int expectedRegions = splitKeys.length + 1; createTableWithDefaultConf(tableName, splitKeys); - RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> metaTable = ASYNC_CONN.getTable(META_TABLE_NAME); List<HRegionLocation> regions = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); assertEquals( "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), expectedRegions, regions.size()); @@ -582,7 +579,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { admin.enableTable(tableName).join(); List<HRegionLocation> regions2 = - AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get(); // Check the assignment. assertEquals(regions.size(), regions2.size()); assertTrue(regions2.containsAll(regions)); @@ -611,8 +608,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { verifyTableDescriptor(tableName, FAMILY_0); // Modify the table removing one family and verify the descriptor - admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.of(FAMILY_1)) - .join(); + admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.of(FAMILY_1)).join(); verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1); } @@ -632,8 +628,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { try { // Add same column family again - expect failure - this.admin.addColumnFamily(tableName, - ColumnFamilyDescriptorBuilder.of(FAMILY_1)).join(); + this.admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.of(FAMILY_1)).join(); Assert.fail("Delete a non-exist column family should fail"); } catch (Exception e) { // Expected. @@ -731,8 +726,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { // Verify descriptor from HDFS MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem(); Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName); - TableDescriptor td = - FSTableDescriptors.getTableDescriptorFromFs(mfs.getFileSystem(), tableDir); + TableDescriptor td = FSTableDescriptors.getTableDescriptorFromFs(mfs.getFileSystem(), tableDir); verifyTableDescriptor(td, tableName, families); } @@ -768,7 +762,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { @Test public void testCompactionTimestamps() throws Exception { createTableWithDefaultConf(tableName); - RawAsyncTable table = ASYNC_CONN.getRawTable(tableName); + AsyncTable<?> table = ASYNC_CONN.getTable(tableName); Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get(); assertFalse(ts.isPresent()); Put p = new Put(Bytes.toBytes("row1")); @@ -783,9 +777,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { // still 0, we flushed a file, but no major compaction happened assertFalse(ts.isPresent()); - byte[] regionName = - ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get() - .getRegionInfo().getRegionName(); + byte[] regionName = ASYNC_CONN.getRegionLocator(tableName) + .getRegionLocation(Bytes.toBytes("row1")).get().getRegion().getRegionName(); Optional<Long> ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get(); assertFalse(ts1.isPresent()); p = new Put(Bytes.toBytes("row2")); @@ -823,7 +816,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase { } } // Sleep to wait region server report - Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); + Thread + .sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); ts = admin.getLastMajorCompactionTimestamp(tableName).get(); // after a compaction our earliest timestamp will have progressed forward http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index fce9041..c80b27b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -32,13 +32,15 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -80,20 +82,20 @@ public class TestAsyncTableBatch { public String tableType; @Parameter(1) - public Function<TableName, AsyncTableBase> tableGetter; + public Function<TableName, AsyncTable<?>> tableGetter; - private static RawAsyncTable getRawTable(TableName tableName) { - return CONN.getRawTable(tableName); + private static AsyncTable<?> getRawTable(TableName tableName) { + return CONN.getTable(tableName); } - private static AsyncTable getTable(TableName tableName) { + private static AsyncTable<?> getTable(TableName tableName) { return CONN.getTable(tableName, ForkJoinPool.commonPool()); } @Parameters(name = "{index}: type={0}") public static List<Object[]> params() { - Function<TableName, AsyncTableBase> rawTableGetter = TestAsyncTableBatch::getRawTable; - Function<TableName, AsyncTableBase> tableGetter = TestAsyncTableBatch::getTable; + Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable; + Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable; return Arrays.asList(new Object[] { "raw", rawTableGetter }, new Object[] { "normal", tableGetter }); } @@ -134,18 +136,15 @@ public class TestAsyncTableBatch { } @Test - public void test() throws InterruptedException, ExecutionException, IOException { - AsyncTableBase table = tableGetter.apply(TABLE_NAME); + public void test() + throws InterruptedException, ExecutionException, IOException, TimeoutException { + AsyncTable<?> table = tableGetter.apply(TABLE_NAME); table.putAll(IntStream.range(0, COUNT) .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) .collect(Collectors.toList())).get(); - List<Result> results = - table - .getAll(IntStream.range(0, COUNT) - .mapToObj( - i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) - .flatMap(l -> l.stream()).collect(Collectors.toList())) - .get(); + List<Result> results = table.getAll(IntStream.range(0, COUNT) + .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) + .flatMap(l -> l.stream()).collect(Collectors.toList())).get(); assertEquals(2 * COUNT, results.size()); for (int i = 0; i < COUNT; i++) { assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ))); @@ -153,19 +152,20 @@ public class TestAsyncTableBatch { } Admin admin = TEST_UTIL.getAdmin(); admin.flush(TABLE_NAME); - TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).forEach(r -> { - byte[] startKey = r.getRegionInfo().getStartKey(); - int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); - byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); - try { - admin.splitRegion(r.getRegionInfo().getRegionName(), splitPoint); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - // we are not going to test the function of split so no assertion here. Just wait for a while - // and then start our work. - Thread.sleep(5000); + List<Future<?>> splitFutures = + TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> { + byte[] startKey = r.getRegionInfo().getStartKey(); + int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); + byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); + try { + return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + for (Future<?> future : splitFutures) { + future.get(30, TimeUnit.SECONDS); + } table.deleteAll( IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList())) .get(); @@ -179,7 +179,7 @@ public class TestAsyncTableBatch { @Test public void testMixed() throws InterruptedException, ExecutionException { - AsyncTableBase table = tableGetter.apply(TABLE_NAME); + AsyncTable<?> table = tableGetter.apply(TABLE_NAME); table.putAll(IntStream.range(0, 5) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) .collect(Collectors.toList())).get(); @@ -223,10 +223,10 @@ public class TestAsyncTableBatch { @Test public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException { Admin admin = TEST_UTIL.getAdmin(); - HTableDescriptor htd = new HTableDescriptor(admin.getTableDescriptor(TABLE_NAME)); - htd.addCoprocessor(ErrorInjectObserver.class.getName()); - admin.modifyTable(TABLE_NAME, htd); - AsyncTableBase table = tableGetter.apply(TABLE_NAME); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME)) + .addCoprocessor(ErrorInjectObserver.class.getName()).build(); + admin.modifyTable(htd); + AsyncTable<?> table = tableGetter.apply(TABLE_NAME); table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k)) .collect(Collectors.toList())).get(); List<CompletableFuture<Result>> futures = table http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index 225060b..12a976e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER; +import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -38,7 +38,6 @@ import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -71,7 +70,7 @@ public class TestAsyncTableGetMultiThreaded { private static AsyncConnection CONN; - private static RawAsyncTable TABLE; + private static AsyncTable<?> TABLE; private static byte[][] SPLIT_KEYS; @@ -80,8 +79,7 @@ public class TestAsyncTableGetMultiThreaded { setUp(MemoryCompactionPolicy.NONE); } - protected static void setUp(MemoryCompactionPolicy memoryCompaction) - throws Exception { + protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception { TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); @@ -96,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS) + TABLE = CONN.getTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS) .setMaxRetries(1000).build(); TABLE.putAll( IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) @@ -123,8 +121,8 @@ public class TestAsyncTableGetMultiThreaded { public void test() throws IOException, InterruptedException, ExecutionException { int numThreads = 20; AtomicBoolean stop = new AtomicBoolean(false); - ExecutorService executor = Executors.newFixedThreadPool(numThreads, - Threads.newDaemonThreadFactory("TestAsyncGet-")); + ExecutorService executor = + Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-")); List<Future<?>> futures = new ArrayList<>(); IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { run(stop); @@ -138,13 +136,13 @@ public class TestAsyncTableGetMultiThreaded { region.compact(true); } Thread.sleep(5000); - admin.balancer(true); + admin.balance(true); Thread.sleep(5000); ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer)) .findAny().get(); - admin.move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), + admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), Bytes.toBytes(newMetaServer.getServerName())); Thread.sleep(5000); } http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 0df8ce7..80e3e4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -102,7 +102,7 @@ public class TestAsyncTableNoncedRetry { @Test public void testAppend() throws InterruptedException, ExecutionException { - RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); + AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); @@ -114,7 +114,7 @@ public class TestAsyncTableNoncedRetry { @Test public void testIncrement() throws InterruptedException, ExecutionException { - RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); + AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); // the second call should have no effect as we always generate the same nonce. assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 2e64593..ebe3a9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -42,8 +41,7 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { @Parameters(name = "{index}: scan={0}") public static List<Object[]> params() { - return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) - .collect(Collectors.toList()); + return getScanCreatorParams(); } @Override @@ -53,7 +51,8 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { @Override protected List<Result> doScan(Scan scan) throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable<ScanResultConsumer> table = + ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); table.scan(scan, consumer); List<Result> results = consumer.getAll(); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index 1b414b2..393ded7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -17,11 +17,8 @@ */ package org.apache.hadoop.hbase.client; -import java.util.Arrays; import java.util.List; -import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -39,7 +36,7 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { public String tableType; @Parameter(1) - public Supplier<AsyncTableBase> getTable; + public Supplier<AsyncTable<?>> getTable; @Parameter(2) public String scanType; @@ -47,22 +44,9 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { @Parameter(3) public Supplier<Scan> scanCreator; - private static RawAsyncTable getRawTable() { - return ASYNC_CONN.getRawTable(TABLE_NAME); - } - - private static AsyncTable getTable() { - return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); - } - @Parameters(name = "{index}: table={0}, scan={2}") public static List<Object[]> params() { - Supplier<AsyncTableBase> rawTable = TestAsyncTableScanAll::getRawTable; - Supplier<AsyncTableBase> normalTable = TestAsyncTableScanAll::getTable; - return getScanCreater().stream() - .flatMap(p -> Arrays.asList(new Object[] { "raw", rawTable, p.getFirst(), p.getSecond() }, - new Object[] { "normal", normalTable, p.getFirst(), p.getSecond() }).stream()) - .collect(Collectors.toList()); + return getTableAndScanCreatorParams(); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java index b30ba0b..84537a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java @@ -106,8 +106,8 @@ public class TestAsyncTableScanMetrics { private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan) throws IOException, InterruptedException { - SimpleRawScanResultConsumer consumer = new SimpleRawScanResultConsumer(); - CONN.getRawTable(TABLE_NAME).scan(scan, consumer); + BufferingScanResultConsumer consumer = new BufferingScanResultConsumer(); + CONN.getTable(TABLE_NAME).scan(scan, consumer); List<Result> results = new ArrayList<>(); for (Result result; (result = consumer.take()) != null;) { results.add(result); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java index c711f30..1996547 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanRenewLease.java @@ -49,7 +49,7 @@ public class TestAsyncTableScanRenewLease { private static AsyncConnection CONN; - private static RawAsyncTable TABLE; + private static AsyncTable<AdvancedScanResultConsumer> TABLE; private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000; @@ -60,7 +60,7 @@ public class TestAsyncTableScanRenewLease { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - TABLE = CONN.getRawTable(TABLE_NAME); + TABLE = CONN.getTable(TABLE_NAME); TABLE.putAll(IntStream.range(0, 10).mapToObj( i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) .collect(Collectors.toList())).get(); @@ -72,7 +72,7 @@ public class TestAsyncTableScanRenewLease { TEST_UTIL.shutdownMiniCluster(); } - private static final class RenewLeaseConsumer implements RawScanResultConsumer { + private static final class RenewLeaseConsumer implements AdvancedScanResultConsumer { private final List<Result> results = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index cefc882..bae2dbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -36,25 +35,30 @@ import org.junit.runners.Parameterized.Parameters; public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { @Parameter(0) - public String scanType; + public String tableType; @Parameter(1) - public Supplier<Scan> scanCreater; + public Supplier<AsyncTable<?>> getTable; + + @Parameter(2) + public String scanType; + + @Parameter(3) + public Supplier<Scan> scanCreator; - @Parameters(name = "{index}: scan={0}") + @Parameters(name = "{index}: table={0}, scan={2}") public static List<Object[]> params() { - return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) - .collect(Collectors.toList()); + return getTableAndScanCreatorParams(); } @Override protected Scan createScan() { - return scanCreater.get(); + return scanCreator.get(); } @Override protected List<Result> doScan(Scan scan) throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); List<Result> results = new ArrayList<>(); try (ResultScanner scanner = table.getScanner(scan)) { for (Result result; (result = scanner.next()) != null;) { http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java index a0ae150..0e18955 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java @@ -47,7 +47,7 @@ public class TestAsyncTableScannerCloseWhileSuspending { private static AsyncConnection CONN; - private static AsyncTable TABLE; + private static AsyncTable<?> TABLE; @BeforeClass public static void setUp() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java index 4bca451..0624a30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -52,9 +52,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor { private void doTest(boolean reversed) throws InterruptedException, ExecutionException, IOException { CompletableFuture<Void> future = new CompletableFuture<>(); - RawAsyncTable table = CONN.getRawTable(TABLE_NAME); + AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), - new RawScanResultConsumer() { + new AdvancedScanResultConsumer() { private int count; @@ -121,8 +121,8 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor { @Test public void testSizeLimit() throws InterruptedException, ExecutionException { CompletableFuture<Void> future = new CompletableFuture<>(); - RawAsyncTable table = CONN.getRawTable(TABLE_NAME); - table.scan(createScanWithSizeLimit(), new RawScanResultConsumer() { + AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME); + table.scan(createScanWithSizeLimit(), new AdvancedScanResultConsumer() { private int count; http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableLimitedScanWithFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableLimitedScanWithFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableLimitedScanWithFilter.java index f71561f..c40b34e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableLimitedScanWithFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableLimitedScanWithFilter.java @@ -58,14 +58,14 @@ public class TestRawAsyncTableLimitedScanWithFilter { private static AsyncConnection CONN; - private static RawAsyncTable TABLE; + private static AsyncTable<?> TABLE; @BeforeClass public static void setUp() throws Exception { UTIL.startMiniCluster(1); UTIL.createTable(TABLE_NAME, FAMILY); CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); - TABLE = CONN.getRawTable(TABLE_NAME); + TABLE = CONN.getTable(TABLE_NAME); TABLE.putAll(IntStream.range(0, ROW_COUNT).mapToObj(i -> { Put put = new Put(Bytes.toBytes(i)); IntStream.range(0, CQS.length) http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java index 2a32206..3cffdad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java @@ -44,13 +44,13 @@ public class TestRawAsyncTablePartialScan { private static byte[] FAMILY = Bytes.toBytes("cf"); private static byte[][] CQS = - new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3") }; + new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3") }; private static int COUNT = 100; private static AsyncConnection CONN; - private static RawAsyncTable TABLE; + private static AsyncTable<?> TABLE; @BeforeClass public static void setUp() throws Exception { @@ -58,7 +58,7 @@ public class TestRawAsyncTablePartialScan { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - TABLE = CONN.getRawTable(TABLE_NAME); + TABLE = CONN.getTable(TABLE_NAME); TABLE .putAll(IntStream.range(0, COUNT) .mapToObj(i -> new Put(Bytes.toBytes(String.format("%02d", i))) @@ -100,7 +100,7 @@ public class TestRawAsyncTablePartialScan { // we set batch to 2 and max result size to 1, then server will only returns one result per call // but we should get 2 + 1 for every row. List<Result> results = - TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get(); + TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get(); assertEquals(2 * COUNT, results.size()); for (int i = 0; i < COUNT; i++) { int row = COUNT - i - 1; http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 5311ca2..e25da28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -42,8 +41,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { @Parameters(name = "{index}: type={0}") public static List<Object[]> params() { - return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) - .collect(Collectors.toList()); + return getScanCreatorParams(); } @Override @@ -53,8 +51,8 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { @Override protected List<Result> doScan(Scan scan) throws Exception { - SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer(); - ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); + BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); + ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); List<Result> results = new ArrayList<>(); for (Result result; (result = scanConsumer.take()) != null;) { results.add(result);