KUDU-1715. Add a way to set ReplicaSelection in the Java client. This patch adds a ReplicaSelection option for all the RPCs, which always defaults to LEADER_ONLY, as well as a way to change it for scanners.
Change-Id: I3bb08c9c78271a0065c8aa8fb9b0f3301f84e828 Reviewed-on: http://gerrit.cloudera.org:8080/4837 Tested-by: Kudu Jenkins Reviewed-by: Jean-Daniel Cryans <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a7fd4fc8 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a7fd4fc8 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a7fd4fc8 Branch: refs/heads/master Commit: a7fd4fc85c0878570a5ab51a8e28275fd4459ff5 Parents: a326fc9 Author: Jean-Daniel Cryans <[email protected]> Authored: Tue Oct 25 09:51:42 2016 -0700 Committer: Jean-Daniel Cryans <[email protected]> Committed: Thu Oct 27 18:40:35 2016 +0000 ---------------------------------------------------------------------- .../kudu/client/AbstractKuduScannerBuilder.java | 12 +++++++ .../org/apache/kudu/client/AsyncKuduClient.java | 17 +++++---- .../apache/kudu/client/AsyncKuduScanner.java | 21 +++++++----- .../java/org/apache/kudu/client/KuduRpc.java | 4 +++ .../org/apache/kudu/client/KuduScanner.java | 2 +- .../org/apache/kudu/client/RemoteTablet.java | 17 +++++++++ .../apache/kudu/client/ReplicaSelection.java | 36 ++++++++++++++++++++ .../apache/kudu/client/TestRemoteTablet.java | 8 +++++ .../kudu/client/TestScannerMultiTablet.java | 15 ++++++++ 9 files changed, 115 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java index c23e07e..f58ae41 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java +++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java @@ -54,6 +54,7 @@ public abstract class AbstractKuduScannerBuilder List<String> projectedColumnNames = null; List<Integer> projectedColumnIndexes = null; long scanRequestTimeout; + ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY; AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) { this.client = client; @@ -313,6 +314,17 @@ public abstract class AbstractKuduScannerBuilder } /** + * Sets the replica selection mechanism for this scanner. The default is to read from the + * currently known leader. + * @param replicaSelection replication selection mechanism to use + * @return this instance + */ + public S replicaSelection(ReplicaSelection replicaSelection) { + this.replicaSelection = replicaSelection; + return (S) this; + } + + /** * Set an encoded (inclusive) start partition key for the scan. * * @param partitionKey the encoded partition key http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index b36737b..35cd9d6 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -586,8 +586,9 @@ public class AsyncKuduClient implements AutoCloseable { Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) { RemoteTablet tablet = scanner.currentTablet(); assert (tablet != null); - TabletClient client = connectionCache.getClient(tablet.getLeaderUUID()); KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest(); + TabletClient client = + 
connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection())); Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred(); // Important to increment the attempts before the next if statement since // getSleepTimeForRpc() relies on it if the client is null or dead. @@ -615,7 +616,9 @@ public class AsyncKuduClient implements AutoCloseable { return Deferred.fromResult(null); } - final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID()); + final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest(); + final TabletClient client = connectionCache.getClient( + tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection())); if (client == null || !client.isAlive()) { // Oops, we couldn't find a tablet server that hosts this tablet. Our // cache was probably invalidated while the client was scanning. So @@ -623,10 +626,10 @@ public class AsyncKuduClient implements AutoCloseable { LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet); return Deferred.fromResult(null); } - final KuduRpc<AsyncKuduScanner.Response> close_request = scanner.getCloseRequest(); - final Deferred<AsyncKuduScanner.Response> d = close_request.getDeferred(); - close_request.attempt++; - client.sendRpc(close_request); + + final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred(); + closeRequest.attempt++; + client.sendRpc(closeRequest); return d; } @@ -670,7 +673,7 @@ public class AsyncKuduClient implements AutoCloseable { // If we found a tablet, we'll try to find the TS to talk to. 
if (entry != null) { RemoteTablet tablet = entry.getTablet(); - String uuid = tablet.getLeaderUUID(); + String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection()); if (uuid != null) { Deferred<R> d = request.getDeferred(); request.setTablet(tablet); http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index c61fbdb..b51394a 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -178,6 +178,8 @@ public final class AsyncKuduScanner { private final long htTimestamp; + private final ReplicaSelection replicaSelection; + ///////////////////// // Runtime variables. 
///////////////////// @@ -212,8 +214,6 @@ public final class AsyncKuduScanner { private Deferred<RowResultIterator> prefetcherDeferred; - private boolean inFirstTablet = true; - final long scanRequestTimeout; AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames, @@ -222,7 +222,8 @@ public final class AsyncKuduScanner { Map<String, KuduPredicate> predicates, long limit, boolean cacheBlocks, boolean prefetching, byte[] startPrimaryKey, byte[] endPrimaryKey, - long htTimestamp, int batchSizeBytes, PartitionPruner pruner) { + long htTimestamp, int batchSizeBytes, PartitionPruner pruner, + ReplicaSelection replicaSelection) { checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " + "got %s", batchSizeBytes); checkArgument(limit > 0, "Need a strictly positive number for the limit, " + @@ -280,6 +281,8 @@ public final class AsyncKuduScanner { this.hasMore = false; this.closed = true; } + + this.replicaSelection = replicaSelection; } /** @@ -586,11 +589,6 @@ public final class AsyncKuduScanner { */ KuduRpc<Response> getOpenRequest() { checkScanningNotStarted(); - // This is the only point where we know we haven't started scanning and where the scanner - // should be fully configured - if (this.inFirstTablet) { - this.inFirstTablet = false; - } return new ScanRequest(table, State.OPENING); } @@ -684,6 +682,11 @@ public final class AsyncKuduScanner { } } + @Override + ReplicaSelection getReplicaSelection() { + return replicaSelection; + } + /** Serializes this request. 
*/ ChannelBuffer serialize(Message header) { final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder(); @@ -818,7 +821,7 @@ public final class AsyncKuduScanner { client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode, scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey, - htTimestamp, batchSizeBytes, PartitionPruner.create(this)); + htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection); } } } http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java index 02b547e..8b5c0df 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java @@ -262,6 +262,10 @@ public abstract class KuduRpc<R> { return sequenceId; } + ReplicaSelection getReplicaSelection() { + return ReplicaSelection.LEADER_ONLY; + } + void setSequenceId(long sequenceId) { assert (this.sequenceId == RequestTracker.NO_SEQ_NO); this.sequenceId = sequenceId; http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java index 4a4bcc1..1cccd10 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java @@ -142,7 +142,7 @@ public class KuduScanner { client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode, 
scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey, - htTimestamp, batchSizeBytes, PartitionPruner.create(this))); + htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection)); } } } http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index 28be0fe..d98f21d 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -165,6 +165,23 @@ class RemoteTablet implements Comparable<RemoteTablet> { } /** + * Helper function to centralize the calling of methods based on the passed replica selection + * mechanism. + * @param replicaSelection replica selection mechanism to use + * @return a UUID for the server that matches the selection, can be null + */ + String getReplicaSelectedUUID(ReplicaSelection replicaSelection) { + switch (replicaSelection) { + case LEADER_ONLY: + return getLeaderUUID(); + case CLOSEST_REPLICA: + return getClosestUUID(); + default: + throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection); + } + } + + /** * Gets the replicas of this tablet. The returned list may not be mutated. 
* @return the replicas of the tablet */ http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java new file mode 100644 index 0000000..e33ba34 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.client; + +import org.apache.kudu.annotations.InterfaceAudience; +import org.apache.kudu.annotations.InterfaceStability; + +/** + * Policy with which to choose amongst multiple replicas. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public enum ReplicaSelection { + /** + * Select the LEADER replica. + */ + LEADER_ONLY, + /** + * Select the closest replica to the client, or a random one if all replicas are equidistant. 
+ */ + CLOSEST_REPLICA +} http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java index aa50ad6..ea77c27 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java @@ -101,6 +101,14 @@ public class TestRemoteTablet { assertNotNull(tablet.getClosestUUID()); } + @Test + public void testReplicaSelection() { + RemoteTablet tablet = getTablet(0, 1); + + assertEquals("0", tablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY)); + assertEquals("1", tablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA)); + } + private RemoteTablet getTablet(int leaderIndex) { return getTablet(leaderIndex, -1); } http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java index 71739d4..14454c0 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java @@ -176,6 +176,21 @@ public class TestScannerMultiTablet extends BaseKuduTest { buildScannerAndCheckColumnsCount(builder, 2); } + @Test(timeout = 100000) + public void testReplicaSelections() throws Exception { + AsyncKuduScanner scanner = client.newScannerBuilder(table) + .replicaSelection(ReplicaSelection.LEADER_ONLY) + .build(); + + assertEquals(9, 
countRowsInScan(scanner)); + + scanner = client.newScannerBuilder(table) + .replicaSelection(ReplicaSelection.CLOSEST_REPLICA) + .build(); + + assertEquals(9, countRowsInScan(scanner)); + } + private AsyncKuduScanner getScanner(String lowerBoundKeyOne, String lowerBoundKeyTwo, String exclusiveUpperBoundKeyOne,
