vjagadish1989 commented on a change in pull request #593:
URL: https://github.com/apache/samza/pull/593#discussion_r208402510



##########
File path: 
samza-core/src/main/java/org/apache/samza/table/remote/TableAsyncHelper.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ * Helper class to handle async table IO requests with rate limiting on behalf 
of the remote tables.
+ *
+ * It provides a set of non-blocking execute() methods to ease the 
implementation of the different
+ * CRUD operations in table. These async requests are dispatched by a 
single-threaded executor
+ * after invoking the rateLimiter. The executor can be configured to have a 
bounded or unbounded wait
+ * queue depending on the value of {@code maxRequests}. If it is INT_MAX, 
unbounded queue is used;
+ * otherwise it is bounded. Bounded queue is useful to exert back-pressure 
when outbound requests
+ * far outpaces inbound responses. Note that with bounded wait queue the 
execute() APIs can block
+ * when quota is exceeded.
+ *
+ * Optionally, an executor can be specified for invoking the future callbacks 
which otherwise are
+ * executed on the threads of the underlying native data store client. This 
could be useful when
+ * application might execute long-running operations upon future completions.
+ *
+ * Each table instance should create a separate TableAsyncHelper instance 
unless they can share the
+ * same rateLimiter instance, ie. tagged the same way.
+ *
+ * @param <K> type of the table key
+ * @param <V> type of the table record
+ */
+public class TableAsyncHelper<K, V> {
+  private final TableRateLimiter<K, V> rateLimiter;
+  private final ExecutorService tableExecutor;
+
+  @VisibleForTesting
+  final ExecutorService callbackExecutor;
+
+  public TableAsyncHelper(String tableId, TableRateLimiter<K, V> rateLimiter,
+      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+    this.rateLimiter = rateLimiter;
+    this.callbackExecutor = callbackExecutor;
+    this.tableExecutor = tableExecutor;
+  }
+
+  /**
+   * Decorate a CompletableFuture with result handling.
+   * @param ioFuture future of the table operation
+   * @param startNs nano time of the start time
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture to return to the caller
+   */
+  private <T> CompletableFuture<T> decorateFuture(
+      CompletableFuture<T> ioFuture, long startNs, Timer timer) {
+    return callbackExecutor != null ?
+        ioFuture.thenApplyAsync(r -> {
+            timer.update(System.nanoTime() - startNs);
+            return r;
+          }, callbackExecutor) :
+        ioFuture.thenApply(r -> {
+            timer.update(System.nanoTime() - startNs);
+            return r;
+          });
+  }
+
+  /**
+   * Execute an async request given a table key
+   * @param key key of the table record
+   * @param method method to be executed
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  public <T> CompletableFuture<T> execute(
+      K key, Function<K, CompletableFuture<T>> method, Timer timer) {
+    final long startNs = System.nanoTime();
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ?
+        CompletableFuture
+            .runAsync(() -> rateLimiter.throttle(key), tableExecutor)

Review comment:
       Explore combine the throttling + method invocation + timer-update in a 
single-stage for better encapsulation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to