vjagadish1989 commented on a change in pull request #593: URL: https://github.com/apache/samza/pull/593#discussion_r208808431
########## File path: samza-core/src/main/java/org/apache/samza/table/remote/TableAsyncHelper.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.remote; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.apache.samza.metrics.Timer; +import org.apache.samza.storage.kv.Entry; + +import com.google.common.annotations.VisibleForTesting; + + +/** + * Helper class to handle async table IO requests with rate limiting on behalf of the remote tables. + * + * It provides a set of non-blocking execute() methods to ease the implementation of the different + * CRUD operations in table. These async requests are dispatched by a single-threaded executor + * after invoking the rateLimiter. The executor can be configured to have a bounded or unbounded wait + * queue depending on the value of {@code maxRequests}. If it is INT_MAX, unbounded queue is used; + * otherwise it is bounded. Bounded queue is useful to exert back-pressure when outbound requests + * far outpaces inbound responses. Note that with bounded wait queue the execute() APIs can block + * when quota is exceeded. + * + * Optionally, an executor can be specified for invoking the future callbacks which otherwise are + * executed on the threads of the underlying native data store client. This could be useful when + * application might execute long-running operations upon future completions. + * + * Each table instance should create a separate TableAsyncHelper instance unless they can share the + * same rateLimiter instance, ie. tagged the same way. + * + * @param <K> type of the table key + * @param <V> type of the table record + */ +public class TableAsyncHelper<K, V> { + private final TableRateLimiter<K, V> rateLimiter; + private final ExecutorService tableExecutor; + + @VisibleForTesting + final ExecutorService callbackExecutor; + + public TableAsyncHelper(String tableId, TableRateLimiter<K, V> rateLimiter, + ExecutorService tableExecutor, ExecutorService callbackExecutor) { + this.rateLimiter = rateLimiter; + this.callbackExecutor = callbackExecutor; + this.tableExecutor = tableExecutor; + } + + /** + * Decorate a CompletableFuture with result handling. + * @param ioFuture future of the table operation + * @param startNs nano time of the start time + * @param timer latency metric to be updated + * @param <T> return type + * @return CompletableFuture to return to the caller + */ + private <T> CompletableFuture<T> decorateFuture( Review comment: I wonder if it instead scatters complexity across multiple places. In general, readability should not be measured by number of lines of code but by whether the abstractions make sense. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
