Hexiaoqiao commented on code in PR #6868:
URL: https://github.com/apache/hadoop/pull/6868#discussion_r1651044329


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * The AsyncForEachRun class is part of the asynchronous operation utilities
+ * within the Hadoop Distributed File System (HDFS) Federation router.
+ * It provides the functionality to perform asynchronous operations on each
+ * element of an Iterator, applying a given async function and then applying
+ * a final transformation function to the results.
+ *
+ * <p>This class is designed to work with other asynchronous interfaces and
+ * utility classes to enable complex asynchronous workflows. It allows for
+ * non-blocking execution of tasks, which can improve the performance and
+ * responsiveness of HDFS operations.</p>
+ *
+ * <p>The class implements the AsyncRun interface, which means it can be used
+ * in asynchronous task chains. It maintains an Iterator of elements to
+ * process, an asyncFunction to apply to each element, and a final
+ * transformation function (thenApply) to produce the final result.</p>
+ *
+ * <p>The run method initiates the asynchronous operation, and the doOnce
+ * method recursively applies the asyncFunction to each element and handles
+ * the results. If the satisfy flag is set, the operation is completed
+ * with the current result.</p>
+ *
+ * <p>AsyncForEachRun is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ * for (I element : elements) {
+ *     T res = asyncFunction(element);
+ *     R result = thenApply(element, res);
+ *     if (satisfyCondition(res, result)) {
+ *         break;
+ *     }
+ * }
+ * return result;
+ * }
+ * </pre>
+ *
+ * @param <I> the type of the elements being iterated over
+ * @param <T> the type of the intermediate result from the asyncFunction
+ * @param <R> the type of the final result after applying the thenApply 
function
+ * @see AsyncRun
+ * @see AsyncApplyFunction
+ * @see BiFunction
+ */
+public class AsyncForEachRun<I, T, R> implements AsyncRun<R> {
+
+  private boolean satisfy = false;
+  private Iterator<I> iterator;
+  private I now;
+  private final CompletableFuture<R> result = new CompletableFuture<>();
+  private AsyncApplyFunction<I, T> asyncFunction;
+  private BiFunction<AsyncForEachRun<I, T, R>, T, R> thenApply;
+
+  @Override
+  public void run() {
+    try {
+      doOnce(null);
+    } catch (IOException ioe) {
+      result.completeExceptionally(ioe);
+    }
+    setCurCompletableFuture(result);
+  }
+
+  /**
+   * Performs a single iteration of the asynchronous for-each operation.
+   *
+   * <p>This method is called to process each element of the iterator provided 
to
+   * the {@link AsyncForEachRun} constructor. It applies the asynchronous 
function to
+   * the current element, then applies the 'then' function to the result. If 
the
+   * 'satisfy' condition is met, the iteration is halted, and the current 
result is
+   * used to complete the future. This method is recursive, so it will 
continue to
+   * call itself for the next elements until the iterator is exhausted or the 
satisfy
+   * condition is true.</p>
+   *
+   * @param ret the initial or current result to be passed into the 'then' 
function
+   * @throws IOException if an I/O error occurs while applying the 
asynchronous function
+   * @see #forEach(Iterator)
+   * @see #asyncDo(AsyncApplyFunction)
+   * @see #then(BiFunction)
+   */
+  private void doOnce(R ret) throws IOException {
+    if (!iterator.hasNext()) {
+      result.complete(ret);
+      return;
+    }
+    now = iterator.next();
+    CompletableFuture<T> completableFuture = asyncFunction.async(now);
+    completableFuture.thenApply(t -> {
+      R r = null;
+      try {
+        r = thenApply.apply(AsyncForEachRun.this, t);
+      } catch (IOException e) {
+        result.completeExceptionally(new CompletionException(e));
+        return null;
+      }
+      if (satisfy) {
+        result.complete(r);
+        return null;
+      }
+      try {
+        doOnce(r);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+      return null;
+    }).exceptionally(e ->
+        result.completeExceptionally(e));
+  }
+
+  /**
+   * Retrieves the current element being processed in the asynchronous 
for-each loop.
+   *
+   * <p>This method provides access to the element that is currently being
+   * operated on within the asynchronous iteration. It can be useful for
+   * inspection, logging, or other purposes that require knowledge of the
+   * current state of the iteration.</p>
+   *
+   * @return the current element of type {@code I} being processed in the 
iterator.
+   * @see #forEach(Iterator)
+   * @see #run()
+   */
+  public I getNow() {

Review Comment:
   The name of this function is not matched with annotation OR some ambiguity 
here? my first feeling it will return timestamp, but implement one abstract 
type returned actually here.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * Represents a function that accepts a value of type T and produces a result 
of type R.
+ * This interface extends {@link Async} and provides methods to apply the 
function
+ * asynchronously using {@link CompletableFuture}.
+ *
+ * <p>ApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync(input);
+ *    // Can use ApplyFunction
+ *    R result = thenApply(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ */
+@FunctionalInterface
+public interface ApplyFunction<T, R> extends Async<R>{
+
+  /**
+   * Applies this function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   * @throws IOException if an I/O error occurs
+   */
+  R apply(T t) throws IOException;
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link 
CompletableFuture}.
+   * The function is executed on the same thread as the completion of the 
given future.
+   *
+   * @param in the input future
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in) {
+    return in.thenApply(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    });
+  }
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link 
CompletableFuture},
+   * using the specified executor for the asynchronous computation.
+   *
+   * @param in the input future
+   * @param executor the executor to use for the asynchronous computation
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in, Executor 
executor) {
+    return in.thenApplyAsync(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw new CompletionException(e);
+      }
+    }, executor);
+  }
+

Review Comment:
   Please remove the extra blank line.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java:
##########
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE;
+
+/**
+ * The AsyncUtil class provides a collection of utility methods to simplify
+ * the implementation of asynchronous operations using Java's 
CompletableFuture.
+ * It encapsulates common patterns such as applying functions, handling 
exceptions,
+ * and executing tasks in a non-blocking manner. This class is designed to work
+ * with Hadoop's asynchronous router operations in HDFS Federation.
+ *
+ * <p>The utility methods support a fluent-style API, allowing for the 
chaining of
+ * asynchronous operations. For example, after an asynchronous operation 
completes,
+ * a function can be applied to its result, and the process can continue with
+ * the new result. This is particularly useful for complex workflows that 
require
+ * multiple steps, where each step depends on the completion of the previous 
one.</p>
+ *
+ * <p>The class also provides methods to handle exceptions that may occur 
during a
+ * synchronous operation. This ensures that error handling is integrated 
smoothly
+ * into the workflow, allowing for robust and fault-tolerant applications.</p>
+ *
+ * @see CompletableFuture
+ * @see ApplyFunction
+ * @see AsyncApplyFunction
+ * @see AsyncRun
+ * @see AsyncForEachRun
+ * @see CatchFunction
+ * @see AsyncCatchFunction
+ * @see FinallyFunction
+ */
+public final class AsyncUtil {
+  private static final Boolean BOOLEAN_RESULT = false;
+  private static final Long LONG_RESULT = -1L;
+  private static final Object NULL_RESULT = null;
+
+  private AsyncUtil(){}
+
+
+  /**
+   * Provides a default value based on the type specified.
+   *
+   * @param clazz The {@link Class} object representing the type of the value
+   *              to be returned.
+   * @param <R>   The type of the value to be returned.
+   * @return An object with a value determined by the type:
+   *         <ul>
+   *           <li>{@code false} if {@code clazz} is {@link Boolean},
+   *           <li>-1 if {@code clazz} is {@link Long},
+   *           <li>{@code null} for any other type.
+   *         </ul>
+   */
+  public static <R> R asyncReturn(Class<R> clazz) {
+    if (clazz == null) {
+      return null;
+    }
+    if (clazz.equals(Boolean.class)) {
+      return (R) BOOLEAN_RESULT;
+    } else if (clazz.equals(Long.class)) {
+      return (R) LONG_RESULT;
+    }
+    return (R) NULL_RESULT;
+  }
+
+  public static <R> R syncReturn(Class<R> clazz)

Review Comment:
   Suggest to add some javadoc here.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * The AsyncApplyFunction interface represents a function that
+ * asynchronously accepts a value of type T and produces a result
+ * of type R. This interface extends {@link ApplyFunction} and is
+ * designed to be used with asynchronous computation frameworks,
+ * such as Java's {@link java.util.concurrent.CompletableFuture}.
+ *
+ * <p>An implementation of this interface is expected to perform an
+ * asynchronous operation and return a result, which is typically
+ * represented as a {@code CompletableFuture<R>}. This allows for
+ * non-blocking execution of tasks and is particularly useful for
+ * I/O operations or any operation that may take a significant amount
+ * of time to complete.</p>
+ *
+ * <p>AsyncApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync1(input);
+ *    // Can use AsyncApplyFunction
+ *    R result = doAsync2(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ * @see ApplyFunction
+ * @see java.util.concurrent.CompletableFuture
+ */
+@FunctionalInterface
+public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {
+
+  /**
+   * Asynchronously applies this function to the given argument.
+   *
+   * <p>This method is intended to initiate the function application
+   * without waiting for the result. It is typically used when the
+   * result of the operation is not required immediately or when the
+   * operation is part of a larger asynchronous workflow.</p>
+   *
+   * @param t the function argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  void applyAsync(T t) throws IOException;
+
+  /**
+   * Synchronously applies this function to the given argument and
+   * returns the result.
+   *
+   * <p>This method waits for the asynchronous operation to complete
+   * and returns its result. It is useful when the result is needed
+   * immediately and the calling code cannot proceed without it.</p>
+   *
+   * @param t the function argument
+   * @return the result of applying the function to the argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  @Override
+  default R apply(T t) throws IOException {
+    applyAsync(t);
+    return result();
+  }
+
+
+  default CompletableFuture<R> async(T t) throws IOException {

Review Comment:
   Suggest to add some javadoc here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to