lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r532590005



##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -87,6 +89,7 @@
   private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;
   private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor;
   private final List<FlightClientMiddleware.Factory> middleware;
+  private boolean retryFlightCall = false;

Review comment:
       Let's name this something more specific (`retryOnUnauthorized`, perhaps).

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.

Review comment:
       I _believe_ you don't have to worry about this since you will either get 
the first result or UNAUTHENTICATED. (Or else, you'd have a very odd server 
that explicitly sends an UNAUTHENTICATED after giving you results.)

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.
+          // TODO: Log the retry explicitly stating the token authentication failed and retrying
+          // the failed operation.
+          return supplier.get().hasNext();

Review comment:
       This should update `this.itr` so that subsequent calls use the new call.
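
       A minimal sketch of what that could look like, under some loud
assumptions: `UnauthenticatedException` is a hypothetical stand-in for a gRPC
`StatusRuntimeException` whose status code is UNAUTHENTICATED, and
`RetryingIterator`, `flakySupplier`, and `drain` are illustrative names that
are not part of this PR. The point is only that `itr` is no longer final and is
reassigned on retry, so later `hasNext()`/`next()` calls go to the new call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Sketch of a retrying iterator that keeps using the replacement call. */
class RetryingIteratorSketch {

  /** Hypothetical stand-in for a gRPC exception carrying UNAUTHENTICATED. */
  static class UnauthenticatedException extends RuntimeException {}

  static class RetryingIterator<T> implements Iterator<T> {
    private final Supplier<Iterator<T>> supplier;
    private final boolean retryOnUnauthorized;
    private Iterator<T> itr;          // not final: replaced when the call is retried
    private boolean retried = false;  // retry at most once

    RetryingIterator(Supplier<Iterator<T>> supplier, boolean retryOnUnauthorized) {
      this.supplier = supplier;
      this.retryOnUnauthorized = retryOnUnauthorized;
      this.itr = supplier.get();
    }

    @Override
    public boolean hasNext() {
      try {
        return itr.hasNext();
      } catch (UnauthenticatedException e) {
        if (!retryOnUnauthorized || retried) {
          throw e;
        }
        retried = true;
        // Store the new iterator so subsequent calls use the new call.
        itr = supplier.get();
        return itr.hasNext();
      }
    }

    @Override
    public T next() {
      // Simplification: only hasNext() retries in this sketch.
      return itr.next();
    }
  }

  /** Supplier whose first call fails and whose later calls yield 1, 2, 3. */
  static Supplier<Iterator<Integer>> flakySupplier() {
    int[] calls = {0};
    return () -> {
      if (calls[0]++ == 0) {
        return new Iterator<Integer>() {
          public boolean hasNext() { throw new UnauthenticatedException(); }
          public Integer next() { throw new UnauthenticatedException(); }
        };
      }
      return Arrays.asList(1, 2, 3).iterator();
    };
  }

  /** Collects everything the iterator yields. */
  static List<Integer> drain(Iterator<Integer> it) {
    List<Integer> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    // prints [1, 2, 3]
    System.out.println(drain(new RetryingIterator<>(flakySupplier(), true)));
  }
}
```

       With the original `return supplier.get().hasNext();`, the loop in
`drain` would keep calling the failed iterator after the first retry.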

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CredentialCallOption.java
##########
@@ -28,7 +28,11 @@
  * Method option for supplying credentials to method calls.
  */
 public class CredentialCallOption implements CallOptions.GrpcCallOption {
-  private final Consumer<CallHeaders> credentialWriter;
+  public Consumer<CallHeaders> getCredentialWriter() {

Review comment:
       nit: can we keep getters after fields/constructor?

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       A general comment in the interest of code reuse: you may be better off 
implementing a generic `StreamObserver<T>` that can wrap any gRPC call. You 
would have to change FlightClient to always use the async gRPC stub instead of 
the blocking stub, but you could then handle retries for any gRPC call without 
writing a special-purpose wrapper for each one.
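
       Roughly, the idea could be sketched as below. To keep the sketch free of
dependencies, `Observer<T>` is a local stand-in that mirrors the three
callbacks of gRPC's `StreamObserver<T>`, `UnauthenticatedException` is a
hypothetical marker for an UNAUTHENTICATED status, and `startCall` is an
assumed hook that (re)issues the async call with the given observer. None of
these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of a generic observer wrapper that retries a call once. */
class RetryingObserverSketch {

  /** Local stand-in mirroring the callbacks of gRPC's StreamObserver. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Hypothetical marker for an UNAUTHENTICATED status. */
  static class UnauthenticatedException extends RuntimeException {}

  static class RetryingObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    private final Consumer<Observer<T>> startCall; // assumed hook: issues the call
    private boolean receivedAny = false;
    private boolean retried = false;

    RetryingObserver(Observer<T> delegate, Consumer<Observer<T>> startCall) {
      this.delegate = delegate;
      this.startCall = startCall;
    }

    void start() {
      startCall.accept(this);
    }

    @Override
    public void onNext(T value) {
      receivedAny = true;
      delegate.onNext(value);
    }

    @Override
    public void onError(Throwable t) {
      // Retry only if nothing was delivered yet and we have not retried before.
      if (t instanceof UnauthenticatedException && !receivedAny && !retried) {
        retried = true;
        startCall.accept(this);
        return;
      }
      delegate.onError(t);
    }

    @Override
    public void onCompleted() {
      delegate.onCompleted();
    }
  }

  /** Runs a fake call that fails once, then emits "a", "b" and completes. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    Observer<String> sink = new Observer<String>() {
      public void onNext(String v) { events.add("next:" + v); }
      public void onError(Throwable t) { events.add("error"); }
      public void onCompleted() { events.add("completed"); }
    };
    int[] attempts = {0};
    Consumer<Observer<String>> fakeCall = obs -> {
      if (attempts[0]++ == 0) {
        obs.onError(new UnauthenticatedException());
        return;
      }
      obs.onNext("a");
      obs.onNext("b");
      obs.onCompleted();
    };
    new RetryingObserver<>(sink, fakeCall).start();
    return events;
  }

  public static void main(String[] args) {
    // prints [next:a, next:b, completed]
    System.out.println(demo());
  }
}
```

       Because the wrapper only depends on the observer callbacks and a way to
restart the call, the same class could in principle sit in front of any
streaming call rather than one wrapper per call type.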

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientIncomingAuthHeaderMiddleware.java
##########
@@ -34,27 +34,29 @@
    */
   public static class Factory implements FlightClientMiddleware.Factory {

Review comment:
       This is what I intended in the doc - thank you!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

