Repository: kudu Updated Branches: refs/heads/master 72a77bfbf -> 42db87b0b
KUDU-2095: [java] Add scanner keepAlive API to the Java client This patch adds keepAlive methods to the AsyncKuduScanner and KuduScanner. These methods leverage a package private method added to the AsyncKuduClient using a similar implementation pattern to existing scan related RPCs. The behavior of this implementation mimics the C++ client. Change-Id: Ic802f556c8860cdd43ef5f794c8f3658259bd0be Reviewed-on: http://gerrit.cloudera.org:8080/11436 Reviewed-by: Adar Dembo <a...@cloudera.com> Tested-by: Grant Henke <granthe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/42db87b0 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/42db87b0 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/42db87b0 Branch: refs/heads/master Commit: 42db87b0b128c573b96e39615e7fa41227fea368 Parents: 72a77bf Author: Grant Henke <granthe...@apache.org> Authored: Thu Sep 13 14:05:55 2018 -0500 Committer: Grant Henke <granthe...@apache.org> Committed: Sat Sep 15 22:12:37 2018 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 30 ++- .../apache/kudu/client/AsyncKuduScanner.java | 88 ++++++- .../org/apache/kudu/client/KuduScanner.java | 25 +- .../org/apache/kudu/client/TestKuduClient.java | 233 +++++++++++++++---- .../apache/kudu/client/TestRemoteTablet.java | 21 ++ src/kudu/client/client.h | 13 +- 6 files changed, 347 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 8c1e032..62425a4 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -1004,7 +1004,7 @@ public class AsyncKuduClient implements AutoCloseable { /** * Package-private access point for {@link AsyncKuduScanner}s to close themselves. - * @param scanner the scanner to close + * @param scanner the scanner to close. * @return a deferred object that indicates the completion of the request. * The {@link AsyncKuduScanner.Response} can contain rows that were left to scan. */ @@ -1028,6 +1028,34 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * Package-private access point for {@link AsyncKuduScanner}s to keep themselves + * alive on tablet servers. + * @param scanner the scanner to keep alive. + * @return a deferred object that indicates the completion of the request. + */ + Deferred<Void> keepAlive(final AsyncKuduScanner scanner) { + checkIsClosed(); + final RemoteTablet tablet = scanner.currentTablet(); + // Getting a null tablet here without being in a closed state means we were in between tablets. + // If there is no scanner to keep alive, we still return Status.OK(). + if (tablet == null) { + return Deferred.fromResult(null); + } + + final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest(); + final ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection()); + if (info == null) { + return Deferred.fromResult(null); + } + + final Deferred<Void> d = keepAliveRequest.getDeferred(); + keepAliveRequest.attempt++; + RpcProxy.sendRpc(this, connectionCache.getConnection( + info, Connection.CredentialsPolicy.ANY_CREDENTIALS), keepAliveRequest); + return d; + } + + /** * Sends the provided {@link KuduRpc} to the tablet server hosting the leader * of the tablet identified by the RPC's table and partition key. 
* http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index dd61bf4..804978e 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -43,6 +43,8 @@ import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; +import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB; +import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -662,14 +664,6 @@ public final class AsyncKuduScanner { } /** - * Sets the name of the tabletSlice that's hosting {@code this.start_key}. - * @param tablet The tabletSlice we're currently supposed to be scanning. - */ - void setTablet(final RemoteTablet tablet) { - this.tablet = tablet; - } - - /** * Invalidates this scanner and makes it assume it's no longer opened. * When a TabletServer goes away while we're scanning it, or some other type * of access problem happens, this method should be called so that the @@ -704,6 +698,31 @@ public final class AsyncKuduScanner { } /** + * Keep the current remote scanner alive. + * <p> + * Keep the current remote scanner alive on the Tablet server for an + * additional time-to-live. This is useful if the interval in between + * nextRows() calls is big enough that the remote scanner might be garbage + * collected. 
The scanner time-to-live can be configured on the tablet + * server via the --scanner_ttl_ms configuration flag and has a default + * of 60 seconds. + * <p> + * This does not invalidate any previously fetched results. + * <p> + * Note that an error returned by this method should not be taken as indication + * that the scan has failed. Subsequent calls to nextRows() might still be successful, + * particularly if the scanner is configured to be fault tolerant. + * @return A deferred object that indicates the completion of the request. + * @throws IllegalStateException if the scanner is already closed. + */ + public Deferred<Void> keepAlive() { + if (closed) { + throw new IllegalStateException("Scanner has already been closed"); + } + return client.keepAlive(this); + } + + /** * Returns an RPC to fetch the next rows. */ KuduRpc<Response> getNextRowsRequest() { @@ -718,6 +737,14 @@ public final class AsyncKuduScanner { } /** + * Returns an RPC to keep this scanner alive on the tablet server. + * @return a new {@link KeepAliveRequest} + */ + KuduRpc<Void> getKeepAliveRequest() { + return new KeepAliveRequest(table, tablet); + } + + /** * Throws an exception if scanning already started. * @throws IllegalStateException if scanning already started. */ @@ -796,6 +823,51 @@ public final class AsyncKuduScanner { } /** + * RPC sent out to keep a scanner alive on a TabletServer. + */ + final class KeepAliveRequest extends KuduRpc<Void> { + + KeepAliveRequest(KuduTable table, RemoteTablet tablet) { + super(table); + setTablet(tablet); + this.setTimeoutMillis(scanRequestTimeout); + } + + @Override + String serviceName() { + return TABLET_SERVER_SERVICE_NAME; + } + + @Override + String method() { + return "ScannerKeepAlive"; + } + + @Override + ReplicaSelection getReplicaSelection() { + return replicaSelection; + } + + /** Serializes this request. 
*/ + @Override + Message createRequestPB() { + final ScannerKeepAliveRequestPB.Builder builder = ScannerKeepAliveRequestPB.newBuilder(); + builder.setScannerId(UnsafeByteOperations.unsafeWrap(scannerId)); + return builder.build(); + } + + @Override + Pair<Void, Object> deserialize(final CallResponse callResponse, + String tsUUID) throws KuduException { + ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder(); + readProtobuf(callResponse.getPBMessage(), builder); + ScannerKeepAliveResponsePB resp = builder.build(); + TabletServerErrorPB error = resp.hasError() ? resp.getError() : null; + return new Pair<Void, Object>(null, error); + } + } + + /** * RPC sent out to fetch the next rows from the TabletServer. */ final class ScanRequest extends KuduRpc<Response> { http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java index 13602a5..209fada 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java @@ -51,18 +51,39 @@ public class KuduScanner { * {@code Scanner} is done scanning), calling it again leads to an undefined * behavior. * @return a list of rows. - * @throws KuduException if anything went wrong + * @throws KuduException if anything went wrong. */ public RowResultIterator nextRows() throws KuduException { return KuduClient.joinAndHandleException(asyncScanner.nextRows()); } /** + * Keep the current remote scanner alive. + * <p> + * Keep the current remote scanner alive on the Tablet server for an + * additional time-to-live. 
This is useful if the interval in between + * nextRows() calls is big enough that the remote scanner might be garbage + * collected. The scanner time-to-live can be configured on the tablet + * server via the --scanner_ttl_ms configuration flag and has a default + * of 60 seconds. + * <p> + * This does not invalidate any previously fetched results. + * <p> + * Note that an exception thrown by this method should not be taken as indication + * that the scan has failed. Subsequent calls to nextRows() might still be successful, + * particularly if the scanner is configured to be fault tolerant. + * @throws KuduException if anything went wrong. + */ + public final void keepAlive() throws KuduException { + KuduClient.joinAndHandleException(asyncScanner.keepAlive()); + } + + /** * Closes this scanner (don't forget to call this when you're done with it!). * <p> * Closing a scanner already closed has no effect. * @return a deferred object that indicates the completion of the request - * @throws KuduException if anything went wrong + * @throws KuduException if anything went wrong. 
*/ public RowResultIterator close() throws KuduException { return KuduClient.joinAndHandleException(asyncScanner.close()); http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 282ec03..80c0843 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -53,20 +53,44 @@ import java.util.concurrent.Future; import com.google.common.collect.ImmutableList; import com.stumbleupon.async.Deferred; +import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder; import org.apache.kudu.util.TimestampUtil; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.rules.TestName; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.util.CapturingLogAppender; import org.apache.kudu.util.DecimalUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestKuduClient extends BaseKuduTest { - private static final String tableName = "TestKuduClient"; + private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class); + + private static final String TABLE_NAME = "TestKuduClient"; + + private static final int SHORT_SCANNER_TTL_MS = 5000; + private static final int SHORT_SCANNER_GC_US = SHORT_SCANNER_TTL_MS * 100; // 10% of the TTL. 
+ + @Rule + public TestName testName = new TestName(); + + @Override + protected MiniKuduClusterBuilder getMiniClusterBuilder() { + MiniKuduClusterBuilder builder = super.getMiniClusterBuilder(); + // Set a short scanner ttl for some tests. + if ("testKeepAlive".equals(testName.getMethodName()) || + "testScannerExpiration".equals(testName.getMethodName()) + ) { + LOG.info("Overriding scanner TTL and GC for testKeepAlive"); + builder.addTserverFlag(String.format("--scanner_ttl_ms=%d", SHORT_SCANNER_TTL_MS)); + builder.addTserverFlag(String.format("--scanner_gc_check_interval_us=%d", SHORT_SCANNER_GC_US)); + } + return builder; + } /** * Test setting and reading the most recent propagated timestamp. @@ -74,7 +98,7 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testLastPropagatedTimestamps() throws Exception { // Scan a table to ensure a timestamp is propagated. - KuduTable table = syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + KuduTable table = syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions()); syncClient.newScannerBuilder(table).build().nextRows().getNumRows(); assertTrue(syncClient.hasLastPropagatedTimestamp()); assertTrue(client.hasLastPropagatedTimestamp()); @@ -104,22 +128,22 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testCreateDeleteTable() throws Exception { // Check that we can create a table. - syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions()); assertFalse(syncClient.getTablesList().getTablesList().isEmpty()); - assertTrue(syncClient.getTablesList().getTablesList().contains(tableName)); + assertTrue(syncClient.getTablesList().getTablesList().contains(TABLE_NAME)); // Check that we can delete it. 
- syncClient.deleteTable(tableName); - assertFalse(syncClient.getTablesList().getTablesList().contains(tableName)); + syncClient.deleteTable(TABLE_NAME); + assertFalse(syncClient.getTablesList().getTablesList().contains(TABLE_NAME)); // Check that we can re-recreate it, with a different schema. List<ColumnSchema> columns = new ArrayList<>(basicSchema.getColumns()); columns.add(new ColumnSchema.ColumnSchemaBuilder("one more", Type.STRING).build()); Schema newSchema = new Schema(columns); - syncClient.createTable(tableName, newSchema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, newSchema, getBasicCreateTableOptions()); // Check that we can open a table and see that it has the new schema. - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); assertEquals(newSchema.getColumnCount(), table.getSchema().getColumnCount()); assertTrue(table.getPartitionSchema().isSimpleRangePartitioning()); @@ -131,7 +155,6 @@ public class TestKuduClient extends BaseKuduTest { newSchema.getColumn("column3_s").getCompressionAlgorithm()); } - /** * Test creating a table with various invalid schema cases. */ @@ -148,7 +171,7 @@ public class TestKuduClient extends BaseKuduTest { } Schema schema = new Schema(cols); try { - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); fail(); } catch (NonRecoverableException nre) { assertThat(nre.toString(), containsString( @@ -156,6 +179,122 @@ public class TestKuduClient extends BaseKuduTest { } } + /* + * Test the scanner behavior when a scanner is used beyond + * the scanner ttl without calling keepAlive. + * Note: The getMiniClusterBuilder override above depends on this method name. + */ + @Test(timeout = 100000) + public void testScannerExpiration() throws Exception { + // Create a basic table and load it with data. 
+ int numRows = 1000; + syncClient.createTable( + TABLE_NAME, + basicSchema, + new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2)); + KuduSession session = syncClient.newSession(); + KuduTable table = syncClient.openTable(TABLE_NAME); + + for (int i = 0; i < numRows; i++) { + Insert insert = createBasicSchemaInsert(table, i); + session.apply(insert); + } + + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table) + .replicaSelection(ReplicaSelection.CLOSEST_REPLICA) + .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times. + .build(); + + // Initialize the scanner and verify we can read rows. + int rows = scanner.nextRows().getNumRows(); + assertTrue("Scanner did not read any rows", rows > 0); + + // Wait for the scanner to time out. + Thread.sleep(SHORT_SCANNER_TTL_MS * 2); + + try { + scanner.nextRows(); + fail("Exception was not thrown when accessing an expired scanner"); + } catch (NonRecoverableException ex) { + assertThat(ex.getMessage(), containsString("Scanner not found")); + } + + // Closing an expired scanner shouldn't throw an exception. + scanner.close(); + } + + /* + * Test keeping a scanner alive beyond scanner ttl. + * Note: The getMiniClusterBuilder override above depends on this method name. + */ + @Test(timeout = 100000) + public void testKeepAlive() throws Exception { + // Create a basic table and load it with data. 
+ int numRows = 1000; + syncClient.createTable( + TABLE_NAME, + basicSchema, + new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2)); + KuduSession session = syncClient.newSession(); + KuduTable table = syncClient.openTable(TABLE_NAME); + + for (int i = 0; i < numRows; i++) { + Insert insert = createBasicSchemaInsert(table, i); + session.apply(insert); + } + + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table) + .replicaSelection(ReplicaSelection.CLOSEST_REPLICA) + .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times. + .build(); + + // KeepAlive on uninitialized scanner should be ok. + scanner.keepAlive(); + // Get the first batch and initialize the scanner + int accum = scanner.nextRows().getNumRows(); + + while (scanner.hasMoreRows()) { + int rows = scanner.nextRows().getNumRows(); + accum += rows; + // Break when we are between tablets. + if (scanner.currentTablet() == null) { + LOG.info(String.format("Between tablets after scanning %d rows", accum)); + break; + } + // Ensure we actually end up between tablets. + if (accum == numRows) { + fail("All rows were in a single tablet."); + } + } + + // In between scanners now and should be ok. + scanner.keepAlive(); + + // Initialize the next scanner or keepAlive will have no effect. + accum += scanner.nextRows().getNumRows(); + + // Wait for longer than the scanner ttl calling keepAlive throughout. + // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure + // we extend over 2x the scanner ttl. + for (int i = 0; i < 10; i++) { + Thread.sleep(SHORT_SCANNER_TTL_MS / 4); + scanner.keepAlive(); + } + + // Finish out the rows. + while (scanner.hasMoreRows()) { + accum += scanner.nextRows().getNumRows(); + } + assertEquals("All rows were not scanned", numRows, accum); + + // At this point the scanner is closed and there is nothing to keep alive. 
+ try { + scanner.keepAlive(); + fail("Exception was not thrown when calling keepAlive on a closed scanner"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Scanner has already been closed")); + } + } /** * Test creating a table with columns with different combinations of NOT NULL and @@ -187,9 +326,9 @@ public class TestKuduClient extends BaseKuduTest { .defaultValue("def") .build()); Schema schema = new Schema(cols); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); // Insert various rows. '-' indicates leaving the row unset in the insert. List<String> rows = ImmutableList.of( @@ -244,10 +383,10 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testStrings() throws Exception { Schema schema = createManyStringsSchema(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); for (int i = 0; i < 100; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); @@ -298,10 +437,10 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testUTF8() throws Exception { Schema schema = createManyStringsSchema(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); Insert insert = table.newInsert(); 
PartialRow row = insert.getRow(); row.addString("key", "กขฃคฅฆง"); // some thai @@ -325,12 +464,12 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testBinaryColumns() throws Exception { Schema schema = createSchemaWithBinaryColumns(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); byte[] testArray = new byte[] {1, 2, 3, 4, 5, 6 ,7, 8, 9}; KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); for (int i = 0; i < 100; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); @@ -368,12 +507,12 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testTimestampColumns() throws Exception { Schema schema = createSchemaWithTimestampColumns(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); List<Long> timestamps = new ArrayList<>(); KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); long lastTimestamp = 0; for (int i = 0; i < 100; i++) { Insert insert = table.newInsert(); @@ -416,10 +555,10 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testDecimalColumns() throws Exception { Schema schema = createSchemaWithDecimalColumns(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); KuduSession session = syncClient.newSession(); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); // Verify ColumnTypeAttributes assertEquals(DecimalUtil.MAX_DECIMAL128_PRECISION, @@ -455,8 +594,8 @@ 
public class TestKuduClient extends BaseKuduTest { */ @Test public void testScanWithLimit() throws Exception { - syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange()); - KuduTable table = syncClient.openTable(tableName); + syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange()); + KuduTable table = syncClient.openTable(TABLE_NAME); KuduSession session = syncClient.newSession(); int num_rows = 100; for (int key = 0; key < num_rows; key++) { @@ -505,11 +644,11 @@ public class TestKuduClient extends BaseKuduTest { @Test public void testScanWithPredicates() throws Exception { Schema schema = createManyStringsSchema(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); KuduSession session = syncClient.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); for (int i = 0; i < 100; i++) { Insert insert = table.newInsert(); PartialRow row = insert.getRow(); @@ -628,11 +767,11 @@ public class TestKuduClient extends BaseKuduTest { */ @Test(timeout = 100000) public void testScanNonCoveredTable() throws Exception { - syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange()); + syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange()); KuduSession session = syncClient.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); for (int key = 0; key < 100; key++) { session.apply(createBasicSchemaInsert(table, key)); @@ -657,8 +796,8 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testAutoClose() throws Exception { try (KuduClient 
localClient = new KuduClient.KuduClientBuilder(masterAddresses).build()) { - localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions()); - KuduTable table = localClient.openTable(tableName); + localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions()); + KuduTable table = localClient.openTable(TABLE_NAME); KuduSession session = localClient.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); @@ -666,7 +805,7 @@ public class TestKuduClient extends BaseKuduTest { session.apply(insert); } - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, table).build(); assertEquals(1, countRowsInScan(scanner)); } @@ -730,7 +869,7 @@ public class TestKuduClient extends BaseKuduTest { .build(); long buildTime = (System.nanoTime() - startTime) / 1000000000L; assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3); - localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions()); + localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions()); Thread[] threads = new Thread[4]; for (int t = 0; t < 4; t++) { final int id = t; @@ -738,7 +877,7 @@ public class TestKuduClient extends BaseKuduTest { @Override public void run() { try { - KuduTable table = localClient.openTable(tableName); + KuduTable table = localClient.openTable(TABLE_NAME); KuduSession session = localClient.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); for (int i = 0; i < 100; i++) { @@ -761,7 +900,7 @@ public class TestKuduClient extends BaseKuduTest { @Test(expected=IllegalArgumentException.class) public void testNoDefaultPartitioning() throws Exception { - syncClient.createTable(tableName, basicSchema, new CreateTableOptions()); + syncClient.createTable(TABLE_NAME, basicSchema, new CreateTableOptions()); } @Test(timeout = 100000) 
@@ -773,8 +912,8 @@ public class TestKuduClient extends BaseKuduTest { upper.addInt("key", 1); options.addRangePartition(lower, upper); - syncClient.createTable(tableName, basicSchema, options); - KuduTable table = syncClient.openTable(tableName); + syncClient.createTable(TABLE_NAME, basicSchema, options); + KuduTable table = syncClient.openTable(TABLE_NAME); // Count the number of tablets. KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table); @@ -791,7 +930,7 @@ public class TestKuduClient extends BaseKuduTest { upper = basicSchema.newPartialRow(); lower.addInt("key", 1); alter.addRangePartition(lower, upper); - alterClient.alterTable(tableName, alter); + alterClient.alterTable(TABLE_NAME, alter); } // Count the number of tablets. The result should still be the same, since @@ -801,7 +940,7 @@ public class TestKuduClient extends BaseKuduTest { assertEquals(1, tokens.size()); // Reopen the table and count the tablets again. The new tablet should now show up. - table = syncClient.openTable(tableName); + table = syncClient.openTable(TABLE_NAME); tokenBuilder = syncClient.newScanTokenBuilder(table); tokens = tokenBuilder.build(); assertEquals(2, tokens.size()); @@ -810,7 +949,7 @@ public class TestKuduClient extends BaseKuduTest { @Test(timeout = 100000) public void testCreateTableWithConcurrentInsert() throws Exception { KuduTable table = syncClient.createTable( - tableName, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false)); + TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false)); // Insert a row. // @@ -827,13 +966,13 @@ public class TestKuduClient extends BaseKuduTest { // This won't do anything useful (i.e. if the insert succeeds, we know the // table has been created), but it's here for additional code coverage. 
- assertTrue(syncClient.isCreateTableDone(tableName)); + assertTrue(syncClient.isCreateTableDone(TABLE_NAME)); } @Test(timeout = 100000) public void testCreateTableWithConcurrentAlter() throws Exception { // Kick off an asynchronous table creation. - Deferred<KuduTable> d = client.createTable(tableName, + Deferred<KuduTable> d = client.createTable(TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions()); // Rename the table that's being created to make sure it doesn't interfere @@ -843,7 +982,7 @@ public class TestKuduClient extends BaseKuduTest { // actually exists. while (true) { try { - syncClient.alterTable(tableName, + syncClient.alterTable(TABLE_NAME, new AlterTableOptions().renameTable("foo")); break; } catch (KuduException e) { @@ -894,7 +1033,7 @@ public class TestKuduClient extends BaseKuduTest { final ReplicaSelection replicaSelection) throws Exception { Schema schema = createManyStringsSchema(); - syncClient.createTable(tableName, schema, getBasicCreateTableOptions()); + syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); final int tasksNum = 4; List<Callable<Void>> callables = new ArrayList<>(); @@ -906,7 +1045,7 @@ public class TestKuduClient extends BaseKuduTest { // in the given flush mode. 
KuduSession session = syncClient.newSession(); session.setFlushMode(flushMode); - KuduTable table = syncClient.openTable(tableName); + KuduTable table = syncClient.openTable(TABLE_NAME); for (int i = 0; i < 3; i++) { for (int j = 100 * i; j < 100 * (i + 1); j++) { Insert insert = table.newInsert(); http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java index 9ba6d00..c835f27 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java @@ -120,6 +120,27 @@ public class TestRemoteTablet { tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid()); } + // AsyncKuduClient has methods like scanNextRows, keepAlive, and closeScanner that rely on + // RemoteTablet.getReplicaSelectedServerInfo to be deterministic given the same state. + // This ensures follow up calls are routed to the same server with the scanner open. + // This test ensures that remains true. 
+ @Test + public void testGetReplicaSelectedServerInfoDeterminism() { + RemoteTablet tabletWithLocal = getTablet(0, 0); + verifyGetReplicaSelectedServerInfoDeterminism(tabletWithLocal); + + RemoteTablet tabletWithRemote = getTablet(0, -1); + verifyGetReplicaSelectedServerInfoDeterminism(tabletWithRemote); + } + + private void verifyGetReplicaSelectedServerInfoDeterminism(RemoteTablet tablet) { + String init = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid(); + for (int i = 0; i < 10; i++) { + String next = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid(); + assertEquals("getReplicaSelectedServerInfo was not deterministic", init, next); + } + } + @Test public void testToString() { RemoteTablet tablet = getTablet(0, 1); http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 795aae5..e479aa9 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1953,11 +1953,13 @@ class KUDU_EXPORT KuduScanner { /// Keep the current remote scanner alive. /// - /// Keep the current remote scanner alive on the Tablet server - /// for an additional time-to-live (set by a configuration flag on - /// the tablet server). This is useful if the interval in between + /// Keep the current remote scanner alive on the Tablet server for an + /// additional time-to-live. This is useful if the interval in between /// NextBatch() calls is big enough that the remote scanner might be garbage - /// collected (default TTL is set to 60 secs.). + /// collected. The scanner time-to-live can be configured on the tablet + /// server via the --scanner_ttl_ms configuration flag and has a default + /// of 60 seconds. + /// /// This does not invalidate any previously fetched results. /// /// @return Operation result status. 
In particular, this method returns @@ -1965,7 +1967,8 @@ class KUDU_EXPORT KuduScanner { /// TabletServer was unreachable, for any reason. Note that a non-OK /// status returned by this method should not be taken as indication /// that the scan has failed. Subsequent calls to NextBatch() might - /// still be successful, particularly if SetFaultTolerant() has been called. + /// still be successful, particularly if the scanner is configured to be + /// fault tolerant. Status KeepAlive(); /// Close the scanner.