lidavidm commented on a change in pull request #7994: URL: https://github.com/apache/arrow/pull/7994#discussion_r510146622
########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ContextAdapter.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flight.RequestContext; +import org.apache.arrow.flight.auth2.Auth2Constants; + +import io.grpc.Context; + +/** + * Adapter class for gRPC contexts. + */ +public class ContextAdapter implements RequestContext { + // gRPC uses reference equality when looking up keys in a Context. Cache used keys in this static map + // so that look ups can succeed. + private static final Map<String, Context.Key<String>> usedKeys = new HashMap<>(); + private static final Context.Key<String> authkey = Context.keyWithDefault(Auth2Constants.PEER_IDENTITY_KEY, ""); + private Context context = Context.current(); + + /** + * Retrieves the gRPC context. + * @return the gRPC context. + */ + public Context getContext() { + return context; + } + + @Override + public void put(String key, String value) { + context = context.withValue(authkey, value); Review comment: Is it intentional to ignore key here? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ContextAdapter.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flight.RequestContext; +import org.apache.arrow.flight.auth2.Auth2Constants; + +import io.grpc.Context; + +/** + * Adapter class for gRPC contexts. + */ +public class ContextAdapter implements RequestContext { + // gRPC uses reference equality when looking up keys in a Context. Cache used keys in this static map + // so that look ups can succeed. + private static final Map<String, Context.Key<String>> usedKeys = new HashMap<>(); + private static final Context.Key<String> authkey = Context.keyWithDefault(Auth2Constants.PEER_IDENTITY_KEY, ""); + private Context context = Context.current(); + + /** + * Retrieves the gRPC context. + * @return the gRPC context. + */ + public Context getContext() { + return context; + } + + @Override + public void put(String key, String value) { + context = context.withValue(authkey, value); + } + + @Override + public String get(String key) { + return authkey.get(context); + } + + private static Context.Key<String> getGrpcKey(String key) { Review comment: This looks unused? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientHandshakeWrapper.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.flight.impl.Flight.HandshakeRequest; +import org.apache.arrow.flight.impl.Flight.HandshakeResponse; +import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub; + +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +/** + * Utility class for executing a handshake with a FlightServer. + */ +public class ClientHandshakeWrapper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientHandshakeWrapper.class); + + /** + * Do handshake for a client. The stub will be authenticated after this method returns. + * + * @param stub The service stub. + */ + public static void doClientHandshake(FlightServiceStub stub) { + final HandshakeObserver observer = new HandshakeObserver(); + try { + observer.requestObserver = stub.handshake(observer); + observer.requestObserver.onNext(HandshakeRequest.newBuilder().build()); + observer.requestObserver.onCompleted(); + try { + if (!observer.completed.get()) { + // TODO: ARROW-5681 + throw new RuntimeException("Unauthenticated"); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } catch (ExecutionException ex) { + final FlightRuntimeException wrappedException = StatusUtils.fromThrowable(ex.getCause()); + logger.error("Failed on completing future", wrappedException); Review comment: I don't know if we need to have all these log statements - the application should handle the exception and log if needed right? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java ########## @@ -176,6 +184,32 @@ public void authenticate(ClientAuthHandler handler, CallOption... options) { authInterceptor.setAuthHandler(handler); } + /** + * Authenticates with a username and password. + * + * @param username the username. + * @param password the password. + * @return a CredentialCallOption containing a bearer token if the server emitted one, or + * empty if no bearer token was returned. This can be used in subsequent API calls. + */ + public Optional<CredentialCallOption> basicHeaderAuthenticate(String username, String password) { Review comment: nit: maybe 'authenticateBasicToken'? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ContextAdapter.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flight.CallContext; +import org.apache.arrow.flight.auth.AuthConstants; + +import io.grpc.Context; + +/** + * Adapter class for gRPC contexts. + */ +public class ContextAdapter implements CallContext { + // gRPC uses reference equality when looking up keys in a Context. Cache used keys in this static map + // so that look ups can succeed. + private static final Map<String, Context.Key<String>> usedKeys = new HashMap<>(); Review comment: Thinking about it, would a better implementation be to store a `Map<String, String>` implementing RequestContext inside the gRPC context, and just pull it out of the context wherever needed? That way you don't have to deal with gRPC context keys. (I do like the context key pattern because it is typed, though!) ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientHandshakeWrapper.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.flight.impl.Flight.HandshakeRequest; +import org.apache.arrow.flight.impl.Flight.HandshakeResponse; +import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub; + +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +/** + * Utility class for executing a handshake with a FlightServer. + */ +public class ClientHandshakeWrapper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientHandshakeWrapper.class); + + /** + * Do handshake for a client. The stub will be authenticated after this method returns. + * + * @param stub The service stub. + */ + public static void doClientHandshake(FlightServiceStub stub) { + final HandshakeObserver observer = new HandshakeObserver(); + try { + observer.requestObserver = stub.handshake(observer); + observer.requestObserver.onNext(HandshakeRequest.newBuilder().build()); + observer.requestObserver.onCompleted(); + try { + if (!observer.completed.get()) { Review comment: Ah but then again, the timeout will be set on the gRPC stub so this shouldn't block forever. ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/BasicCallHeaderAuthenticator.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Optional; + +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A ServerAuthHandler for username/password authentication. + */ +public class BasicCallHeaderAuthenticator implements CallHeaderAuthenticator { + + private static final Logger logger = LoggerFactory.getLogger(BasicCallHeaderAuthenticator.class); + private final BasicAuthValidator authValidator; + + public BasicCallHeaderAuthenticator(BasicAuthValidator authValidator) { + super(); + this.authValidator = authValidator; + } + + @Override + public AuthResult authenticate(CallHeaders headers) { + final String authEncoded = AuthUtilities.getValueFromAuthHeader(headers, Auth2Constants.BASIC_PREFIX); + if (authEncoded == null) { + throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + } + + try { + // The value has the format Base64(<username>:<password>) + final String authDecoded = new String(Base64.getDecoder().decode(authEncoded), StandardCharsets.UTF_8); + final int colonPos = authDecoded.indexOf(':'); + if (colonPos == -1) { + throw CallStatus.UNAUTHORIZED.toRuntimeException(); + } + + final String user = authDecoded.substring(0, colonPos); + final String password = authDecoded.substring(colonPos + 1); + final Optional<String> bearerToken = authValidator.validateCredentials(user, password); + return new AuthResult() { + @Override + public String getPeerIdentity() { + return user; + } + + @Override + public Optional<String> getBearerToken() { + return bearerToken; + } + }; + + } catch (UnsupportedEncodingException ex) { + throw CallStatus.INTERNAL.withCause(ex).toRuntimeException(); + } catch (FlightRuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw CallStatus.UNAUTHORIZED.withCause(ex).toRuntimeException(); + } + } + + @Override + public boolean validateBearer(String bearerToken) { + return false; Review comment: Should this delegate to authValidator? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/RequestContext.java ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +/** + * Tracks variables about the current request. + */ +public interface RequestContext { + /** + * Register a variable and a value. + * @param key the variable name. + * @param value the value. + */ + void put(String key, String value); + + /** + * Retrieve a registered variable. + * @param key the variable name. + * @return the value, or empty string if not found. + */ + String get(String key); +} Review comment: Might it be useful to also provide a `keySet()` method, even if we don't want the full Map interface? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ContextAdapter.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flight.RequestContext; +import org.apache.arrow.flight.auth2.Auth2Constants; + +import io.grpc.Context; + +/** + * Adapter class for gRPC contexts. + */ +public class ContextAdapter implements RequestContext { + // gRPC uses reference equality when looking up keys in a Context. Cache used keys in this static map + // so that look ups can succeed. + private static final Map<String, Context.Key<String>> usedKeys = new HashMap<>(); + private static final Context.Key<String> authkey = Context.keyWithDefault(Auth2Constants.PEER_IDENTITY_KEY, ""); + private Context context = Context.current(); + + /** + * Retrieves the gRPC context. + * @return the gRPC context. + */ + public Context getContext() { + return context; + } + + @Override + public void put(String key, String value) { + context = context.withValue(authkey, value); Review comment: Ditto in `get` below. ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientHandshakeWrapper.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.flight.impl.Flight.HandshakeRequest; +import org.apache.arrow.flight.impl.Flight.HandshakeResponse; +import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub; + +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +/** + * Utility class for executing a handshake with a FlightServer. + */ +public class ClientHandshakeWrapper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientHandshakeWrapper.class); + + /** + * Do handshake for a client. The stub will be authenticated after this method returns. + * + * @param stub The service stub. + */ + public static void doClientHandshake(FlightServiceStub stub) { + final HandshakeObserver observer = new HandshakeObserver(); + try { + observer.requestObserver = stub.handshake(observer); + observer.requestObserver.onNext(HandshakeRequest.newBuilder().build()); + observer.requestObserver.onCompleted(); + try { + if (!observer.completed.get()) { + // TODO: ARROW-5681 + throw new RuntimeException("Unauthenticated"); Review comment: Maybe throw a CallStatus.UNAUTHENTICATED here? ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientHandshakeWrapper.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.flight.impl.Flight.HandshakeRequest; +import org.apache.arrow.flight.impl.Flight.HandshakeResponse; +import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub; + +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +/** + * Utility class for executing a handshake with a FlightServer. + */ +public class ClientHandshakeWrapper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientHandshakeWrapper.class); + + /** + * Do handshake for a client. The stub will be authenticated after this method returns. + * + * @param stub The service stub. + */ + public static void doClientHandshake(FlightServiceStub stub) { + final HandshakeObserver observer = new HandshakeObserver(); + try { + observer.requestObserver = stub.handshake(observer); + observer.requestObserver.onNext(HandshakeRequest.newBuilder().build()); + observer.requestObserver.onCompleted(); + try { + if (!observer.completed.get()) { + // TODO: ARROW-5681 + throw new RuntimeException("Unauthenticated"); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } catch (ExecutionException ex) { + final FlightRuntimeException wrappedException = StatusUtils.fromThrowable(ex.getCause()); + logger.error("Failed on completing future", wrappedException); + throw wrappedException; + } + } catch (StatusRuntimeException sre) { + logger.error("Failed with SREe", sre); + throw StatusUtils.fromGrpcRuntimeException(sre); + } catch (Throwable ex) { + logger.error("Failed with unknown", ex); + if (ex instanceof FlightRuntimeException) { + throw (FlightRuntimeException) ex; + } + throw StatusUtils.fromThrowable(ex); + } + } + + private static class HandshakeObserver implements StreamObserver<HandshakeResponse> { + + private volatile StreamObserver<HandshakeRequest> requestObserver; + private final CompletableFuture<Boolean> completed; + + public HandshakeObserver() { + super(); + completed = new CompletableFuture<>(); + } + + @Override + public void onNext(HandshakeResponse value) { + logger.debug("Got HandshakeResponse"); Review comment: Ditto about logs - we shouldn't log things unless they're really going to be useful for an application developer. ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ContextAdapter.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.arrow.flight.CallContext; +import org.apache.arrow.flight.auth.AuthConstants; + +import io.grpc.Context; + +/** + * Adapter class for gRPC contexts. + */ +public class ContextAdapter implements CallContext { + // gRPC uses reference equality when looking up keys in a Context. Cache used keys in this static map + // so that look ups can succeed. + private static final Map<String, Context.Key<String>> usedKeys = new HashMap<>(); Review comment: Also of note, this should at least be a ConcurrentMap since it's static and shared by multiple requests. ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientHandshakeWrapper.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.grpc.StatusUtils; +import org.apache.arrow.flight.impl.Flight.HandshakeRequest; +import org.apache.arrow.flight.impl.Flight.HandshakeResponse; +import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub; + +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +/** + * Utility class for executing a handshake with a FlightServer. + */ +public class ClientHandshakeWrapper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientHandshakeWrapper.class); + + /** + * Do handshake for a client. The stub will be authenticated after this method returns. + * + * @param stub The service stub. + */ + public static void doClientHandshake(FlightServiceStub stub) { + final HandshakeObserver observer = new HandshakeObserver(); + try { + observer.requestObserver = stub.handshake(observer); + observer.requestObserver.onNext(HandshakeRequest.newBuilder().build()); + observer.requestObserver.onCompleted(); + try { + if (!observer.completed.get()) { Review comment: It might be good to accept a timeout for this. (CallOptions... in FlightClient#handshake already has a timeout so it would be nice to respect that.) ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientBearerTokenMiddleware.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.auth2; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightClientMiddleware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Middleware for capturing bearer tokens sent back from the Flight server. + */ +public class ClientBearerTokenMiddleware implements FlightClientMiddleware { + private static final Logger logger = LoggerFactory.getLogger(ClientBearerTokenMiddleware.class); + + private final Factory factory; + + /** + * Factory used within FlightClient. + */ + public static class Factory implements FlightClientMiddleware.Factory { + private final AtomicReference<String> bearerToken = new AtomicReference<>(); + + @Override + public FlightClientMiddleware onCallStarted(CallInfo info) { + logger.debug("Call name: {}", info.method().name()); Review comment: Let's not log this. ########## File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/RequestContext.java ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +/** + * Tracks variables about the current request. + */ +public interface RequestContext { + /** + * Register a variable and a value. + * @param key the variable name. + * @param value the value. + */ + void put(String key, String value); + + /** + * Retrieve a registered variable. + * @param key the variable name. + * @return the value, or empty string if not found. + */ + String get(String key); +} Review comment: And/or containsKey and delete. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
