This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7767abcbcb092b8449a02d985d25c0e3b12023fa Author: Will Berkeley <[email protected]> AuthorDate: Mon Jun 10 15:44:31 2019 -0700 Modernize AsyncKuduClient Lambdas, etc. There are no functional changes in this patch. Change-Id: I046971bf84c48d380c950c988a64c2effcab6f5e Reviewed-on: http://gerrit.cloudera.org:8080/13589 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 509 +++++++++------------ 1 file changed, 216 insertions(+), 293 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index b8dac87..e00888f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -584,12 +584,7 @@ public class AsyncKuduClient implements AutoCloseable { // Add a callback that converts the response into a KuduTable. Deferred<KuduTable> kuduTableD = createTableD.addCallbackDeferring( - new Callback<Deferred<KuduTable>, CreateTableResponse>() { - @Override - public Deferred<KuduTable> call(CreateTableResponse resp) throws Exception { - return getTableSchema(name, resp.getTableId(), create); - } - }); + resp -> getTableSchema(name, resp.getTableId(), create)); if (!builder.shouldWait()) { return kuduTableD; @@ -597,13 +592,10 @@ public class AsyncKuduClient implements AutoCloseable { // If requested, add a callback that waits until all of the table's tablets // have been created. - return kuduTableD.addCallbackDeferring(new Callback<Deferred<KuduTable>, KuduTable>() { - @Override - public Deferred<KuduTable> call(KuduTable tableResp) throws Exception { - TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder() - .setTableId(ByteString.copyFromUtf8(tableResp.getTableId())); - return getDelayedIsCreateTableDoneDeferred(table, create, tableResp); - } + return kuduTableD.addCallbackDeferring(tableResp -> { + TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder() + .setTableId(ByteString.copyFromUtf8(tableResp.getTableId())); + return getDelayedIsCreateTableDoneDeferred(table, create, tableResp); }); } @@ -700,15 +692,11 @@ public class AsyncKuduClient implements AutoCloseable { // If requested, add a callback that waits until all of the table's tablets // have been altered. - return responseD.addCallbackDeferring( - new Callback<Deferred<AlterTableResponse>, AlterTableResponse>() { - @Override - public Deferred<AlterTableResponse> call(AlterTableResponse resp) throws Exception { - TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder() - .setTableId(ByteString.copyFromUtf8(resp.getTableId())); - return getDelayedIsAlterTableDoneDeferred(table, alter, resp); - } - }); + return responseD.addCallbackDeferring(resp -> { + TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder() + .setTableId(ByteString.copyFromUtf8(resp.getTableId())); + return getDelayedIsAlterTableDoneDeferred(table, alter, resp); + }); } /** @@ -773,32 +761,28 @@ public class AsyncKuduClient implements AutoCloseable { timer, defaultAdminOperationTimeoutMs, /*requiresAuthzTokenSupport=*/false); - rpc.setParentRpc(parent); - return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() { - @Override - public KuduTable call(GetTableSchemaResponse resp) throws Exception { - // When opening a table, clear the existing cached non-covered range entries. - // This avoids surprises where a new table instance won't be able to see the - // current range partitions of a table for up to the ttl. - TableLocationsCache cache = tableLocations.get(resp.getTableId()); - if (cache != null) { - cache.clearNonCoveredRangeEntries(); - } - SignedTokenPB authzToken = resp.getAuthzToken(); - if (authzToken != null) { - authzTokenCache.put(resp.getTableId(), authzToken); - } - - LOG.debug("Opened table {}", resp.getTableId()); - return new KuduTable(AsyncKuduClient.this, - resp.getTableName(), - resp.getTableId(), - resp.getSchema(), - resp.getPartitionSchema(), - resp.getNumReplicas(), - resp.getExtraConfig()); + return sendRpcToTablet(rpc).addCallback(resp -> { + // When opening a table, clear the existing cached non-covered range entries. + // This avoids surprises where a new table instance won't be able to see the + // current range partitions of a table for up to the TTL. + TableLocationsCache cache = tableLocations.get(resp.getTableId()); + if (cache != null) { + cache.clearNonCoveredRangeEntries(); } + SignedTokenPB authzToken = resp.getAuthzToken(); + if (authzToken != null) { + authzTokenCache.put(resp.getTableId(), authzToken); + } + + LOG.debug("Opened table {}", resp.getTableId()); + return new KuduTable(AsyncKuduClient.this, + resp.getTableName(), + resp.getTableId(), + resp.getSchema(), + resp.getPartitionSchema(), + resp.getNumReplicas(), + resp.getExtraConfig()); }); } @@ -834,26 +818,18 @@ public class AsyncKuduClient implements AutoCloseable { throw new IllegalArgumentException("The table name cannot be null"); } - Callback<Deferred<Boolean>, KuduTable> cb = new Callback<Deferred<Boolean>, KuduTable>() { - @Override - public Deferred<Boolean> call(KuduTable table) throws Exception { - return Deferred.fromResult(true); - } - }; - Callback<Deferred<Boolean>, Exception> eb = new Callback<Deferred<Boolean>, Exception>() { - @Override - public Deferred<Boolean> call(Exception e) throws Exception { - if (e instanceof NonRecoverableException) { - Status status = ((NonRecoverableException) e).getStatus(); - if (status.isNotFound()) { - return Deferred.fromResult(false); + return AsyncUtil.addCallbacksDeferring( + getTableSchema(name, null, null), + _table -> Deferred.fromResult(true), + (Callback<Deferred<Boolean>, Exception>) e -> { + if (e instanceof NonRecoverableException) { + Status status = ((NonRecoverableException) e).getStatus(); + if (status.isNotFound()) { + return Deferred.fromResult(false); + } } - } - return Deferred.fromError(e); - } - }; - - return AsyncUtil.addCallbacksDeferring(getTableSchema(name, null, null), cb, eb); + return Deferred.fromError(e); + }); } /** @@ -904,8 +880,7 @@ public class AsyncKuduClient implements AutoCloseable { // If we've already connected to the master, use the authentication // credentials that we received when we connected. if (hasConnectedToMaster) { - fakeRpc.callback( - securityContext.exportAuthenticationCredentials()); + fakeRpc.callback(securityContext.exportAuthenticationCredentials()); return; } @@ -916,23 +891,15 @@ public class AsyncKuduClient implements AutoCloseable { .addCallback(new MasterLookupCB(masterTable, /* partitionKey */ null, /* requestedBatchSize */ 1)) - .addCallback(new Callback<Void, Object>() { - @Override - public Void call(Object ignored) { - // Just call ourselves again; we're guaranteed to have the - // authentication credentials. - assert hasConnectedToMaster; - doExportAuthenticationCredentials(fakeRpc); - return null; - } + .addCallback((Callback<Void, Object>) ignored -> { + // Just call ourselves again; we're guaranteed to have the + // authentication credentials. + assert hasConnectedToMaster; + doExportAuthenticationCredentials(fakeRpc); + return null; }) - .addErrback(new RetryTaskErrback<byte[]>( - fakeRpc, new TimerTask() { - @Override - public void run(final Timeout ignored) { - doExportAuthenticationCredentials(fakeRpc); - } - })); + .addErrback(new RetryTaskErrback<>( + fakeRpc, _ignored -> doExportAuthenticationCredentials(fakeRpc))); } @InterfaceAudience.LimitedPrivate("Test") @@ -977,22 +944,14 @@ public class AsyncKuduClient implements AutoCloseable { .addCallback(new MasterLookupCB(masterTable, /* partitionKey */ null, /* requestedBatchSize */ 1)) - .addCallback(new Callback<Void, Object>() { - @Override - public Void call(Object ignored) { - // Just call ourselves again; we're guaranteed to have the HMS config. - assert hasConnectedToMaster; - doGetHiveMetastoreConfig(fakeRpc); - return null; - } + .addCallback((Callback<Void, Object>) ignored -> { + // Just call ourselves again; we're guaranteed to have the HMS config. + assert hasConnectedToMaster; + doGetHiveMetastoreConfig(fakeRpc); + return null; }) - .addErrback(new RetryTaskErrback<HiveMetastoreConfig>( - fakeRpc, new TimerTask() { - @Override - public void run(final Timeout ignored) { - doGetHiveMetastoreConfig(fakeRpc); - } - })); + .addErrback(new RetryTaskErrback<>( + fakeRpc, ignored -> doGetHiveMetastoreConfig(fakeRpc))); } /** @@ -1021,7 +980,7 @@ public class AsyncKuduClient implements AutoCloseable { long sleepTime = getSleepTimeForRpcMillis(fakeRpc); if (cannotRetryRequest(fakeRpc) || fakeRpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) { - tooManyAttemptsOrTimeout(fakeRpc, ex); // invokes fakeRpc.Deferred + tooManyAttemptsOrTimeout(fakeRpc, ex); // Invokes fakeRpc.Deferred. return null; } fakeRpc.addTrace( @@ -1033,7 +992,7 @@ public class AsyncKuduClient implements AutoCloseable { newTimeout(timer, retryTask, sleepTime); return null; - // fakeRpc.Deferred was not invoked; the user continues to wait until + // fakeRpc.Deferred was not invoked: the user continues to wait until // retryTask succeeds or fails with a fatal error. } @@ -1391,15 +1350,11 @@ public class AsyncKuduClient implements AutoCloseable { * @param <R> RPC's return type * @return newly created errback */ - private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB( - final KuduRpc<R> rpc) { - return new Callback<Exception, Exception>() { - @Override - public Exception call(Exception e) throws Exception { - // TODO maybe we can retry it? - rpc.errback(e); - return e; - } + private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(final KuduRpc<R> rpc) { + return e -> { + // TODO maybe we can retry it? + rpc.errback(e); + return e; }; } @@ -1433,8 +1388,7 @@ public class AsyncKuduClient implements AutoCloseable { } @Override - Pair<R, Object> deserialize( - CallResponse callResponse, String tsUUID) throws KuduException { + Pair<R, Object> deserialize(CallResponse callResponse, String tsUUID) throws KuduException { return null; } }; @@ -1541,24 +1495,21 @@ public class AsyncKuduClient implements AutoCloseable { @Nonnull final KuduRpc<AlterTableResponse> rpc, @Nonnull final TableIdentifierPB.Builder table, @Nullable final AlterTableResponse alterResp) { - return new Callback<Deferred<AlterTableResponse>, IsAlterTableDoneResponse>() { - @Override - public Deferred<AlterTableResponse> call(IsAlterTableDoneResponse resp) throws Exception { - // Store the Deferred locally; callback() below will reset it and we'd - // return a different, non-triggered Deferred. - Deferred<AlterTableResponse> d = rpc.getDeferred(); - if (resp.isDone()) { - rpc.callback(alterResp); - } else { - rpc.attempt++; - delayedIsAlterTableDone( - table, - rpc, - getDelayedIsAlterTableDoneCB(rpc, table, alterResp), - getDelayedIsTableDoneEB(rpc)); - } - return d; + return resp -> { + // Store the Deferred locally; callback() below will reset it and we'd + // return a different, non-triggered Deferred. + Deferred<AlterTableResponse> d = rpc.getDeferred(); + if (resp.isDone()) { + rpc.callback(alterResp); + } else { + rpc.attempt++; + delayedIsAlterTableDone( + table, + rpc, + getDelayedIsAlterTableDoneCB(rpc, table, alterResp), + getDelayedIsTableDoneEB(rpc)); } + return d; }; } @@ -1577,24 +1528,21 @@ public class AsyncKuduClient implements AutoCloseable { final KuduRpc<KuduTable> rpc, final TableIdentifierPB.Builder table, final KuduTable tableResp) { - return new Callback<Deferred<KuduTable>, IsCreateTableDoneResponse>() { - @Override - public Deferred<KuduTable> call(IsCreateTableDoneResponse resp) throws Exception { - // Store the Deferred locally; callback() below will reset it and we'd - // return a different, non-triggered Deferred. - Deferred<KuduTable> d = rpc.getDeferred(); - if (resp.isDone()) { - rpc.callback(tableResp); - } else { - rpc.attempt++; - delayedIsCreateTableDone( - table, - rpc, - getDelayedIsCreateTableDoneCB(rpc, table, tableResp), - getDelayedIsTableDoneEB(rpc)); - } - return d; + return resp -> { + // Store the Deferred locally; callback() below will reset it and we'd + // return a different, non-triggered Deferred. + Deferred<KuduTable> d = rpc.getDeferred(); + if (resp.isDone()) { + rpc.callback(tableResp); + } else { + rpc.attempt++; + delayedIsCreateTableDone( + table, + rpc, + getDelayedIsCreateTableDoneCB(rpc, table, tableResp), + getDelayedIsTableDoneEB(rpc)); } + return d; }; } @@ -1779,7 +1727,7 @@ public class AsyncKuduClient implements AutoCloseable { } d.addCallback(new MasterLookupCB(table, partitionKey, fetchBatchSize)); if (hasPermit) { - d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>()); + d.addBoth(new ReleaseMasterLookupPermit<>()); } return d; } @@ -1793,44 +1741,41 @@ public class AsyncKuduClient implements AutoCloseable { // TODO(todd): stop using this 'masterTable' hack. return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback( - new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() { - @Override - public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) { - if (resp.getConnectResponse().hasAuthnToken()) { - // If the response has security info, adopt it. - securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken()); - } - List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList(); - if (!caCerts.isEmpty()) { - try { - securityContext.trustCertificates(caCerts); - } catch (CertificateException e) { - LOG.warn("Ignoring invalid CA cert from leader {}: {}", - resp.getLeaderHostAndPort(), - e.getMessage()); - } - } - - HiveMetastoreConfig hiveMetastoreConfig = null; - Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse(); - if (respPb.hasHmsConfig()) { - Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig(); - hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(), - metastoreConf.getHmsSaslEnabled(), - metastoreConf.getHmsUuid()); - } - synchronized (AsyncKuduClient.this) { - AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig; - location = respPb.getClientLocation(); - } - - hasConnectedToMaster = true; - - // Translate the located master into a TableLocations - // since the rest of our locations caching code expects this type. - return resp.getAsTableLocations(); - } - }); + resp -> { + if (resp.getConnectResponse().hasAuthnToken()) { + // If the response has security info, adopt it. + securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken()); + } + List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList(); + if (!caCerts.isEmpty()) { + try { + securityContext.trustCertificates(caCerts); + } catch (CertificateException e) { + LOG.warn("Ignoring invalid CA cert from leader {}: {}", + resp.getLeaderHostAndPort(), + e.getMessage()); + } + } + + HiveMetastoreConfig hiveMetastoreConfig = null; + Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse(); + if (respPb.hasHmsConfig()) { + Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig(); + hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(), + metastoreConf.getHmsSaslEnabled(), + metastoreConf.getHmsUuid()); + } + synchronized (AsyncKuduClient.this) { + AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig; + location = respPb.getClientLocation(); + } + + hasConnectedToMaster = true; + + // Translate the located master into a TableLocations + // since the rest of our locations caching code expects this type. + return resp.getAsTableLocations(); + }); } /** @@ -2023,60 +1968,48 @@ public class AsyncKuduClient implements AutoCloseable { final TimeoutTracker timeoutTracker = new TimeoutTracker(); timeoutTracker.setTimeout(deadline); - Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB = - new Callback<Deferred<List<KeyRange>>, List<LocatedTablet>>() { - @Override - public Deferred<List<KeyRange>> call(List<LocatedTablet> tablets) { - if (splitSizeBytes <= 0) { - final List<KeyRange> keyRanges = Lists.newArrayList(); - for (LocatedTablet tablet : tablets) { - keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1)); - } - return Deferred.fromResult(keyRanges); - } else { - List<Deferred<List<KeyRange>>> deferreds = new ArrayList<>(); - for (LocatedTablet tablet : tablets) { - // Build a fake RPC to encapsulate and propagate the timeout. - // There's no actual "RPC" to send. - KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges", - null, - timeoutTracker.getMillisBeforeTimeout()); - deferreds.add(getTabletKeyRanges(table, - startPrimaryKey, - endPrimaryKey, - tablet.getPartition().getPartitionKeyStart(), - splitSizeBytes, - fakeRpc).addCallbackDeferring( - new Callback<Deferred<List<KeyRange>>, SplitKeyRangeResponse>() { - @Override - public Deferred<List<KeyRange>> call(SplitKeyRangeResponse resp) { - final List<KeyRange> ranges = Lists.newArrayList(); - for (Common.KeyRangePB pb : resp.getKeyRanges()) { - KeyRange newRange = new KeyRange(tablet, - pb.getStartPrimaryKey().toByteArray(), - pb.getStopPrimaryKey().toByteArray(), - pb.getSizeBytesEstimates()); - ranges.add(newRange); - LOG.debug("Add key range {}", newRange); - } - return Deferred.fromResult(ranges); - } - })); - } - // Must preserve the order. - return Deferred.groupInOrder(deferreds).addCallbackDeferring( - new Callback<Deferred<List<KeyRange>>, ArrayList<List<KeyRange>>>() { - @Override - public Deferred<List<KeyRange>> call(ArrayList<List<KeyRange>> rangeLists) { - final List<KeyRange> ret = Lists.newArrayList(); - for (List<KeyRange> ranges : rangeLists) { - ret.addAll(ranges); - } - return Deferred.fromResult(ret); - } - }); + Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB = tablets -> { + if (splitSizeBytes <= 0) { + final List<KeyRange> keyRanges = Lists.newArrayList(); + for (LocatedTablet tablet : tablets) { + keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1)); } + return Deferred.fromResult(keyRanges); } + List<Deferred<List<KeyRange>>> deferreds = new java.util.ArrayList<>(); + for (LocatedTablet tablet : tablets) { + // Build a fake RPC to encapsulate and propagate the timeout. + // There's no actual "RPC" to send. + KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges", + null, + timeoutTracker.getMillisBeforeTimeout()); + deferreds.add(getTabletKeyRanges(table, + startPrimaryKey, + endPrimaryKey, + tablet.getPartition().getPartitionKeyStart(), + splitSizeBytes, + fakeRpc) + .addCallbackDeferring(resp -> { + final List<KeyRange> ranges = Lists.newArrayList(); + for (Common.KeyRangePB pb : resp.getKeyRanges()) { + KeyRange newRange = new KeyRange(tablet, + pb.getStartPrimaryKey().toByteArray(), + pb.getStopPrimaryKey().toByteArray(), + pb.getSizeBytesEstimates()); + ranges.add(newRange); + LOG.debug("Add key range {}", newRange); + } + return Deferred.fromResult(ranges); + })); + } + // Must preserve the order. + return Deferred.groupInOrder(deferreds).addCallbackDeferring(rangeLists -> { + final List<KeyRange> ret = Lists.newArrayList(); + for (List<KeyRange> ranges : rangeLists) { + ret.addAll(ranges); + } + return Deferred.fromResult(ret); + }); }; final List<LocatedTablet> tablets = Lists.newArrayList(); @@ -2085,7 +2018,8 @@ public class AsyncKuduClient implements AutoCloseable { endPartitionKey, fetchBatchSize, tablets, - timeoutTracker).addCallbackDeferring(locateTabletCB); + timeoutTracker) + .addCallbackDeferring(locateTabletCB); } /** @@ -2173,17 +2107,6 @@ public class AsyncKuduClient implements AutoCloseable { * Deferred back to the user. */ private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) { - // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs - // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones - // on hold but we won't be doing this for the moment. Regions in HBase can move a lot, - // we're not expecting this in Kudu. - final class RetryTimer implements TimerTask { - @Override - public void run(final Timeout timeout) { - sendRpcToTablet(rpc); - } - } - assert (ex != null); Status reasonForRetry = ex.getStatus(); rpc.addTrace( @@ -2199,7 +2122,11 @@ public class AsyncKuduClient implements AutoCloseable { // Don't let it retry. return tooManyAttemptsOrTimeout(rpc, ex); } - newTimeout(timer, new RetryTimer(), sleepTime); + // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs + // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones + // on hold but we won't be doing this for the moment. Regions in HBase can move a lot, + // we're not expecting this in Kudu. + newTimeout(timer, _timeout -> sendRpcToTablet(rpc), sleepTime); return rpc.getDeferred(); } @@ -2442,13 +2369,13 @@ public class AsyncKuduClient implements AutoCloseable { * @param table the table * @param partitionKey the partition key of the tablet to look up in the table * @param lookupType the type of lookup to use - * @param deadline deadline in milliseconds for this lookup to finish + * @param timeoutMs timeout in milliseconds for this lookup to finish * @return a deferred containing the located tablet */ Deferred<LocatedTablet> getTabletLocation(final KuduTable table, final byte[] partitionKey, final LookupType lookupType, - long timeout) { + long timeoutMs) { // Locate the tablet at the partition key by locating tablets between // the partition key (inclusive), and the incremented partition key (exclusive). @@ -2457,54 +2384,52 @@ public class AsyncKuduClient implements AutoCloseable { byte[] endPartitionKey; if (partitionKey.length == 0) { startPartitionKey = null; - endPartitionKey = new byte[] { 0x00 }; + endPartitionKey = new byte[]{0x00}; } else { startPartitionKey = partitionKey; endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1); } final TimeoutTracker timeoutTracker = new TimeoutTracker(); - timeoutTracker.setTimeout(timeout); + timeoutTracker.setTimeout(timeoutMs); Deferred<List<LocatedTablet>> locatedTablets = locateTable( - table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeout); + table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeoutMs); // Then pick out the single tablet result from the list. - return locatedTablets.addCallbackDeferring( - new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>() { - @Override - public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) { - Preconditions.checkArgument(tablets.size() <= 1, - "found more than one tablet for a single partition key"); - if (tablets.isEmpty()) { - // Most likely this indicates a non-covered range, but since this - // could race with an alter table partitioning operation (which - // clears the local table locations cache), we check again. - TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(), - partitionKey); - - if (entry == null) { - // This should be extremely rare, but a potential source of tight loops. - LOG.debug("Table location expired before it could be processed; retrying."); - return Deferred.fromError(new RecoverableException(Status.NotFound( - "Table location expired before it could be processed"))); - } - if (entry.isNonCoveredRange()) { - if (lookupType == LookupType.POINT - || entry.getUpperBoundPartitionKey().length == 0) { - return Deferred.fromError( - new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), - entry.getUpperBoundPartitionKey())); - } - // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key - // of the non-covered range to return the next valid tablet location. - return getTabletLocation(table, entry.getUpperBoundPartitionKey(), - LookupType.POINT, timeoutTracker.getMillisBeforeTimeout()); - } - return Deferred.fromResult(new LocatedTablet(entry.getTablet())); - } - return Deferred.fromResult(tablets.get(0)); + return locatedTablets.addCallbackDeferring(tablets -> { + Preconditions.checkArgument(tablets.size() <= 1, + "found more than one tablet for a single partition key"); + if (tablets.isEmpty()) { + // Most likely this indicates a non-covered range, but since this + // could race with an alter table partitioning operation (which + // clears the local table locations cache), we check again. + TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(), + partitionKey); + + if (entry == null) { + // This should be extremely rare, but a potential source of tight loops. + LOG.debug("Table location expired before it could be processed; retrying."); + return Deferred.fromError(new RecoverableException(Status.NotFound( + "Table location expired before it could be processed"))); + } + if (entry.isNonCoveredRange()) { + if (lookupType == LookupType.POINT + || entry.getUpperBoundPartitionKey().length == 0) { + return Deferred.fromError( + new NonCoveredRangeException(entry.getLowerBoundPartitionKey(), + entry.getUpperBoundPartitionKey())); } - }); + // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key + // of the non-covered range to return the next valid tablet location. + return getTabletLocation(table, + entry.getUpperBoundPartitionKey(), + LookupType.POINT, + timeoutTracker.getMillisBeforeTimeout()); + } + return Deferred.fromResult(new LocatedTablet(entry.getTablet())); + } + return Deferred.fromResult(tablets.get(0)); + }); } /** @@ -2685,8 +2610,7 @@ public class AsyncKuduClient implements AutoCloseable { * @param masterAddresses comma-separated list of "host:port" pairs of the masters */ public AsyncKuduClientBuilder(String masterAddresses) { - this.masterAddresses = - NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT); + this.masterAddresses = NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT); } /** @@ -2706,8 +2630,7 @@ public class AsyncKuduClient implements AutoCloseable { * @param masterAddresses list of master addresses */ public AsyncKuduClientBuilder(List<String> masterAddresses) { - this.masterAddresses = - Lists.newArrayListWithCapacity(masterAddresses.size()); + this.masterAddresses = Lists.newArrayListWithCapacity(masterAddresses.size()); for (String address : masterAddresses) { this.masterAddresses.add( NetUtil.parseString(address, DEFAULT_MASTER_PORT));
