Repository: hbase Updated Branches: refs/heads/master 4e77b18da -> ffa0cea2a
http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java deleted file mode 100644 index 090c55a..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test the ClientSmallReversedScanner. - */ -@Category(SmallTests.class) -public class TestClientSmallReversedScanner { - - Scan scan; - ExecutorService pool; - Configuration conf; - - ClusterConnection clusterConn; - RpcRetryingCallerFactory rpcFactory; - RpcControllerFactory controllerFactory; - RpcRetryingCaller<Result[]> caller; - - @Before - @SuppressWarnings({"deprecation", "unchecked"}) - public void setup() throws IOException { - clusterConn = Mockito.mock(ClusterConnection.class); - rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); - controllerFactory = Mockito.mock(RpcControllerFactory.class); - pool = Executors.newSingleThreadExecutor(); - scan = new Scan(); - conf = new Configuration(); - Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); - // Mock out the RpcCaller - caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); - } - - @After - public void teardown() { - if (null != pool) { - pool.shutdownNow(); - } - } - - /** - * Create a simple Answer which returns true the first time, and false every time after. - */ - private Answer<Boolean> createTrueThenFalseAnswer() { - return new Answer<Boolean>() { - boolean first = true; - - @Override - public Boolean answer(InvocationOnMock invocation) { - if (first) { - first = false; - return true; - } - return false; - } - }; - } - - private SmallReversedScannerCallableFactory getFactory( - final ScannerCallableWithReplicas callableWithReplicas) { - return new SmallReversedScannerCallableFactory() { - @Override - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller, - boolean isFirstRegionToLocate) { - return callableWithReplicas; - } - }; - } - - @Test - public void testContextPresent() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // Mock out the RpcCaller - @SuppressWarnings("unchecked") - RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); - - // Intentionally leave a "default" caching size in the Scan. No matter the value, we - // should continue based on the server context - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenAnswer(new Answer<Result[]>() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv3}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - results = new Result[] {Result.create(new Cell[] {kv1})}; - } else { - results = new Result[0]; - } - count++; - return results; - } - }); - - // Pass back the context always - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( - createTrueThenFalseAnswer()); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - Queue<Result> results = csrs.cache; - Iterator<Result> iter = results.iterator(); - assertEquals(3, results.size()); - for (int i = 3; i >= 1 && iter.hasNext(); i--) { - Result result = iter.next(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - assertTrue(csrs.closed); - } - } - - @Test - public void testNoContextFewerRecords() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server returns 2 records per batch, we expect more records. - scan.setCaching(2); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenAnswer(new Answer<Result[]>() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv3}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - // Return fewer records than expected (2) - results = new Result[] {Result.create(new Cell[] {kv1})}; - } else { - throw new RuntimeException("Should not fetch a third batch from the server"); - } - count++; - return results; - } - }); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenThrow(new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - Queue<Result> results = csrs.cache; - Iterator<Result> iter = results.iterator(); - assertEquals(2, results.size()); - for (int i = 3; i >= 2 && iter.hasNext(); i--) { - Result result = iter.next(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - // "consume" the Results - results.clear(); - - csrs.loadCache(); - - assertEquals(1, results.size()); - Result result = results.peek(); - assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - - assertTrue(csrs.closed); - } - } - - @Test - public void testNoContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server return 2 records per RPC, we expect there to be more records. - scan.setCaching(2); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenThrow(new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - assertEquals(0, csrs.cache.size()); - assertTrue(csrs.closed); - } - } - - @Test - public void testContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, - TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, - Integer.MAX_VALUE)) { - - csrs.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()) - .thenReturn(false); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - csrs.loadCache(); - - assertEquals(0, csrs.cache.size()); - assertTrue(csrs.closed); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java deleted file mode 100644 index 318fbe7..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallScanner.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test the ClientSmallScanner. - */ -@Category(SmallTests.class) -public class TestClientSmallScanner { - - Scan scan; - ExecutorService pool; - Configuration conf; - - ClusterConnection clusterConn; - RpcRetryingCallerFactory rpcFactory; - RpcControllerFactory controllerFactory; - RpcRetryingCaller<Result[]> caller; - - @Before - @SuppressWarnings({"deprecation", "unchecked"}) - public void setup() throws IOException { - clusterConn = Mockito.mock(ClusterConnection.class); - rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); - controllerFactory = Mockito.mock(RpcControllerFactory.class); - pool = Executors.newSingleThreadExecutor(); - scan = new Scan(); - conf = new Configuration(); - Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); - // Mock out the RpcCaller - caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); - } - - @After - public void teardown() { - if (null != pool) { - pool.shutdownNow(); - } - } - - /** - * Create a simple Answer which returns true the first time, and false every time after. - */ - private Answer<Boolean> createTrueThenFalseAnswer() { - return new Answer<Boolean>() { - boolean first = true; - - @Override - public Boolean answer(InvocationOnMock invocation) { - if (first) { - first = false; - return true; - } - return false; - } - }; - } - - private SmallScannerCallableFactory getFactory( - final ScannerCallableWithReplicas callableWithReplicas) { - return new SmallScannerCallableFactory() { - @Override - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, - int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, - RpcRetryingCaller<Result[]> caller) { - return callableWithReplicas; - } - }; - } - - @Test - public void testContextPresent() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // Mock out the RpcCaller - @SuppressWarnings("unchecked") - RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); - // Return the mock from the factory - Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - // Intentionally leave a "default" caching size in the Scan. No matter the value, we - // should continue based on the server context - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenAnswer(new Answer<Result[]>() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv1}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - results = new Result[] {Result.create(new Cell[] {kv3})}; - } else { - results = new Result[0]; - } - count++; - return results; - } - }); - - // Pass back the context always - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer( - createTrueThenFalseAnswer()); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - Queue<Result> results = css.cache; - assertEquals(3, results.size()); - for (int i = 1; i <= 3; i++) { - Result result = results.poll(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - assertTrue(css.closed); - } - } - - @Test - public void testNoContextFewerRecords() throws Exception { - final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1, - Type.Maximum); - - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server returns 2 records per batch, we expect more records. - scan.setCaching(2); - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenAnswer(new Answer<Result[]>() { - int count = 0; - - @Override - public Result[] answer(InvocationOnMock invocation) { - Result[] results; - if (0 == count) { - results = new Result[] {Result.create(new Cell[] {kv1}), - Result.create(new Cell[] {kv2})}; - } else if (1 == count) { - // Return fewer records than expected (2) - results = new Result[] {Result.create(new Cell[] {kv3})}; - } else { - throw new RuntimeException("Should not fetch a third batch from the server"); - } - count++; - return results; - } - }); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( - new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - Queue<Result> results = css.cache; - assertEquals(2, results.size()); - for (int i = 1; i <= 2; i++) { - Result result = results.poll(); - byte[] row = result.getRow(); - assertEquals("row" + i, new String(row, StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - } - - // "consume" the results we verified - results.clear(); - - css.loadCache(); - - assertEquals(1, results.size()); - Result result = results.peek(); - assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8)); - assertEquals(1, result.getMap().size()); - assertTrue(css.closed); - } - } - - @Test - public void testNoContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - // While the server return 2 records per RPC, we expect there to be more records. - scan.setCaching(2); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow( - new RuntimeException("Should not be called")); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - assertEquals(0, css.cache.size()); - assertTrue(css.closed); - } - } - - @Test - public void testContextNoRecords() throws Exception { - ScannerCallableWithReplicas callableWithReplicas = Mockito - .mock(ScannerCallableWithReplicas.class); - - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); - - try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { - - css.setScannerCallableFactory(factory); - - // Return some data the first time, less the second, and none after that - Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout())) - .thenReturn(new Result[0]); - - // Server doesn't return the context - Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true); - // Only have more results the first time - Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false); - - // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right - HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo); - // Trigger the "no more data" branch for #nextScanner(...) - Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY); - - css.loadCache(); - - assertEquals(0, css.cache.size()); - assertTrue(css.closed); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 0e2842b..574ec50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -63,27 +63,27 @@ public class SyncTable extends Configured implements Tool { static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; static final String DRY_RUN_CONF_KEY="sync.table.dry.run"; - + Path sourceHashDir; String sourceTableName; String targetTableName; - + String sourceZkCluster; String targetZkCluster; boolean dryRun; - + Counters counters; - + public SyncTable(Configuration conf) { super(conf); } - + public Job createSubmittableJob(String[] args) throws IOException { FileSystem fs = sourceHashDir.getFileSystem(getConf()); if (!fs.exists(sourceHashDir)) { throw new IOException("Source hash dir not found: " + sourceHashDir); } - + HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir); LOG.info("Read source hash manifest: " + tableHash); LOG.info("Read " + tableHash.partitions.size() + " partition keys"); @@ -97,7 +97,7 @@ public class SyncTable extends Configured implements Tool { + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys" + " found in the partitions file is " + tableHash.partitions.size()); } - + Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR); int dataSubdirCount = 0; for (FileStatus file : fs.listStatus(dataDir)) { @@ -105,14 +105,14 @@ public class SyncTable extends Configured implements Tool { dataSubdirCount++; } } - + if (dataSubdirCount != tableHash.numHashFiles) { throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" + " should be 1 more than the number of partition keys. However, the number of data dirs" + " found is " + dataSubdirCount + " but the number of partition keys" + " found in the partitions file is " + tableHash.partitions.size()); } - + Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name", "syncTable_" + sourceTableName + "-" + targetTableName)); Configuration jobConf = job.getConfiguration(); @@ -127,12 +127,12 @@ public class SyncTable extends Configured implements Tool { jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); } jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); - + TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class, null, null, job); - + job.setNumReduceTasks(0); - + if (dryRun) { job.setOutputFormatClass(NullOutputFormat.class); } else { @@ -140,37 +140,37 @@ public class SyncTable extends Configured implements Tool { // because it sets up the TableOutputFormat. TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster, null, null); - + // would be nice to add an option for bulk load instead } - + return job; } - + public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> { Path sourceHashDir; - + Connection sourceConnection; Connection targetConnection; Table sourceTable; Table targetTable; boolean dryRun; - + HashTable.TableHash sourceTableHash; HashTable.TableHash.Reader sourceHashReader; ImmutableBytesWritable currentSourceHash; ImmutableBytesWritable nextSourceKey; HashTable.ResultHasher targetHasher; - + Throwable mapperException; - + public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS, SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES, MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED}; - + @Override protected void setup(Context context) throws IOException { - + Configuration conf = context.getConfiguration(); sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); @@ -179,23 +179,23 @@ public class SyncTable extends Configured implements Tool { sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); - + sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); LOG.info("Read source hash manifest: " + sourceTableHash); LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys"); - + TableSplit split = (TableSplit) context.getInputSplit(); ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow()); - + sourceHashReader = sourceTableHash.newReader(conf, splitStartKey); findNextKeyHashPair(); - + // create a hasher, but don't start it right away // instead, find the first hash batch at or after the start row // and skip any rows that come before. they will be caught by the previous task targetHasher = new HashTable.ResultHasher(); } - + private static Connection openConnection(Configuration conf, String zkClusterConfKey, String configPrefix) throws IOException { @@ -204,12 +204,12 @@ public class SyncTable extends Configured implements Tool { zkCluster, configPrefix); return ConnectionFactory.createConnection(clusterConf); } - + private static Table openTable(Connection connection, Configuration conf, String tableNameConfKey) throws IOException { return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey))); } - + /** * Attempt to read the next source key/hash pair. * If there are no more, set nextSourceKey to null @@ -223,7 +223,7 @@ public class SyncTable extends Configured implements Tool { nextSourceKey = null; } } - + @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { @@ -232,7 +232,7 @@ public class SyncTable extends Configured implements Tool { while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) { moveToNextBatch(context); } - + // next, add the scanned row (as long as we've reached the first batch) if (targetHasher.isBatchStarted()) { targetHasher.hashResult(value); @@ -247,7 +247,7 @@ public class SyncTable extends Configured implements Tool { /** * If there is an open hash batch, complete it and sync if there are diffs. - * Start a new batch, and seek to read the + * Start a new batch, and seek to read the */ private void moveToNextBatch(Context context) throws IOException, InterruptedException { if (targetHasher.isBatchStarted()) { @@ -255,7 +255,7 @@ public class SyncTable extends Configured implements Tool { } targetHasher.startBatch(nextSourceKey); currentSourceHash = sourceHashReader.getCurrentHash(); - + findNextKeyHashPair(); } @@ -276,28 +276,28 @@ public class SyncTable extends Configured implements Tool { context.getCounter(Counter.HASHES_MATCHED).increment(1); } else { context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1); - + ImmutableBytesWritable stopRow = nextSourceKey == null ? new ImmutableBytesWritable(sourceTableHash.stopRow) : nextSourceKey; - + if (LOG.isDebugEnabled()) { LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey()) + " to " + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: " + toHex(targetHash)); } - + syncRange(context, targetHasher.getBatchStartKey(), stopRow); } } private static String toHex(ImmutableBytesWritable bytes) { return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength()); } - + private static final CellScanner EMPTY_CELL_SCANNER = new CellScanner(Iterators.<Result>emptyIterator()); - + /** * Rescan the given range directly from the source and target tables. * Count and log differences, and if this is not a dry run, output Puts and Deletes @@ -305,17 +305,16 @@ public class SyncTable extends Configured implements Tool { */ private void syncRange(Context context, ImmutableBytesWritable startRow, ImmutableBytesWritable stopRow) throws IOException, InterruptedException { - Scan scan = sourceTableHash.initScan(); scan.setStartRow(startRow.copyBytes()); scan.setStopRow(stopRow.copyBytes()); - + ResultScanner sourceScanner = sourceTable.getScanner(scan); CellScanner sourceCells = new CellScanner(sourceScanner.iterator()); - ResultScanner targetScanner = targetTable.getScanner(scan); + ResultScanner targetScanner = targetTable.getScanner(new Scan(scan)); CellScanner targetCells = new CellScanner(targetScanner.iterator()); - + boolean rangeMatched = true; byte[] nextSourceRow = sourceCells.nextRow(); byte[] nextTargetRow = targetCells.nextRow(); @@ -327,7 +326,7 @@ public class SyncTable extends Configured implements Tool { LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow)); } context.getCounter(Counter.TARGETMISSINGROWS).increment(1); - + rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER); nextSourceRow = sourceCells.nextRow(); // advance only source to next row } else if (rowComparison > 0) { @@ -335,41 +334,41 @@ public class SyncTable extends Configured implements Tool { LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow)); } context.getCounter(Counter.SOURCEMISSINGROWS).increment(1); - + rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells); nextTargetRow = targetCells.nextRow(); // advance only target to next row } else { // current row is the same on both sides, compare cell by cell rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells); - nextSourceRow = sourceCells.nextRow(); + nextSourceRow = sourceCells.nextRow(); nextTargetRow = targetCells.nextRow(); } - + if (!rowMatched) { rangeMatched = false; } } - + sourceScanner.close(); targetScanner.close(); - + context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED) .increment(1); } - + private static class CellScanner { private final Iterator<Result> results; - + private byte[] currentRow; private Result currentRowResult; private int nextCellInRow; - + private Result nextRowResult; - + public CellScanner(Iterator<Result> results) { this.results = results; } - + /** * Advance to the next row and return its row key. * Returns null iff there are no more rows. @@ -390,7 +389,7 @@ public class SyncTable extends Configured implements Tool { nextRowResult = null; } } - + if (nextRowResult == null) { // end of data, no more rows currentRowResult = null; @@ -398,7 +397,7 @@ public class SyncTable extends Configured implements Tool { return null; } } - + // advance to cached result for next row currentRowResult = nextRowResult; nextCellInRow = 0; @@ -406,7 +405,7 @@ public class SyncTable extends Configured implements Tool { nextRowResult = null; return currentRow; } - + /** * Returns the next Cell in the current row or null iff none remain. */ @@ -415,7 +414,7 @@ public class SyncTable extends Configured implements Tool { // nothing left in current row return null; } - + Cell nextCell = currentRowResult.rawCells()[nextCellInRow]; nextCellInRow++; if (nextCellInRow == currentRowResult.size()) { @@ -441,7 +440,7 @@ public class SyncTable extends Configured implements Tool { return nextCell; } } - + /** * Compare the cells for the given row from the source and target tables. * Count and log any differences. @@ -465,14 +464,14 @@ public class SyncTable extends Configured implements Tool { } context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); matchingRow = false; - + if (!dryRun) { if (put == null) { put = new Put(rowKey); } put.add(sourceCell); } - + sourceCell = sourceCells.nextCellInRow(); } else if (cellKeyComparison > 0) { if (LOG.isDebugEnabled()) { @@ -480,7 +479,7 @@ public class SyncTable extends Configured implements Tool { } context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); matchingRow = false; - + if (!dryRun) { if (delete == null) { delete = new Delete(rowKey); @@ -489,7 +488,7 @@ public class SyncTable extends Configured implements Tool { delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp()); } - + targetCell = targetCells.nextCellInRow(); } else { // the cell keys are equal, now check values @@ -507,7 +506,7 @@ public class SyncTable extends Configured implements Tool { } context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); matchingRow = false; - + if (!dryRun) { // overwrite target cell if (put == null) { @@ -519,7 +518,7 @@ public class SyncTable extends Configured implements Tool { sourceCell = sourceCells.nextCellInRow(); targetCell = targetCells.nextCellInRow(); } - + if (!dryRun && sourceTableHash.scanBatch > 0) { if (put != null && put.size() >= sourceTableHash.scanBatch) { context.write(new ImmutableBytesWritable(rowKey), put); @@ -531,7 +530,7 @@ public class SyncTable extends Configured implements Tool { } } } - + if (!dryRun) { if (put != null) { context.write(new ImmutableBytesWritable(rowKey), put); @@ -540,7 +539,7 @@ public class SyncTable extends Configured implements Tool { context.write(new ImmutableBytesWritable(rowKey), delete); } } - + if (matchingCells > 0) { context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells); } @@ -581,21 +580,21 @@ public class SyncTable extends Configured implements Tool { if (c2 == null) { return -1; // target missing cell } - + int result = CellComparator.compareFamilies(c1, c2); if (result != 0) { return result; } - + result = CellComparator.compareQualifiers(c1, c2); if (result != 0) { return result; } - + // note timestamp comparison is inverted - more recent cells first return CellComparator.compareTimestamps(c1, c2); } - + @Override protected void cleanup(Context context) throws IOException, InterruptedException { @@ -606,7 +605,7 @@ public class SyncTable extends Configured implements Tool { mapperException = t; } } - + try { sourceTable.close(); targetTable.close(); @@ -619,7 +618,7 @@ public class SyncTable extends Configured implements Tool { LOG.error("Suppressing exception from closing tables", t); } } - + // propagate first exception if (mapperException != null) { Throwables.propagateIfInstanceOf(mapperException, IOException.class); @@ -639,7 +638,7 @@ public class SyncTable extends Configured implements Tool { && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) { moveToNextBatch(context); } - + if (targetHasher.isBatchStarted()) { // need to complete the final open hash batch @@ -654,7 +653,7 @@ public class SyncTable extends Configured implements Tool { } else { scan.setStopRow(nextSourceKey.copyBytes()); } - + ResultScanner targetScanner = null; try { targetScanner = targetTable.getScanner(scan); @@ -672,7 +671,7 @@ public class SyncTable extends Configured implements Tool { } } } - + private static final int NUM_ARGS = 3; private static void printUsage(final String errorMsg) { if (errorMsg != null && errorMsg.length() > 0) { @@ -682,7 +681,7 @@ public class SyncTable extends Configured implements Tool { System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>"); System.err.println(); System.err.println("Options:"); - + System.err.println(" sourcezkcluster ZK cluster key of the source table"); System.err.println(" (defaults to cluster in classpath's config)"); System.err.println(" targetzkcluster ZK cluster key of the target table"); @@ -704,7 +703,7 @@ public class SyncTable extends Configured implements Tool { + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase" + " hdfs://nn:9000/hashes/tableA tableA tableA"); } - + private boolean doCommandLine(final String[] args) { if (args.length < NUM_ARGS) { printUsage(null); @@ -714,37 +713,37 @@ public class SyncTable extends Configured implements Tool { sourceHashDir = new Path(args[args.length - 3]); sourceTableName = args[args.length - 2]; targetTableName = args[args.length - 1]; - + for (int i = 0; i < args.length - NUM_ARGS; i++) { String cmd = args[i]; if (cmd.equals("-h") || cmd.startsWith("--h")) { printUsage(null); return false; } - + final String sourceZkClusterKey = "--sourcezkcluster="; if (cmd.startsWith(sourceZkClusterKey)) { sourceZkCluster = cmd.substring(sourceZkClusterKey.length()); continue; } - + final String targetZkClusterKey = "--targetzkcluster="; if (cmd.startsWith(targetZkClusterKey)) { targetZkCluster = cmd.substring(targetZkClusterKey.length()); continue; } - + final String dryRunKey = "--dryrun="; if (cmd.startsWith(dryRunKey)) { dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length())); continue; } - + printUsage("Invalid argument '" + cmd + "'"); return false; } - + } catch (Exception e) { e.printStackTrace(); printUsage("Can't start because " + e.getMessage()); @@ -752,7 +751,7 @@ public class SyncTable extends Configured implements Tool { } return true; } - + /** * Main entry point. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 925128c..ae4e49d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -207,8 +207,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - /** * Implements the regionserver RPC services. */ @@ -353,14 +351,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final String scannerName; private final RegionScanner s; private final Region r; + private final boolean allowPartial; private final RpcCallback closeCallBack; private final RpcCallback shippedCallback; - public RegionScannerHolder(String scannerName, RegionScanner s, Region r, + public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial, RpcCallback closeCallBack, RpcCallback shippedCallback) { this.scannerName = scannerName; this.s = s; this.r = r; + this.allowPartial = allowPartial; this.closeCallBack = closeCallBack; this.shippedCallback = shippedCallback; } @@ -1211,8 +1211,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } - private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) - throws LeaseStillHeldException { + private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r, + boolean allowPartial) throws LeaseStillHeldException { Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease); @@ -1223,7 +1223,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, closeCallback = new RegionScannerCloseCallBack(s); } RegionScannerHolder rsh = - new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback); + new RegionScannerHolder(scannerName, s, r, allowPartial, closeCallback, shippedCallback); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; return rsh; @@ -2685,8 +2685,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return rsh; } - private Pair<RegionScannerHolder, Boolean> newRegionScanner(ScanRequest request, - ScanResponse.Builder builder) throws IOException { + private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) + throws IOException { Region region = getRegion(request.getRegion()); ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); @@ -2717,7 +2717,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setMvccReadPoint(scanner.getMvccReadPoint()); builder.setTtl(scannerLeaseTimeoutPeriod); String scannerName = String.valueOf(scannerId); - return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall()); + return addScanner(scannerName, scanner, region, + !scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0)); } private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) @@ -2773,9 +2774,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // return whether we have more results in region. private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, - boolean isSmallScan, long maxQuotaResultSize, int rows, List<Result> results, - ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) - throws IOException { + long maxQuotaResultSize, int rows, List<Result> results, ScanResponse.Builder builder, + MutableObject lastBlock, RpcCallContext context) throws IOException { Region region = rsh.r; RegionScanner scanner = rsh.s; long maxResultSize; @@ -2806,7 +2806,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // formed. boolean serverGuaranteesOrderOfPartials = results.isEmpty(); boolean allowPartialResults = - clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; + clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial; boolean moreRows = false; // Heartbeat messages occur when the processing of the ScanRequest is exceeds a @@ -2963,15 +2963,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcScanRequestCount.increment(); RegionScannerHolder rsh; ScanResponse.Builder builder = ScanResponse.newBuilder(); - boolean isSmallScan; try { if (request.hasScannerId()) { rsh = getRegionScanner(request); - isSmallScan = false; } else { - Pair<RegionScannerHolder, Boolean> pair = newRegionScanner(request, builder); - rsh = pair.getFirst(); - isSmallScan = pair.getSecond().booleanValue(); + rsh = newRegionScanner(request, builder); } } catch (IOException e) { if (e == SCANNER_ALREADY_CLOSED) { @@ -3058,7 +3054,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } if (!done) { - moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, isSmallScan, + moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, results, builder, lastBlock, context); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index c6d3e80..73160bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType; import org.apache.hadoop.hbase.util.Bytes; @@ -367,7 +368,7 @@ public final class Canary implements Tool { scan.setFilter(new FirstKeyOnlyFilter()); scan.addFamily(column.getName()); scan.setMaxResultSize(1L); - scan.setSmall(true); + scan.setOneRowLimit(); } if (LOG.isDebugEnabled()) { @@ -502,7 +503,7 @@ public final class Canary implements Tool { scan.setFilter(new FirstKeyOnlyFilter()); scan.setCaching(1); scan.setMaxResultSize(1L); - scan.setSmall(true); + scan.setOneRowLimit(); stopWatch.start(); ResultScanner s = table.getScanner(scan); s.next(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 6643720..4539f97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2238,13 +2238,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } public int countRows(final Table table, final Scan scan) throws IOException { - ResultScanner results = table.getScanner(scan); - int count = 0; - for (@SuppressWarnings("unused") Result res : results) { - count++; + try (ResultScanner results = table.getScanner(scan)) { + int count = 0; + while (results.next() != null) { + count++; + } + return count; } - results.close(); - return count; } public int countRows(final Table table, final byte[]... families) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index 368f050..414ffa7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -167,7 +167,7 @@ public class TestMetaTableAccessorNoCluster { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); - return builder.build(); + return builder.setScannerId(1234567890L).build(); } }).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build()); // Associate a spied-upon Connection with UTIL.getConfiguration. Need http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 18a4d86..7c3c343 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -609,7 +609,6 @@ public class TestPartialResultsFromClientSide { scan.setAllowPartialResults(true); scan.setSmall(true); scan.setMaxResultSize(1); - ResultScanner scanner = TABLE.getScanner(scan); Result r = null; @@ -733,11 +732,13 @@ public class TestPartialResultsFromClientSide { byte[] value = Bytes.createMaxByteArray(100); Table tmpTable = createTestTable(testName, rows, families, qualifiers, value); - // Open scanner before deletes ResultScanner scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); - + // now the openScanner will also fetch data and will be executed lazily, i.e, only openScanner + // when you call next, so here we need to make a next call to open scanner. The maxResultSize + // limit can make sure that we will not fetch all the data at once, so the test sill works. + int scannerCount = scanner.next().rawCells().length; Delete delete1 = new Delete(rows[0]); delete1.addColumn(families[0], qualifiers[0], 0); tmpTable.delete(delete1); @@ -747,7 +748,7 @@ public class TestPartialResultsFromClientSide { tmpTable.delete(delete2); // Should see all cells because scanner was opened prior to deletes - int scannerCount = countCellsFromScanner(scanner); + scannerCount += countCellsFromScanner(scanner); int expectedCount = numRows * numFamilies * numQualifiers; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, scannerCount == expectedCount); @@ -760,6 +761,7 @@ public class TestPartialResultsFromClientSide { scannerCount == expectedCount); scanner = tmpTable.getScanner(new Scan().setMaxResultSize(1).setAllowPartialResults(true)); + scannerCount = scanner.next().rawCells().length; // Put in 2 new rows. The timestamps differ from the deleted rows Put put1 = new Put(rows[0]); put1.add(new KeyValue(rows[0], families[0], qualifiers[0], 1, value)); @@ -770,7 +772,7 @@ public class TestPartialResultsFromClientSide { tmpTable.put(put2); // Scanner opened prior to puts. Cell count shouldn't have changed - scannerCount = countCellsFromScanner(scanner); + scannerCount += countCellsFromScanner(scanner); expectedCount = numRows * numFamilies * numQualifiers - 2; assertTrue("scannerCount: " + scannerCount + " expectedCount: " + expectedCount, scannerCount == expectedCount); http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java index 432fb69..79fcde8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java @@ -86,9 +86,11 @@ public class TestClientScannerRPCTimeout { public void testScannerNextRPCTimesout() throws Exception { final TableName TABLE_NAME = TableName.valueOf("testScannerNextRPCTimesout"); Table ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY); + byte[] r0 = Bytes.toBytes("row-0"); byte[] r1 = Bytes.toBytes("row-1"); byte[] r2 = Bytes.toBytes("row-2"); byte[] r3 = Bytes.toBytes("row-3"); + putToTable(ht, r0); putToTable(ht, r1); putToTable(ht, r2); putToTable(ht, r3); @@ -98,6 +100,9 @@ public class TestClientScannerRPCTimeout { scan.setCaching(1); ResultScanner scanner = ht.getScanner(scan); Result result = scanner.next(); + // fetched when openScanner + assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow())); + result = scanner.next(); assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow())); LOG.info("Got expected first row"); long t1 = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 0e271f0..e40975e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -727,6 +727,7 @@ public class TestFromClientSide3 { scan.setTimeRange(0, Long.MAX_VALUE); scan.setCaching(1); ResultScanner scanner = table.getScanner(scan); + int rowNum = scanner.next() != null ? 1 : 0; //the started scanner shouldn't see the rows put below for(int i = 1; i < 1000; i++) { put = new Put(Bytes.toBytes(String.valueOf(i))); @@ -734,7 +735,6 @@ public class TestFromClientSide3 { put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes(i)); table.put(put); } - int rowNum = 0; for(Result result : scanner) { rowNum++; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java index ef8f234..e3e33a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -106,8 +106,8 @@ public class TestLeaseRenewal { Scan s = new Scan(); s.setCaching(1); ResultScanner rs = table.getScanner(s); - // make sure that calling renewLease does not impact the scan results - assertTrue(rs.renewLease()); + // we haven't open the scanner yet so nothing happens + assertFalse(rs.renewLease()); assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); // renew the lease a few times, long enough to be sure // the lease would have expired otherwise http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java index 965bb9e..b480e8b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java @@ -183,7 +183,7 @@ public class TestMobStoreScanner { Scan scan = new Scan(); scan.setCaching(1); ResultScanner rs = table.getScanner(scan); - + Result result = rs.next(); Put put3 = new Put(row1); byte[] value3 = Bytes.toBytes("value3"); put3.addColumn(family, qf1, ts, value3); @@ -192,7 +192,7 @@ public class TestMobStoreScanner { byte[] value4 = Bytes.toBytes("value4"); put4.addColumn(family, qf1, ts, value4); table.put(put4); - Result result = rs.next(); + Cell cell = result.getColumnLatestCell(family, qf1); Assert.assertArrayEquals(value1, CellUtil.cloneValue(cell)); http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index a15cbb3..178b537 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -45,12 +45,11 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import junit.framework.Assert; - @Category({RegionServerTests.class, MediumTests.class}) public class TestScannerWithBulkload { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -213,6 +212,7 @@ public class TestScannerWithBulkload { final Admin admin = TEST_UTIL.getAdmin(); createTable(admin, tableName); Scan scan = createScan(); + scan.setCaching(1); final Table table = init(admin, l, scan, tableName); // use bulkload final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/", @@ -221,6 +221,7 @@ public class TestScannerWithBulkload { conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true); final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); ResultScanner scanner = table.getScanner(scan); + Result result = scanner.next(); // Create a scanner and then do bulk load final CountDownLatch latch = new CountDownLatch(1); new Thread() { @@ -242,7 +243,6 @@ public class TestScannerWithBulkload { latch.await(); // By the time we do next() the bulk loaded files are also added to the kv // scanner - Result result = scanner.next(); scanAfterBulkLoad(scanner, result, "version1"); scanner.close(); table.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 2efc5ff..5511e79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -937,7 +937,6 @@ public class TestAccessController extends SecureTestUtil { for (Result r = scanner.next(); r != null; r = scanner.next()) { // do nothing } - } catch (IOException e) { } finally { scanner.close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java index b939156..6e1e09c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController2.java @@ -384,6 +384,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addFamily(TEST_FAMILY_2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; @@ -414,6 +415,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addFamily(TEST_FAMILY_2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; @@ -428,6 +430,7 @@ public class TestAccessController2 extends SecureTestUtil { Scan s1 = new Scan(); s1.addColumn(TEST_FAMILY, Q2); try (ResultScanner scanner1 = table.getScanner(s1);) { + scanner1.next(); } } return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa0cea2/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java index 9e332d6..d301214 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/MobSnapshotTestingUtils.java @@ -20,13 +20,21 @@ package org.apache.hadoop.hbase.snapshot; import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.BloomType; import org.junit.Assert; @@ -100,24 +108,22 @@ public class MobSnapshotTestingUtils { /** * Return the number of rows in the given table. */ - public static int countMobRows(final Table table, final byte[]... families) - throws IOException { + public static int countMobRows(final Table table, final byte[]... families) throws IOException { Scan scan = new Scan(); for (byte[] family : families) { scan.addFamily(family); } - ResultScanner results = table.getScanner(scan); - int count = 0; - for (Result res : results) { - count++; - List<Cell> cells = res.listCells(); - for (Cell cell : cells) { - // Verify the value - Assert.assertTrue(CellUtil.cloneValue(cell).length > 0); + try (ResultScanner results = table.getScanner(scan)) { + int count = 0; + for (Result res; (res = results.next()) != null;) { + count++; + for (Cell cell : res.listCells()) { + // Verify the value + Assert.assertTrue(CellUtil.cloneValue(cell).length > 0); + } } + return count; } - results.close(); - return count; } public static void verifyMobRowCount(final HBaseTestingUtility util,
