This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7645f5b2ea1f54777ce88c61df54422b068142e5 Author: Andrew Wong <[email protected]> AuthorDate: Sun Jan 6 21:02:46 2019 -0800 KUDU-2543 pt 3 java: pass around authz tokens Adds handling of authz tokens to the Java client. The Java client will now cache tokens upon opening a table, and use them for RPCs that need them (e.g. Writes and Scans), reacquiring them when receiving word that they are expired. This is tested as follows: - TestAuthnTokenReacquire's test for scans and writes is repurposed to also test for reacquisition of authz tokens when they expire - basic tests are added to test the token cache - a test is added to test authz reacquisition in the case that a multi-master deployment undergoes a leadership change - a test is added to test authz reacquisition upon invalid or expired tokens during prolonged workloads against a multi-master deployment Change-Id: Iadd5f7709b45628d7ddd9e2b100d0dfaabbf15af Reviewed-on: http://gerrit.cloudera.org:8080/12279 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Hao Hao <[email protected]> --- .../org/apache/kudu/client/AsyncKuduClient.java | 53 ++++- .../org/apache/kudu/client/AsyncKuduScanner.java | 17 ++ .../org/apache/kudu/client/AuthzTokenCache.java | 243 +++++++++++++++++++++ .../main/java/org/apache/kudu/client/Batch.java | 17 ++ .../java/org/apache/kudu/client/Connection.java | 10 +- .../apache/kudu/client/GetTableSchemaRequest.java | 21 +- .../apache/kudu/client/GetTableSchemaResponse.java | 15 +- .../kudu/client/InvalidAuthzTokenException.java | 40 ++++ .../main/java/org/apache/kudu/client/KuduRpc.java | 13 ++ .../java/org/apache/kudu/client/Operation.java | 16 ++ .../main/java/org/apache/kudu/client/RpcProxy.java | 13 +- ...nReacquire.java => TestAuthTokenReacquire.java} | 110 +++++++--- .../apache/kudu/client/TestAuthzTokenCache.java | 151 +++++++++++++ .../kudu/client/TestMultiMasterAuthzTokens.java | 184 ++++++++++++++++ 
.../java/org/apache/kudu/test/ClientTestUtil.java | 12 + 15 files changed, 872 insertions(+), 43 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index cf22d0a..3bdb56b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -56,6 +56,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; +import org.apache.kudu.security.Token; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -68,6 +69,7 @@ import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kudu.security.Token.SignedTokenPB; import org.apache.kudu.Common; import org.apache.kudu.Schema; import org.apache.kudu.master.Master; @@ -116,8 +118,8 @@ import org.apache.kudu.util.Pair; * Authentication and Authorization Service</em> (JAAS) API provided by the JDK. * JAAS provides a common way for applications to initialize Kerberos * credentials, store these credentials in a {@link javax.security.auth.Subject} - * instance, and associate the Subject the current thread of execution. The Kudu - * client then accesses the Kerberos credentials in the + * instance, and associate the Subject with the current thread of execution. + * The Kudu client then accesses the Kerberos credentials in the * {@link javax.security.auth.Subject} and uses them to authenticate to the * remote cluster as necessary. * <p> @@ -355,6 +357,9 @@ public class AsyncKuduClient implements AutoCloseable { /** A helper to facilitate re-acquiring of authentication token if current one expires. 
*/ private final AuthnTokenReacquirer tokenReacquirer; + /** A helper to facilitate retrieving authz tokens */ + private final AuthzTokenCache authzTokenCache; + private volatile boolean closed; private AsyncKuduClient(AsyncKuduClientBuilder b) { @@ -373,6 +378,7 @@ public class AsyncKuduClient implements AutoCloseable { this.connectionCache = new ConnectionCache( securityContext, timer, channelFactory); this.tokenReacquirer = new AuthnTokenReacquirer(this); + this.authzTokenCache = new AuthzTokenCache(this); } /** @@ -756,11 +762,14 @@ public class AsyncKuduClient implements AutoCloseable { Preconditions.checkNotNull(tableName); // Prefer a lookup by table ID over name, since the former is immutable. + // For backwards compatibility with older tservers, we don't require authz + // token support. GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, tableId, tableId != null ? null : tableName, timer, - defaultAdminOperationTimeoutMs); + defaultAdminOperationTimeoutMs, + /*requiresAuthzTokenSupport=*/false); rpc.setParentRpc(parent); return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() { @@ -773,6 +782,10 @@ public class AsyncKuduClient implements AutoCloseable { if (cache != null) { cache.clearNonCoveredRangeEntries(); } + SignedTokenPB authzToken = resp.getAuthzToken(); + if (authzToken != null) { + authzTokenCache.put(resp.getTableId(), authzToken); + } LOG.debug("Opened table {}", resp.getTableId()); return new KuduTable(AsyncKuduClient.this, @@ -898,6 +911,11 @@ public class AsyncKuduClient implements AutoCloseable { })); } + @InterfaceAudience.LimitedPrivate("Test") + public AuthzTokenCache getAuthzTokenCache() { + return this.authzTokenCache; + } + /** * Get the Hive Metastore configuration of the most recently connected-to leader master, or * {@code null} if the Hive Metastore integration is not enabled. 
@@ -1955,17 +1973,40 @@ public class AsyncKuduClient implements AutoCloseable { } /** - * Handle a RPC failed due to invalid authn token error. In short, connect to the Kudu cluster + * Handle an RPC failed due to invalid authn token error. In short, connect to the Kudu cluster * to acquire a new authentication token and retry the RPC once a new authentication token * is put into the {@link #securityContext}. * - * @param rpc the RPC which failed do to invalid authn token + * @param rpc the RPC which failed with an invalid authn token */ - <R> void handleInvalidToken(KuduRpc<R> rpc) { + <R> void handleInvalidAuthnToken(KuduRpc<R> rpc) { + // TODO(awong): plumb the offending KuduException into the reacquirer. tokenReacquirer.handleAuthnTokenExpiration(rpc); } /** + * Handle an RPC that failed due to an invalid authorization token error. The + * RPC will be retried after fetching a new authz token. + * + * @param rpc the RPC that failed with an invalid authz token + * @param ex the KuduException that led to this handling + */ + <R> void handleInvalidAuthzToken(KuduRpc<R> rpc, KuduException ex) { + authzTokenCache.retrieveAuthzToken(rpc, ex); + } + + /** + * Gets an authorization token for the given table from the cache, or null + * if none exists. + * + * @param tableId the table ID for which to get an authz token + * @return a signed authz token for the table + */ + SignedTokenPB getAuthzToken(String tableId) { + return authzTokenCache.get(tableId); + } + + /** * This methods enable putting RPCs on hold for a period of time determined by * {@link #getSleepTimeForRpcMillis(KuduRpc)}. If the RPC is out of time/retries, its errback will * be immediately called.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 93bafb8..68a0982 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -43,6 +43,7 @@ import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; +import org.apache.kudu.security.Token; import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB; import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB; import org.apache.yetus.audience.InterfaceAudience; @@ -886,6 +887,9 @@ public final class AsyncKuduScanner { State state; + /** The token with which to authorize this RPC. */ + private Token.SignedTokenPB authzToken; + ScanRequest(KuduTable table, State state, RemoteTablet tablet) { super(table, client.getTimer(), scanRequestTimeout); setTablet(tablet); @@ -916,6 +920,16 @@ public final class AsyncKuduScanner { return replicaSelection; } + @Override + boolean needsAuthzToken() { + return true; + } + + @Override + void bindAuthzToken(Token.SignedTokenPB token) { + authzToken = token; + } + /** Serializes this request. 
*/ @Override Message createRequestPB() { @@ -968,6 +982,9 @@ public final class AsyncKuduScanner { for (KuduPredicate pred : predicates.values()) { newBuilder.addColumnPredicates(pred.toPB()); } + if (authzToken != null) { + newBuilder.setAuthzToken(authzToken); + } builder.setNewScanRequest(newBuilder.build()) .setBatchSizeBytes(batchSizeBytes); break; diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java new file mode 100644 index 0000000..1dd17e1 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthzTokenCache.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kudu.client; + +import com.google.common.base.Preconditions; +import com.stumbleupon.async.Callback; +import org.apache.kudu.security.Token; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Cache for authz tokens received from the master of unbounded capacity. A + * client will receive an authz token upon opening a table and put it into the + * cache. A subsequent operation that requires an authz token (e.g. writes, + * scans) will fetch it from the cache and attach it to the operation request. + */ +@ThreadSafe +@InterfaceAudience.Private +public class AuthzTokenCache { + private static class RpcAndException { + final KuduRpc<?> rpc; + final KuduException ex; + + RpcAndException(KuduRpc<?> rpc, KuduException ex) { + this.rpc = rpc; + this.ex = ex; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(AuthzTokenCache.class); + private final AsyncKuduClient client; + + // Map from a table ID to an authz token for that table. + private final ConcurrentHashMap<String, Token.SignedTokenPB> authzTokens = new ConcurrentHashMap<>(); + + // Map from a table ID that has an in-flight RPC to get a new authz token, to + // the list of RPCs waiting to be retried once that token is received and the + // exception each is handling. + // Note: Unlike the token map which is synchronized to make it threadsafe, + // synchronization of this map also serves to ensure requests for the same + // table ID get grouped together.
+ @GuardedBy("retriesLock") + private final Map<String, List<RpcAndException>> + retriesForTable = new HashMap<>(); + private final Object retriesLock = new Object(); + + // Number of RPCs sent to retrieve authz tokens. Useful for testing. + private AtomicInteger numRetrievalsSent; + + /** + * Create a new AuthzTokenCache object. + * + * @param client the Kudu client object with which to send requests. + */ + AuthzTokenCache(@Nonnull AsyncKuduClient client) { + this.client = client; + numRetrievalsSent = new AtomicInteger(0); + } + + /** + * Returns the number of RPCs sent to retrieve authz token over the lifetime + * of this cache. + * @return number of RPCs sent + */ + @InterfaceAudience.LimitedPrivate("Test") + int numRetrievalsSent() { + return numRetrievalsSent.get(); + } + + /** + * Puts the given token into the cache. No validation is done on the validity + * or expiration of the token -- that happens on the tablet servers. + * + * @param tableId the table ID the authz token is for + * @param token an authz token to put into the cache + */ + void put(@Nonnull String tableId, @Nonnull Token.SignedTokenPB token) { + authzTokens.put(tableId, token); + } + + /** + * Returns the cached token for the given 'tableId' if one exists. + * + * @param tableId table ID to get an authz token for + * @return the token for the table ID if one exists + */ + Token.SignedTokenPB get(@Nonnull String tableId) { + return authzTokens.get(tableId); + } + + /** + * Returns the list of pending RPCs waiting on a new authz token for the given + * table, clearing the table's entry in the pending map. 
+ * + * @param tableId the table ID whose RPCs should be cleared + * @return the RPCs to be retried for the given table ID and the exception each is handling + */ + private List<RpcAndException> clearPendingRetries(@Nonnull String tableId) { + List<RpcAndException> pendingRetries; + synchronized (retriesLock) { + pendingRetries = retriesForTable.remove(tableId); + } + Preconditions.checkState(!pendingRetries.isEmpty(), + "no pending retries for table " + tableId); + return pendingRetries; + } + + /** + * Sends an RPC to retrieve an authz token for retrying the specified parent + * RPC, calling 'cb' on success and 'eb' on failure. + * + * 'parentRpc' is used for logging and deadline tracking. + * + * @param parentRpc the RPC that is waiting on the authz token + * @param cb callback to be called after receiving a response from the master + * @param eb errback to be called after hitting an exception + */ + private void sendRetrievalForRpc(@Nonnull KuduRpc<?> parentRpc, + @Nonnull Callback<Void, GetTableSchemaResponse> cb, + @Nonnull Callback<Void, Exception> eb) { + String tableId = parentRpc.getTable().getTableId(); + LOG.debug("sending RPC to retrieve token for table ID " + tableId); + GetTableSchemaRequest retrieveAuthzTokenReq = new GetTableSchemaRequest( + client.getMasterTable(), tableId, /*name=*/null, client.getTimer(), + client.getDefaultAdminOperationTimeoutMs(), /*requiresAuthzTokenSupport=*/true); + retrieveAuthzTokenReq.setParentRpc(parentRpc); + retrieveAuthzTokenReq.deadlineTracker.setDeadline(parentRpc.deadlineTracker.getDeadline()); + numRetrievalsSent.incrementAndGet(); + client.sendRpcToTablet(retrieveAuthzTokenReq).addCallback(cb) + .addErrback(eb); + } + + /** + * Method to call upon receiving an RPC that indicates it had an invalid authz + * token and needs a new one. If there is already an in-flight RPC to retrieve + * a new authz token for the given table, add the 'rpc' to the collection of + * RPCs to be retried once the retrieval completes.
+ * + * @param rpc the RPC that needs a new authz token + * @param ex error that triggered this retrieval + * @param <R> the RPC type + */ + <R> void retrieveAuthzToken(@Nonnull final KuduRpc<R> rpc, @Nonnull final KuduException ex) { + /** + * Handles a response from getting an authz token. + */ + final class NewAuthzTokenCB implements Callback<Void, GetTableSchemaResponse> { + private final String tableId; + + public NewAuthzTokenCB(String tableId) { + this.tableId = tableId; + } + + @Override + public Void call(@Nonnull GetTableSchemaResponse resp) throws Exception { + if (resp.getAuthzToken() == null) { + // Note: If we were talking to an old master, we would hit an + // exception earlier in the RPC handling. + throw new NonRecoverableException( + Status.InvalidArgument("no authz token retrieved for " + tableId)); + } + LOG.debug("retrieved authz token for " + tableId); + put(tableId, resp.getAuthzToken()); + for (RpcAndException rpcAndEx : clearPendingRetries(tableId)) { + client.handleRetryableErrorNoDelay(rpcAndEx.rpc, rpcAndEx.ex); + } + return null; + } + } + + /** + * Handles the case where there was an error getting the new authz token.
+ */ + final class NewAuthzTokenErrB implements Callback<Void, Exception> { + private KuduRpc<?> parentRpc; + private final NewAuthzTokenCB cb; + + public NewAuthzTokenErrB(@Nonnull NewAuthzTokenCB cb, @Nonnull KuduRpc<?> parentRpc) { + this.cb = cb; + this.parentRpc = parentRpc; + } + + @Override + public Void call(@Nonnull Exception e) { + String tableId = cb.tableId; + if (e instanceof RecoverableException) { + sendRetrievalForRpc(parentRpc, cb, this); + } else { + for (RpcAndException rpcAndEx : clearPendingRetries(tableId)) { + rpcAndEx.rpc.errback(e); + } + } + return null; + } + } + + final String tableId = rpc.getTable().getTableId(); + RpcAndException rpcAndEx = new RpcAndException(rpc, ex); + synchronized (retriesLock) { + List<RpcAndException> pendingRetries = retriesForTable.putIfAbsent( + tableId, new ArrayList<>(Arrays.asList(rpcAndEx))); + if (pendingRetries == null) { + // There isn't an in-flight RPC to retrieve a new authz token. + NewAuthzTokenCB newTokenCB = new NewAuthzTokenCB(tableId); + NewAuthzTokenErrB newTokenErrB = new NewAuthzTokenErrB(newTokenCB, rpc); + sendRetrievalForRpc(rpc, newTokenCB, newTokenErrB); + } else { + Preconditions.checkState(!pendingRetries.isEmpty(), + "no pending retries for table " + tableId); + pendingRetries.add(rpcAndEx); + } + } + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java index b08e246..9685923 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.base.MoreObjects; import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; +import org.apache.kudu.security.Token; import org.apache.yetus.audience.InterfaceAudience; import org.jboss.netty.util.Timer; @@ -49,6 +50,9 @@ class Batch extends KuduRpc<BatchResponse> { /** The 
tablet this batch will be routed to. */ private final LocatedTablet tablet; + /** The token with which to authorize this RPC. */ + private Token.SignedTokenPB authzToken; + /** * This size will be set when serialize is called. It stands for the size of rows in all * operations in this batch. @@ -105,6 +109,16 @@ class Batch extends KuduRpc<BatchResponse> { } @Override + boolean needsAuthzToken() { + return true; + } + + @Override + void bindAuthzToken(Token.SignedTokenPB token) { + authzToken = token; + } + + @Override Message createRequestPB() { final Tserver.WriteRequestPB.Builder builder = Operation.createAndFillWriteRequestPB(operations); @@ -112,6 +126,9 @@ class Batch extends KuduRpc<BatchResponse> { (long)builder.getRowOperations().getIndirectData().size(); builder.setTabletId(UnsafeByteOperations.unsafeWrap(getTablet().getTabletIdAsBytes())); builder.setExternalConsistencyMode(externalConsistencyMode.pbVersion()); + if (authzToken != null) { + builder.setAuthzToken(authzToken); + } return builder.build(); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java index ba188f0..5da0a8c 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java @@ -394,12 +394,18 @@ class Connection extends SimpleChannelUpstreamHandler { final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder(); KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder); final RpcHeader.ErrorStatusPB error = errorBuilder.build(); - if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) || - error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) { + RpcHeader.ErrorStatusPB.RpcErrorCodePB code = error.getCode(); + if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) || + 
code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) { responseCbk.call(new CallResponseInfo( response, new RecoverableException(Status.ServiceUnavailable(error.getMessage())))); return; } + if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_INVALID_AUTHORIZATION_TOKEN)) { + responseCbk.call(new CallResponseInfo( + response, new InvalidAuthzTokenException(Status.NotAuthorized(error.getMessage())))); + return; + } final String message = getLogPrefix() + " server sent error " + error.getMessage(); LOG.error(message); // can be useful diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java index d3e48e6..0e2ec55 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java @@ -22,8 +22,10 @@ import static org.apache.kudu.master.Master.GetTableSchemaResponsePB; import static org.apache.kudu.master.Master.TableIdentifierPB; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import org.apache.kudu.master.Master; import org.apache.yetus.audience.InterfaceAudience; import org.jboss.netty.util.Timer; @@ -31,6 +33,9 @@ import org.apache.kudu.Schema; import org.apache.kudu.master.Master.TableIdentifierPB.Builder; import org.apache.kudu.util.Pair; +import java.util.Collection; +import java.util.List; + /** * RPC to fetch a table's schema */ @@ -38,18 +43,22 @@ import org.apache.kudu.util.Pair; public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> { private final String id; private final String name; - + private final List<Integer> requiredFeatures; GetTableSchemaRequest(KuduTable masterTable, String id, String name, Timer timer, - long timeoutMillis) { + long timeoutMillis, + 
boolean requiresAuthzTokenSupport) { super(masterTable, timer, timeoutMillis); Preconditions.checkArgument(id != null ^ name != null, "Only one of table ID or the table name should be provided"); this.id = id; this.name = name; + this.requiredFeatures = requiresAuthzTokenSupport ? + ImmutableList.of(Master.MasterFeatures.GENERATE_AUTHZ_TOKEN_VALUE) : + ImmutableList.<Integer>of(); } @Override @@ -89,8 +98,14 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> { schema, respBuilder.getTableId().toStringUtf8(), respBuilder.getNumReplicas(), - ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema)); + ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema), + respBuilder.hasAuthzToken() ? respBuilder.getAuthzToken() : null); return new Pair<GetTableSchemaResponse, Object>( response, respBuilder.hasError() ? respBuilder.getError() : null); } + + @Override + Collection<Integer> getRequiredFeatures() { + return requiredFeatures; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java index a3d10ab..b018e0a 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java @@ -19,6 +19,7 @@ package org.apache.kudu.client; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.kudu.security.Token.SignedTokenPB; import org.apache.kudu.Schema; @InterfaceAudience.Private @@ -28,6 +29,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse { private final PartitionSchema partitionSchema; private final String tableId; private final int numReplicas; + private final SignedTokenPB authzToken; /** * @param elapsedMillis Time in milliseconds since RPC creation to now @@ -36,18 +38,21 @@ public class GetTableSchemaResponse extends KuduRpcResponse { * 
@param tableId the UUID of the table in the response * @param numReplicas the table's replication factor * @param partitionSchema the table's partition schema + * @param authzToken an authorization token for use with this table */ GetTableSchemaResponse(long elapsedMillis, String tsUUID, Schema schema, String tableId, int numReplicas, - PartitionSchema partitionSchema) { + PartitionSchema partitionSchema, + SignedTokenPB authzToken) { super(elapsedMillis, tsUUID); this.schema = schema; this.partitionSchema = partitionSchema; this.tableId = tableId; this.numReplicas = numReplicas; + this.authzToken = authzToken; } /** @@ -81,4 +86,12 @@ public class GetTableSchemaResponse extends KuduRpcResponse { public int getNumReplicas() { return numReplicas; } + + /** + * Get the authorization token for the table. + * @return the table's authz token + */ + public SignedTokenPB getAuthzToken() { + return authzToken; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java new file mode 100644 index 0000000..4adb82f --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthzTokenException.java @@ -0,0 +1,40 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kudu.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Exception for notifying of an invalid authorization token. In most use cases + * in the Kudu Java client code, 'invalid authz token' means 'expired authz + * token'. Receiving this exception means the authorization token used to make a + * request is no longer valid and a new one is needed to make requests that + * access data. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class InvalidAuthzTokenException extends RecoverableException { + /** + * @param status status object containing the reason for the exception trace + */ + InvalidAuthzTokenException(Status status) { + super(status); + } +} diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java index eb33c80..06cce0b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java @@ -40,6 +40,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; import com.stumbleupon.async.Deferred; +import org.apache.kudu.security.Token; import org.apache.yetus.audience.InterfaceAudience; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -117,6 +118,18 @@ public abstract class KuduRpc<R> { } /** + * Binds the given authorization token to the request. + */ + void bindAuthzToken(Token.SignedTokenPB token) {} + + /** + * Whether the request needs to be authorized via authz token. + */ + boolean needsAuthzToken() { + return false; + } + + /** * The Deferred that will be invoked when this RPC completes or fails.
* In case of a successful completion, this Deferred's first callback * will be invoked with an {@link Object} containing the de-serialized diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java index 38ba205..d1d9748 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.UnsafeByteOperations; +import org.apache.kudu.security.Token; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.jboss.netty.util.Timer; @@ -86,6 +87,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> { private PartialRow row; + private Token.SignedTokenPB authzToken; + /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */ boolean ignoreAllDuplicateRows = false; @@ -149,6 +152,16 @@ public abstract class Operation extends KuduRpc<OperationResponse> { } @Override + boolean needsAuthzToken() { + return true; + } + + @Override + void bindAuthzToken(Token.SignedTokenPB token) { + authzToken = token; + } + + @Override Message createRequestPB() { final Tserver.WriteRequestPB.Builder builder = createAndFillWriteRequestPB(ImmutableList.of(this)); @@ -159,6 +172,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> { if (this.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) { builder.setPropagatedTimestamp(this.propagatedTimestamp); } + if (authzToken != null) { + builder.setAuthzToken(authzToken); + } return builder.build(); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java index 8419437..606cfd8 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java @@ -32,6 +32,7 @@ import javax.annotation.Nonnull; import com.google.common.base.Preconditions; import com.google.protobuf.Message; import com.stumbleupon.async.Callback; +import org.apache.kudu.security.Token; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -181,6 +182,12 @@ class RpcProxy { RpcHeader.RemoteMethodPB.newBuilder() .setServiceName(rpc.serviceName()) .setMethodName(rpc.method())); + // Before we create the request, get an authz token if needed. This is done + // regardless of whether the KuduRpc object already has a token; we may be + // retrying due to an invalid token and the client may have a new token. + if (rpc.needsAuthzToken()) { + rpc.bindAuthzToken(client.getAuthzToken(rpc.getTable().getTableId())); + } final Message reqPB = rpc.createRequestPB(); // TODO(wdberkeley): We should enforce that every RPC has a timeout.
if (rpc.timeoutTracker.hasTimeout()) { @@ -225,7 +232,11 @@ class RpcProxy { connection.getServerInfo()); if (ex != null) { if (ex instanceof InvalidAuthnTokenException) { - client.handleInvalidToken(rpc); + client.handleInvalidAuthnToken(rpc); + return; + } + if (ex instanceof InvalidAuthzTokenException) { + client.handleInvalidAuthzToken(rpc, ex); return; } if (ex instanceof RecoverableException) { diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java similarity index 60% rename from java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java rename to java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java index 7d332b0..c888bef 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.kudu.Schema; +import org.apache.kudu.security.Token; import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder; import org.apache.kudu.test.KuduTestHarness; import org.apache.kudu.test.ClientTestUtil; @@ -43,25 +44,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This test contains scenarios to verify that client re-acquires authn token upon expiration - * of the current one and automatically retries the call. + * This test contains scenarios to verify that clients re-acquire tokens upon + * expiration of the current one and automatically retry affected calls. 
*/ -public class TestAuthnTokenReacquire { - private static final Logger LOG = LoggerFactory.getLogger(TestAuthnTokenReacquire.class); +public class TestAuthTokenReacquire { + private static final Logger LOG = LoggerFactory.getLogger(TestAuthTokenReacquire.class); - private static final String TABLE_NAME = "TestAuthnTokenReacquire-table"; + private static final String TABLE_NAME = "TestAuthTokenReacquire-table"; + + // Set a low token timeout. private static final int TOKEN_TTL_SEC = 1; private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000; private static final Schema basicSchema = ClientTestUtil.getBasicSchema(); - // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the master and tablet - // servers, even for not-yet-expired tokens. + // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the + // master and tablet servers, even for not-yet-expired tokens. private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder() .enableKerberos() .addMasterServerFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC)) + .addMasterServerFlag(String.format("--authz_token_validity_seconds=%d", TOKEN_TTL_SEC)) .addMasterServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5") - .addTabletServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5"); + .addTabletServerFlag("--rpc_inject_invalid_authn_token_ratio=0.5") + .addTabletServerFlag("--tserver_enforce_access_control=true") + .addTabletServerFlag("--tserver_inject_invalid_authz_token_ratio=0.5"); private KuduClient client; private AsyncKuduClient asyncClient; @@ -81,11 +87,19 @@ public class TestAuthnTokenReacquire { } } - private void dropConnectionsAndExpireToken() throws InterruptedException { + private void dropConnectionsAndExpireTokens() throws InterruptedException { // Drop all connections from the client to Kudu servers. dropConnections(); - // Wait for authn token expiration. 
- Thread.sleep(TOKEN_TTL_SEC * 1000); + // Wait for token expiration. Since we've just dropped all connections, this + // means that we'll need to get a new authn token upon sending the next RPC. + expireTokens(); + } + + private void expireTokens() throws InterruptedException { + // Sleep long enough for the authn/authz tokens to expire. Wait for just + // past the token TTL to avoid making this test flaky, e.g. in case the + // token just misses being considered expired. + Thread.sleep((TOKEN_TTL_SEC + 1) * 1000); } @Test @@ -100,23 +114,23 @@ public class TestAuthnTokenReacquire { @Override @SuppressWarnings("AssertionFailureIgnored") public void run() { - final String tableName = "TestAuthnTokenReacquire-table-" + threadIdx; + final String tableName = "TestAuthTokenReacquire-table-" + threadIdx; try { ListTabletServersResponse response = client.listTabletServers(); assertNotNull(response); - dropConnectionsAndExpireToken(); + dropConnectionsAndExpireTokens(); ListTablesResponse tableList = client.getTablesList(tableName); assertNotNull(tableList); assertTrue(tableList.getTablesList().isEmpty()); - dropConnectionsAndExpireToken(); + dropConnectionsAndExpireTokens(); client.createTable(tableName, basicSchema, getBasicCreateTableOptions()); - dropConnectionsAndExpireToken(); + dropConnectionsAndExpireTokens(); KuduTable table = client.openTable(tableName); assertEquals(basicSchema.getColumnCount(), table.getSchema().getColumnCount()); - dropConnectionsAndExpireToken(); + dropConnectionsAndExpireTokens(); client.deleteTable(tableName); assertFalse(client.tableExists(tableName)); @@ -126,7 +140,7 @@ public class TestAuthnTokenReacquire { } } }); - thread.run(); + thread.start(); threads.add(thread); } for (Thread thread : threads) { @@ -140,29 +154,65 @@ public class TestAuthnTokenReacquire { } } + private int countRowsInTable(KuduTable table) throws Exception { + AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, table) + 
.scanRequestTimeout(OP_TIMEOUT_MS) + .build(); + return countRowsInScan(scanner); + } + + private void insertRowWithKey(KuduSession session, KuduTable table, int key) throws Exception { + session.apply(createBasicSchemaInsert(table, key)); + session.flush(); + RowErrorsAndOverflowStatus errors = session.getPendingErrors(); + assertFalse(errors.isOverflowed()); + assertEquals(0, session.countPendingErrors()); + } + + @Test public void testBasicWorkflow() throws Exception { KuduTable table = client.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions()); - dropConnectionsAndExpireToken(); + String tableId = table.getTableId(); + int key = 0; + // Drop all connections so the first write needs to reconnect with a new authn token. + Token.SignedTokenPB originalToken = asyncClient.securityContext.getAuthenticationToken(); + dropConnectionsAndExpireTokens(); KuduSession session = client.newSession(); session.setTimeoutMillis(OP_TIMEOUT_MS); - session.apply(createBasicSchemaInsert(table, 1)); - session.flush(); - RowErrorsAndOverflowStatus errors = session.getPendingErrors(); - assertFalse(errors.isOverflowed()); - assertEquals(0, session.countPendingErrors()); - dropConnectionsAndExpireToken(); + insertRowWithKey(session, table, ++key); - KuduTable scanTable = client.openTable(TABLE_NAME); - AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, scanTable) - .scanRequestTimeout(OP_TIMEOUT_MS) - .build(); - assertEquals(1, countRowsInScan(scanner)); - dropConnectionsAndExpireToken(); + // Verify that we got a different authn token. + assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken)); + // Now wait for the authz token to expire and do a write. + originalToken = asyncClient.getAuthzToken(tableId); + expireTokens(); + insertRowWithKey(session, table, ++key); + + // Verify that we got a different authz token. 
+ assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken)); + + // Drop all connections so the first scan needs to reconnect with a new authn token. + originalToken = asyncClient.securityContext.getAuthenticationToken(); + dropConnectionsAndExpireTokens(); + KuduTable scanTable = client.openTable(TABLE_NAME); + assertEquals(key, countRowsInTable(scanTable)); + assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken)); + + // Now wait for the authz token to expire and do a scan. + originalToken = asyncClient.getAuthzToken(tableId); + expireTokens(); + assertEquals(key, countRowsInTable(scanTable)); + assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken)); + + // Force the client to get a new authn token and delete the table. + originalToken = asyncClient.securityContext.getAuthenticationToken(); + dropConnectionsAndExpireTokens(); client.deleteTable(TABLE_NAME); assertFalse(client.tableExists(TABLE_NAME)); + assertFalse(asyncClient.securityContext.getAuthenticationToken().equals(originalToken)); } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java new file mode 100644 index 0000000..0b229a9 --- /dev/null +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthzTokenCache.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import com.stumbleupon.async.Deferred; +import org.apache.kudu.security.Token; +import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.cluster.MiniKuduCluster; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; +import static org.apache.kudu.test.ClientTestUtil.getBasicSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestAuthzTokenCache { + private static final Logger LOG = LoggerFactory.getLogger(TestAuthzTokenCache.class); + + // This tests basic functionality of the authz token cache (e.g. putting + // things in, getting stuff out). 
+ private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder = + KuduTestHarness.getBaseClusterBuilder() + .enableKerberos(); + + private static final String tableName = "TestAuthzTokenCache-table"; + + private KuduClient client; + private AsyncKuduClient asyncClient; + + @Before + public void setUp() { + client = harness.getClient(); + asyncClient = harness.getAsyncClient(); + } + + @Rule + public KuduTestHarness harness = new KuduTestHarness(clusterBuilder); + + // Retrieves a new authz token from the master (regardless of whether there is + // already one in the authz token cache). + public void fetchAuthzToken(KuduTable table) throws Exception { + // Send a dummy RPC via the token cache. This will run a scan RPC + // after retrieving a new authz token. + AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(asyncClient, table) + .build(); + KuduRpc<AsyncKuduScanner.Response> req = scanner.getOpenRequest(); + Deferred<AsyncKuduScanner.Response> d = req.getDeferred(); + asyncClient.getAuthzTokenCache().retrieveAuthzToken(req, + new InvalidAuthzTokenException(Status.IOError("test failure"))); + assertNotNull(d.join()); + } + + @Test + public void testBasicAuthzTokenCache() throws Exception { + // First, do a sanity check that we get an authz token in the first place + // upon accessing a table. + final KuduTable table = client.createTable(tableName, getBasicSchema(), + getBasicCreateTableOptions()); + AuthzTokenCache tokenCache = asyncClient.getAuthzTokenCache(); + String tableId = table.getTableId(); + Token.SignedTokenPB originalToken = asyncClient.getAuthzToken(tableId); + assertNotNull(originalToken); + + // Wait a bit so the next token we get will be different. A different token + // will be generated every second by virtue of having a different + // expiration, which is in seconds. + Thread.sleep(1100); + + // Send a dummy RPC via the token cache, sending it only after getting a new + // authz token. 
+ fetchAuthzToken(table); + + // Verify we actually got a new authz token. + assertFalse(asyncClient.getAuthzToken(tableId).equals(originalToken)); + + // Now put the original token directly in the cache. + tokenCache.put(tableId, originalToken); + assertTrue(asyncClient.getAuthzToken(tableId).equals(originalToken)); + } + + @Test + public void testRetrieveAuthzTokensInParallel() throws Exception { + final KuduTable table = client.createTable(tableName, getBasicSchema(), + getBasicCreateTableOptions()); + final String tableId = table.getTableId(); + class AuthzTokenFetcher implements Callable<Exception> { + @Override + public Exception call() { + try { + fetchAuthzToken(table); + } catch (Exception e) { + return e; + } + return null; + } + } + // Send a bunch of authz token requests in parallel. + final int NUM_THREADS = 30; + ArrayList<AuthzTokenFetcher> fetchers = new ArrayList<>(); + for (int i = 0; i < NUM_THREADS; i++) { + fetchers.add(new AuthzTokenFetcher()); + } + int fails = 0; + final ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS); + List<Future<Exception>> exceptions = pool.invokeAll(fetchers); + pool.shutdown(); + for (int i = 0; i < NUM_THREADS; i++) { + Exception e = exceptions.get(i).get(); + if (e != null) { + fails++; + e.printStackTrace(); + } + } + assertEquals(0, fails); + // We should have gotten a token with all those retrievals, and sent a + // number of RPCs that was lower than the number of threads. 
+ assertNotNull(asyncClient.getAuthzToken(tableId)); + int numRetrievals = asyncClient.getAuthzTokenCache().numRetrievalsSent(); + LOG.debug(String.format("Sent %d RPCs for %d threads", numRetrievals, NUM_THREADS)); + assertTrue(0 < numRetrievals); + assertTrue(numRetrievals < NUM_THREADS); + } +} diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java new file mode 100644 index 0000000..931692a --- /dev/null +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestMultiMasterAuthzTokens.java @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.kudu.client; + +import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.cluster.MiniKuduCluster; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND; +import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC; +import static org.apache.kudu.test.ClientTestUtil.*; +import static org.junit.Assert.assertEquals; + +public class TestMultiMasterAuthzTokens { + private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder = + KuduTestHarness.getBaseClusterBuilder() + .addMasterServerFlag("--authz_token_validity_seconds=1") + .addTabletServerFlag("--tserver_enforce_access_control=true") + // Inject invalid tokens such that operations will be forced to go + // back to the master for an authz token. + .addTabletServerFlag("--tserver_inject_invalid_authz_token_ratio=0.5"); + + private static final String tableName = "TestMultiMasterAuthzToken-table"; + + private KuduClient client; + + @Before + public void setUp() { + client = harness.getClient(); + } + + @Rule + public KuduTestHarness harness = new KuduTestHarness(clusterBuilder); + + /** + * Utility to send RPCs to add rows given the specified flush mode. + * Inserts rows with keys [startRow, endRow). 
+ */ + private void insertRows(KuduTable table, SessionConfiguration.FlushMode mode, + int startRow, int endRow) throws Exception { + KuduSession session = client.newSession(); + session.setFlushMode(mode); + for (int i = startRow; i < endRow; i++) { + Insert insert = createBasicSchemaInsert(table, i); + session.apply(insert); + } + session.flush(); + } + + /** + * Utility to send RPCs to add rows given the specified flush mode. + * Upserts rows with keys [startRow, endRow). + */ + private void upsertRows(KuduTable table, SessionConfiguration.FlushMode mode, + int startRow, int endRow) throws Exception { + KuduSession session = client.newSession(); + session.setFlushMode(mode); + for (int i = startRow; i < endRow; i++) { + Upsert upsert = createBasicSchemaUpsert(table, i); + session.apply(upsert); + } + session.flush(); + } + + @Test + public void testAuthzTokensDuringElection() throws Exception { + // Test sending various requests that require authorization. + final KuduTable table = client.createTable(tableName, getBasicSchema(), + getBasicCreateTableOptions().setNumReplicas(1)); + + // Restart the masters to trigger an election. + harness.killAllMasterServers(); + harness.startAllMasterServers(); + + final int NUM_REQS = 10; + insertRows(table, AUTO_FLUSH_SYNC, 0, NUM_REQS); + + // Do the same for batches of inserts. + harness.killAllMasterServers(); + harness.startAllMasterServers(); + insertRows(table, AUTO_FLUSH_BACKGROUND, NUM_REQS, 2 * NUM_REQS); + + // And for scans. + harness.killAllMasterServers(); + harness.startAllMasterServers(); + for (int i = 0; i < NUM_REQS; i++) { + assertEquals(2 * NUM_REQS, countRowsInTable(table)); + } + } + + @Test + public void testAuthzTokenExpiration() throws Exception { + // Test a long-running concurrent workload with different types of requests + // being sent, all the while injecting invalid tokens, with a short authz + // token expiration time. 
The threads should reacquire tokens as needed + // without surfacing token errors to the client. + final int TEST_RUNTIME_MS = 30000; + final KuduTable table = client.createTable(tableName, getBasicSchema(), + getBasicCreateTableOptions().setNumReplicas(1)); + final CountDownLatch latch = new CountDownLatch(1); + final ExecutorService pool = Executors.newFixedThreadPool(3); + List<Future<Exception>> exceptions = new ArrayList<>(); + exceptions.add(pool.submit(new Callable<Exception>() { + @Override + public Exception call() throws Exception { + try { + int batch = 0; + while (latch.getCount() > 0) { + // Send writes without batching. + upsertRows(table, AUTO_FLUSH_SYNC, batch * 10, (++batch) * 10); + } + } catch (Exception e) { + return e; + } + return null; + } + })); + exceptions.add(pool.submit(new Callable<Exception>() { + @Override + public Exception call() throws Exception { + try { + int batch = 0; + while (latch.getCount() > 0) { + // Also send writes with batching. + upsertRows(table, AUTO_FLUSH_BACKGROUND, batch * 10, (++batch) * 10); + } + } catch (Exception e) { + return e; + } + return null; + } + })); + exceptions.add(pool.submit(new Callable<Exception>() { + @Override + public Exception call() throws Exception { + try { + while (latch.getCount() > 0) { + // We can't guarantee a row count, but catch any exceptions. 
+ countRowsInTable(table); + } + } catch (Exception e) { + return e; + } + return null; + } + })); + Thread.sleep(TEST_RUNTIME_MS); + latch.countDown(); + int fails = 0; + for (Future<Exception> future : exceptions) { + Exception e = future.get(); + if (e != null) { + e.printStackTrace(); + fails++; + } + } + assertEquals(0, fails); + } +} diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java index 9b67a9b..a93ee83 100644 --- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java +++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java @@ -41,6 +41,7 @@ import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowResult; import org.apache.kudu.client.RowResultIterator; +import org.apache.kudu.client.Upsert; import org.apache.kudu.util.DecimalUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -290,6 +291,17 @@ public abstract class ClientTestUtil { session.close(); } + public static Upsert createBasicSchemaUpsert(KuduTable table, int key) { + Upsert upsert = table.newUpsert(); + PartialRow row = upsert.getRow(); + row.addInt(0, key); + row.addInt(1, 3); + row.addInt(2, 4); + row.addString(3, "another string"); + row.addBoolean(4, false); + return upsert; + } + public static Insert createBasicSchemaInsert(KuduTable table, int key) { Insert insert = table.newInsert(); PartialRow row = insert.getRow();
