SAMZA-2043: Consolidate ReadableTable and ReadWriteTable So far we've not seen a lot of use in maintaining separate implementations for ReadableTable and ReadWriteTable, which adds quite a bit of complexity. Hence consolidating them.
Author: Wei Song <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #861 from weisong44/SAMZA-2043 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a75503d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a75503d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a75503d Branch: refs/heads/master Commit: 6a75503d74ae65b30e2dcf760bba6e1d8050cdba Parents: c5348bf Author: Wei Song <[email protected]> Authored: Mon Dec 17 15:11:27 2018 -0800 Committer: Wei Song <[email protected]> Committed: Mon Dec 17 15:11:27 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/context/TaskContext.java | 16 +- .../org/apache/samza/table/ReadWriteTable.java | 53 ++- .../org/apache/samza/table/ReadableTable.java | 86 ---- .../main/java/org/apache/samza/table/Table.java | 2 - .../org/apache/samza/table/TableProvider.java | 4 +- .../table/descriptors/BaseTableDescriptor.java | 1 + .../table/descriptors/LocalTableDescriptor.java | 1 + .../descriptors/RemoteTableDescriptor.java | 1 + .../apache/samza/table/utils/SerdeUtils.java | 1 + .../table/remote/TestTableRateLimiter.java | 2 +- .../apache/samza/context/TaskContextImpl.java | 4 +- .../operators/impl/SendToTableOperatorImpl.java | 2 +- .../impl/StreamTableJoinOperatorImpl.java | 6 +- .../apache/samza/table/BaseReadWriteTable.java | 69 +++ .../apache/samza/table/BaseReadableTable.java | 75 --- .../org/apache/samza/table/TableManager.java | 4 +- .../samza/table/caching/CachingTable.java | 108 ++--- .../table/caching/CachingTableProvider.java | 8 +- .../table/caching/guava/GuavaCacheTable.java | 4 +- .../caching/guava/GuavaCacheTableProvider.java | 4 +- .../table/remote/RemoteReadWriteTable.java | 244 ---------- .../samza/table/remote/RemoteReadableTable.java | 246 ---------- .../apache/samza/table/remote/RemoteTable.java | 436 ++++++++++++++++++ 
.../samza/table/remote/RemoteTableProvider.java | 68 ++- .../apache/samza/table/utils/TableMetrics.java | 77 ++++ .../samza/table/utils/TableReadMetrics.java | 54 --- .../samza/table/utils/TableWriteMetrics.java | 60 --- .../impl/TestStreamTableJoinOperatorImpl.java | 4 +- .../apache/samza/table/TestTableManager.java | 4 +- .../samza/table/caching/TestCachingTable.java | 15 +- .../descriptors/TestLocalTableDescriptor.java | 4 +- .../table/remote/TestRemoteReadWriteTable.java | 458 ------------------- .../samza/table/remote/TestRemoteTable.java | 456 ++++++++++++++++++ .../descriptors/TestRemoteTableDescriptor.java | 8 +- .../retry/TestRetriableTableFunctions.java | 4 +- .../inmemory/TestInMemoryTableDescriptor.java | 4 +- .../descriptors/TestRocksDbTableDescriptor.java | 4 +- .../samza/storage/kv/LocalReadWriteTable.java | 154 ------- .../samza/storage/kv/LocalReadableTable.java | 108 ----- .../org/apache/samza/storage/kv/LocalTable.java | 213 +++++++++ .../samza/storage/kv/LocalTableProvider.java | 7 +- .../storage/kv/TestLocalReadWriteTable.java | 247 ---------- .../storage/kv/TestLocalReadableTable.java | 155 ------- .../storage/kv/TestLocalTableProvider.java | 2 +- .../samza/storage/kv/TestLocalTableRead.java | 155 +++++++ .../samza/storage/kv/TestLocalTableWrite.java | 247 ++++++++++ .../framework/StreamTaskIntegrationTest.java | 2 +- .../apache/samza/test/table/TestLocalTable.java | 362 --------------- .../test/table/TestLocalTableEndToEnd.java | 361 +++++++++++++++ .../samza/test/table/TestRemoteTable.java | 288 ------------ .../test/table/TestRemoteTableEndToEnd.java | 310 +++++++++++++ 51 files changed, 2509 insertions(+), 2699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/context/TaskContext.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java index cdf7404..8adfcea 100644 --- a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java @@ -25,8 +25,6 @@ import org.apache.samza.scheduler.CallbackScheduler; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.Table; /** @@ -63,17 +61,15 @@ public interface TaskContext { KeyValueStore<?, ?> getStore(String storeName); /** - * Gets the {@link Table} corresponding to the {@code tableId} for this task. + * Gets the {@link ReadWriteTable} corresponding to the {@code tableId} for this task. * - * The returned table should be cast with the concrete type parameters based on the configured table serdes, and - * whether it is {@link ReadWriteTable} or {@link ReadableTable}. E.g., if using string key and integer value - * serde for a writable table, it should be cast to a {@code ReadWriteTable<String, Integer>}. 
- * - * @param tableId id of the {@link Table} to get - * @return the {@link Table} associated with {@code tableId} for this task + * @param tableId id of the {@link ReadWriteTable} to get + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + * @return the {@link ReadWriteTable} associated with {@code tableId} for this task * @throws IllegalArgumentException if there is no table associated with {@code tableId} */ - Table<?> getTable(String tableId); + <K, V> ReadWriteTable<K, V> getTable(String tableId); /** * Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java index 083a1b5..ffb87a4 100644 --- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java +++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java @@ -19,9 +19,11 @@ package org.apache.samza.table; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; /** @@ -32,7 +34,51 @@ import org.apache.samza.storage.kv.Entry; * @param <V> the type of the value in this table */ @InterfaceStability.Unstable -public interface ReadWriteTable<K, V> extends ReadableTable<K, V> { +public interface ReadWriteTable<K, V> extends Table { + + /** + * Initializes the table during container initialization. + * Guaranteed to be invoked as the first operation on the table. 
+ * @param context {@link Context} corresponding to this table + */ + default void init(Context context) { + } + + /** + * Gets the value associated with the specified {@code key}. + * + * @param key the key with which the associated value is to be fetched. + * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}. + * @throws NullPointerException if the specified {@code key} is {@code null}. + */ + V get(K key); + + /** + * Asynchronously gets the value associated with the specified {@code key}. + * + * @param key the key with which the associated value is to be fetched. + * @return completableFuture for the requested value + * @throws NullPointerException if the specified {@code key} is {@code null}. + */ + CompletableFuture<V> getAsync(K key); + + /** + * Gets the values with which the specified {@code keys} are associated. + * + * @param keys the keys with which the associated values are to be fetched. + * @return a map of the keys that were found and their respective values. + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + Map<K, V> getAll(List<K> keys); + + /** + * Asynchronously gets the values with which the specified {@code keys} are associated. + * + * @param keys the keys with which the associated values are to be fetched. + * @return completableFuture for the requested entries + * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. + */ + CompletableFuture<Map<K, V>> getAllAsync(List<K> keys); /** * Updates the mapping of the specified key-value pair; @@ -114,4 +160,9 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> { * Flushes the underlying store of this table, if applicable. 
*/ void flush(); + + /** + * Close the table and release any resources acquired + */ + void close(); } http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java deleted file mode 100644 index 6c88fd3..0000000 --- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.table; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.context.Context; -import org.apache.samza.operators.KV; - - -/** - * - * A table that supports get by one or more keys - * - * @param <K> the type of the record key in this table - * @param <V> the type of the record value in this table - */ [email protected] -public interface ReadableTable<K, V> extends Table<KV<K, V>> { - /** - * Initializes the table during container initialization. - * Guaranteed to be invoked as the first operation on the table. - * @param context {@link Context} corresponding to this table - */ - default void init(Context context) { - } - - /** - * Gets the value associated with the specified {@code key}. - * - * @param key the key with which the associated value is to be fetched. - * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}. - * @throws NullPointerException if the specified {@code key} is {@code null}. - */ - V get(K key); - - /** - * Asynchronously gets the value associated with the specified {@code key}. - * - * @param key the key with which the associated value is to be fetched. - * @return completableFuture for the requested value - * @throws NullPointerException if the specified {@code key} is {@code null}. - */ - CompletableFuture<V> getAsync(K key); - - /** - * Gets the values with which the specified {@code keys} are associated. - * - * @param keys the keys with which the associated values are to be fetched. - * @return a map of the keys that were found and their respective values. - * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. - */ - Map<K, V> getAll(List<K> keys); - - /** - * Asynchronously gets the values with which the specified {@code keys} are associated. 
- * - * @param keys the keys with which the associated values are to be fetched. - * @return completableFuture for the requested entries - * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}. - */ - CompletableFuture<Map<K, V>> getAllAsync(List<K> keys); - - /** - * Close the table and release any resources acquired - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/Table.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java index 234d15b..c454012 100644 --- a/samza-api/src/main/java/org/apache/samza/table/Table.java +++ b/samza-api/src/main/java/org/apache/samza/table/Table.java @@ -36,8 +36,6 @@ import org.apache.samza.task.InitableTask; * hybrid tables. For remote data sources, a {@code RemoteTable} provides optimized access with caching, rate-limiting, * and retry support. * <p> - * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}. - * <p> * Use a {@link TableDescriptor} to specify the properties of a {@link Table}. For High Level API * {@link StreamApplication}s, use {@link StreamApplicationDescriptor#getTable} to obtain the {@link Table} instance for * the descriptor that can be used with the {@link MessageStream} operators like {@link MessageStream#sendTo(Table)}. 
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java index 2dec989..36cad2e 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java @@ -34,10 +34,10 @@ public interface TableProvider { void init(Context context); /** - * Get an instance of the table for read/write operations + * Get an instance of the {@link ReadWriteTable} * @return the underlying table */ - Table getTable(); + ReadWriteTable getTable(); /** * Shutdown the underlying table http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java index 26c2ae3..52eca5f 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java @@ -56,6 +56,7 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, * @param value the value * @return this table descriptor instance */ + @SuppressWarnings("unchecked") public D withConfig(String key, String value) { config.put(key, value); return (D) this; http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java index 1ebb580..1623710 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java @@ -39,6 +39,7 @@ import org.apache.samza.table.utils.SerdeUtils; * @param <V> the type of the value in this table * @param <D> the type of the concrete table descriptor */ +@SuppressWarnings("unchecked") abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>> extends BaseTableDescriptor<K, V, D> { http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java index 7286004..4b15c47 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java @@ -235,6 +235,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot if (!tagCreditsMap.isEmpty()) { RateLimiter defaultRateLimiter; try { + @SuppressWarnings("unchecked") Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME); Constructor<? 
extends RateLimiter> ctor = clazz.getConstructor(Map.class); defaultRateLimiter = ctor.newInstance(tagCreditsMap); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java index a7b66e5..338baf4 100644 --- a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java +++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java @@ -54,6 +54,7 @@ public final class SerdeUtils { * @return deserialized object instance * @param <T> type of the object */ + @SuppressWarnings("unchecked") public static <T> T deserialize(String name, String strObject) { try { byte [] bytes = Base64.getDecoder().decode(strObject); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java index ea9acbd..3235d5a 100644 --- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java +++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java @@ -25,9 +25,9 @@ import java.util.Collections; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; import org.apache.samza.util.RateLimiter; +import org.junit.Assert; import org.junit.Test; -import junit.framework.Assert; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyMap; http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java index ec52f8a..a29c2b3 100644 --- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java @@ -26,7 +26,7 @@ import org.apache.samza.scheduler.CallbackScheduler; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.table.Table; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.TableManager; import java.util.HashMap; @@ -83,7 +83,7 @@ public class TaskContextImpl implements TaskContext { } @Override - public Table getTable(String tableId) { + public <K, V> ReadWriteTable<K, V> getTable(String tableId) { return this.tableManager.getTable(tableId); } http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java index 0d39c1b..6d84b17 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java @@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void> SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) { this.sendToTableOpSpec = sendToTableOpSpec; - this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId()); + this.table = 
context.getTaskContext().getTable(sendToTableOpSpec.getTableId()); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java index 96f07d1..e3fc266 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java @@ -22,7 +22,7 @@ import org.apache.samza.context.Context; import org.apache.samza.operators.KV; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; -import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; @@ -42,11 +42,11 @@ import java.util.Collections; class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> { private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec; - private final ReadableTable<K, ?> table; + private final ReadWriteTable<K, ?> table; StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) { this.joinOpSpec = joinOpSpec; - this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId()); + this.table = context.getTaskContext().getTable(joinOpSpec.getTableId()); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java new file mode 100644 index 0000000..cef224d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import com.google.common.base.Preconditions; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.Context; +import org.apache.samza.table.utils.TableMetrics; +import org.apache.samza.util.HighResolutionClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for a concrete table implementation + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> { + + protected final Logger logger; + + protected final String tableId; + + protected TableMetrics metrics; + + protected HighResolutionClock clock; + + /** + * Construct an instance + * @param tableId Id of the table + */ + public BaseReadWriteTable(String tableId) { + Preconditions.checkArgument(tableId != null & !tableId.isEmpty(), + String.format("Invalid table Id: %s", tableId)); + this.tableId = tableId; + this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId); + } + + @Override + public void init(Context context) { + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + clock = metricsConfig.getMetricsTimerEnabled() + ? 
() -> System.nanoTime() + : () -> 0L; + metrics = new TableMetrics(context, this, tableId); + } + + public String getTableId() { + return tableId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java deleted file mode 100644 index 1dfd54c..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.table; - -import com.google.common.base.Preconditions; -import org.apache.samza.config.MetricsConfig; -import org.apache.samza.context.Context; -import org.apache.samza.table.utils.TableReadMetrics; -import org.apache.samza.table.utils.TableWriteMetrics; -import org.apache.samza.util.HighResolutionClock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Base class for all readable tables - * - * @param <K> the type of the key in this table - * @param <V> the type of the value in this table - */ -abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> { - - protected final Logger logger; - - protected final String tableId; - - protected TableReadMetrics readMetrics; - protected TableWriteMetrics writeMetrics; - - protected HighResolutionClock clock; - - /** - * Construct an instance - * @param tableId Id of the table - */ - public BaseReadableTable(String tableId) { - Preconditions.checkArgument(tableId != null & !tableId.isEmpty(), - String.format("Invalid table Id: %s", tableId)); - this.tableId = tableId; - this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId); - } - - @Override - public void init(Context context) { - MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); - clock = metricsConfig.getMetricsTimerEnabled() - ? 
() -> System.nanoTime() - : () -> 0L; - - readMetrics = new TableReadMetrics(context, this, tableId); - if (this instanceof ReadWriteTable) { - writeMetrics = new TableWriteMetrics(context, this, tableId); - } - } - - public String getTableId() { - return tableId; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/TableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java index d3ba771..5a3777e 100644 --- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java +++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java @@ -54,7 +54,7 @@ public class TableManager { static class TableCtx { private TableProvider tableProvider; - private Table table; + private ReadWriteTable table; } private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName()); @@ -110,7 +110,7 @@ public class TableManager { * @param tableId Id of the table * @return table instance */ - public Table getTable(String tableId) { + public ReadWriteTable getTable(String tableId) { Preconditions.checkState(initialized, "TableManager has not been initialized."); TableCtx ctx = tableContexts.get(tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java index e63bf61..2fde79a 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java @@ -23,9 +23,8 @@ import com.google.common.base.Preconditions; import 
org.apache.samza.SamzaException; import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; -import org.apache.samza.table.BaseReadableTable; +import org.apache.samza.table.BaseReadWriteTable; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.ReadableTable; import org.apache.samza.table.utils.TableMetricsUtil; import java.util.ArrayList; @@ -41,34 +40,32 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; /** - * A composite table incorporating a cache with a Samza table. The cache is + * A hybrid table incorporating a cache with a Samza table. The cache is * represented as a {@link ReadWriteTable}. * - * The intented use case is to optimize the latency of accessing the actual table, eg. + * The intented use case is to optimize the latency of accessing the actual table, e.g. * remote tables, when eventual consistency between cache and table is acceptable. * The cache is expected to support TTL such that the values can be refreshed at some * point. * - * If the actual table is read-write table, CachingTable supports both write-through - * and write-around (writes bypassing cache) policies. For write-through policy, it - * supports read-after-write semantics because the value is cached after written to - * the table. + * {@link CachingTable} supports write-through and write-around (writes bypassing cache) policies. + * For write-through policy, it supports read-after-write semantics because the value is + * cached after written to the table. * - * Note that there is no synchronization in CachingTable because it is impossible to + * Note that there is no synchronization in {@link CachingTable} because it is impossible to * implement a critical section between table read/write and cache update in the async * code paths without serializing all async operations for the same keys. 
Given stale - * data is a presumed trade off for using a cache for table, it should be acceptable - * for the data in table and cache are out-of-sync. Moreover, unsynchronized operations - * in CachingTable also deliver higher performance when there is contention. + * data is a presumed trade-off for using a cache with table, it should be acceptable + * for the data in table and cache to be temporarily out-of-sync. Moreover, unsynchronized + * operations in {@link CachingTable} also deliver higher performance when there is contention. * * @param <K> type of the table key * @param <V> type of the table value */ -public class CachingTable<K, V> extends BaseReadableTable<K, V> +public class CachingTable<K, V> extends BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> { - private final ReadableTable<K, V> rdTable; - private final ReadWriteTable<K, V> rwTable; + private final ReadWriteTable<K, V> table; private final ReadWriteTable<K, V> cache; private final boolean isWriteAround; @@ -76,10 +73,9 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> private AtomicLong hitCount = new AtomicLong(); private AtomicLong missCount = new AtomicLong(); - public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) { + public CachingTable(String tableId, ReadWriteTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) { super(tableId); - this.rdTable = table; - this.rwTable = table instanceof ReadWriteTable ? 
(ReadWriteTable) table : null; + this.table = table; this.cache = cache; this.isWriteAround = isWriteAround; } @@ -114,16 +110,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> public V get(K key) { try { return getAsync(key).get(); - } catch (InterruptedException e) { - throw new SamzaException(e); } catch (Exception e) { - throw (SamzaException) e.getCause(); + throw new SamzaException(e); } } @Override public CompletableFuture<V> getAsync(K key) { - incCounter(readMetrics.numGets); + incCounter(metrics.numGets); V value = cache.get(key); if (value != null) { hitCount.incrementAndGet(); @@ -133,14 +127,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> long startNs = clock.nanoTime(); missCount.incrementAndGet(); - return rdTable.getAsync(key).handle((result, e) -> { + return table.getAsync(key).handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to get the record for " + key, e); } else { if (result != null) { cache.put(key, result); } - updateTimer(readMetrics.getNs, clock.nanoTime() - startNs); + updateTimer(metrics.getNs, clock.nanoTime() - startNs); return result; } }); @@ -150,16 +144,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> public Map<K, V> getAll(List<K> keys) { try { return getAllAsync(keys).get(); - } catch (InterruptedException e) { - throw new SamzaException(e); } catch (Exception e) { - throw (SamzaException) e.getCause(); + throw new SamzaException(e); } } @Override public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { - incCounter(readMetrics.numGetAlls); + incCounter(metrics.numGetAlls); // Make a copy of entries which might be immutable Map<K, V> getAllResult = new HashMap<>(); List<K> missingKeys = lookupCache(keys, getAllResult); @@ -169,7 +161,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> } long startNs = clock.nanoTime(); - return rdTable.getAllAsync(missingKeys).handle((records, e) -> { + return 
table.getAllAsync(missingKeys).handle((records, e) -> { if (e != null) { throw new SamzaException("Failed to get records for " + keys, e); } else { @@ -179,7 +171,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> .collect(Collectors.toList())); getAllResult.putAll(records); } - updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs); + updateTimer(metrics.getAllNs, clock.nanoTime() - startNs); return getAllResult; } }); @@ -189,20 +181,18 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> public void put(K key, V value) { try { putAsync(key, value).get(); - } catch (InterruptedException e) { - throw new SamzaException(e); } catch (Exception e) { - throw (SamzaException) e.getCause(); + throw new SamzaException(e); } } @Override public CompletableFuture<Void> putAsync(K key, V value) { - incCounter(writeMetrics.numPuts); - Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); + incCounter(metrics.numPuts); + Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table); long startNs = clock.nanoTime(); - return rwTable.putAsync(key, value).handle((result, e) -> { + return table.putAsync(key, value).handle((result, e) -> { if (e != null) { throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e); } else if (!isWriteAround) { @@ -212,7 +202,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> cache.put(key, value); } } - updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs); + updateTimer(metrics.putNs, clock.nanoTime() - startNs); return result; }); } @@ -221,26 +211,24 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> public void putAll(List<Entry<K, V>> records) { try { putAllAsync(records).get(); - } catch (InterruptedException e) { - throw new SamzaException(e); } catch (Exception e) { - throw (SamzaException) e.getCause(); + throw new SamzaException(e); } } @Override public 
CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) { - incCounter(writeMetrics.numPutAlls); + incCounter(metrics.numPutAlls); long startNs = clock.nanoTime(); - Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); - return rwTable.putAllAsync(records).handle((result, e) -> { + Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table); + return table.putAllAsync(records).handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to put records " + records, e); } else if (!isWriteAround) { cache.putAll(records); } - updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs); + updateTimer(metrics.putAllNs, clock.nanoTime() - startNs); return result; }); } @@ -249,25 +237,23 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> public void delete(K key) { try { deleteAsync(key).get(); - } catch (InterruptedException e) { - throw new SamzaException(e); } catch (Exception e) { - throw (SamzaException) e.getCause(); + throw new SamzaException(e); } } @Override public CompletableFuture<Void> deleteAsync(K key) { - incCounter(writeMetrics.numDeletes); + incCounter(metrics.numDeletes); long startNs = clock.nanoTime(); - Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); - return rwTable.deleteAsync(key).handle((result, e) -> { + Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table); + return table.deleteAsync(key).handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to delete the record for " + key, e); } else if (!isWriteAround) { cache.delete(key); } - updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs); + updateTimer(metrics.deleteNs, clock.nanoTime() - startNs); return result; }); } @@ -283,33 +269,33 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> @Override public CompletableFuture<Void> deleteAllAsync(List<K> keys) { - 
incCounter(writeMetrics.numDeleteAlls); + incCounter(metrics.numDeleteAlls); long startNs = clock.nanoTime(); - Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); - return rwTable.deleteAllAsync(keys).handle((result, e) -> { + Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table); + return table.deleteAllAsync(keys).handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to delete the record for " + keys, e); } else if (!isWriteAround) { cache.deleteAll(keys); } - updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs); + updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs); return result; }); } @Override public synchronized void flush() { - incCounter(writeMetrics.numFlushes); + incCounter(metrics.numFlushes); long startNs = clock.nanoTime(); - Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable); - rwTable.flush(); - updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs); + Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + table); + table.flush(); + updateTimer(metrics.flushNs, clock.nanoTime() - startNs); } @Override public void close() { - this.cache.close(); - this.rdTable.close(); + cache.close(); + table.close(); } double hitRate() { http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java index d835809..e533cf4 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java @@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit; import 
org.apache.samza.config.JavaTableConfig; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.Table; import org.apache.samza.table.descriptors.CachingTableDescriptor; import org.apache.samza.table.caching.guava.GuavaCacheTable; import org.apache.samza.table.BaseTableProvider; @@ -47,18 +45,18 @@ public class CachingTableProvider extends BaseTableProvider { } @Override - public Table getTable() { + public ReadWriteTable getTable() { Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId)); JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig()); String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID); - ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId); + ReadWriteTable table = this.context.getTaskContext().getTable(realTableId); String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID); ReadWriteTable cache; if (cacheTableId != null) { - cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId); + cache = this.context.getTaskContext().getTable(cacheTableId); } else { cache = createDefaultCacheTable(realTableId, tableConfig); defaultCaches.add(cache); http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java index b75a0bc..d8a5d9c 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java @@ -23,7 +23,7 @@ import com.google.common.cache.Cache; import 
org.apache.samza.SamzaException; import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; -import org.apache.samza.table.BaseReadableTable; +import org.apache.samza.table.BaseReadWriteTable; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.utils.TableMetricsUtil; @@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture; * @param <K> type of the key in the cache * @param <V> type of the value in the cache */ -public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V> +public class GuavaCacheTable<K, V> extends BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> { private final Cache<K, V> cache; http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java index e45719e..042d3c7 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.samza.config.JavaTableConfig; -import org.apache.samza.table.Table; +import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.BaseTableProvider; import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor; import org.apache.samza.table.utils.SerdeUtils; @@ -44,7 +44,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider { } @Override - public Table getTable() { + public ReadWriteTable getTable() { Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId)); JavaTableConfig tableConfig = new 
JavaTableConfig(context.getJobContext().getConfig()); Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE, http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java deleted file mode 100644 index 80c2cac..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.table.remote; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; -import org.apache.samza.SamzaException; -import org.apache.samza.config.MetricsConfig; -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.utils.TableMetricsUtil; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; - -import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; -import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; - - -/** - * Remote store backed read writable table - * - * @param <K> the type of the key in this table - * @param <V> the type of the value in this table - */ -public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> - implements ReadWriteTable<K, V> { - - protected final TableWriteFunction<K, V> writeFn; - protected final TableRateLimiter writeRateLimiter; - - public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, - TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter, - ExecutorService tableExecutor, ExecutorService callbackExecutor) { - super(tableId, readFn, readRateLimiter, tableExecutor, callbackExecutor); - Preconditions.checkNotNull(writeFn, "null write function"); - this.writeFn = writeFn; - this.writeRateLimiter = writeRateLimiter; - } - - @Override - public void init(Context context) { - super.init(context); - MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); - if (metricsConfig.getMetricsTimerEnabled()) { - TableMetricsUtil 
tableMetricsUtil = new TableMetricsUtil(context, this, tableId); - writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns")); - } - } - - @Override - public void put(K key, V value) { - try { - putAsync(key, value).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<Void> putAsync(K key, V value) { - Preconditions.checkNotNull(key); - if (value == null) { - return deleteAsync(key); - } - - return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs) - .exceptionally(e -> { - throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e); - }); - } - - @Override - public void putAll(List<Entry<K, V>> entries) { - try { - putAllAsync(entries).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) { - Preconditions.checkNotNull(records); - if (records.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - - List<K> deleteKeys = records.stream() - .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList()); - - CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty() - ? 
CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys); - - List<Entry<K, V>> putRecords = records.stream() - .filter(e -> e.getValue() != null).collect(Collectors.toList()); - - // Return the combined future - return CompletableFuture.allOf( - deleteFuture, - executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs)) - .exceptionally(e -> { - String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(",")); - throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e); - }); - } - - @Override - public void delete(K key) { - try { - deleteAsync(key).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<Void> deleteAsync(K key) { - Preconditions.checkNotNull(key); - return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs) - .exceptionally(e -> { - throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e); - }); - } - - @Override - public void deleteAll(List<K> keys) { - try { - deleteAllAsync(keys).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<Void> deleteAllAsync(List<K> keys) { - Preconditions.checkNotNull(keys); - if (keys.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - - return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs) - .exceptionally(e -> { - throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e); - }); - } - - @Override - public void flush() { - try { - incCounter(writeMetrics.numFlushes); - long startNs = clock.nanoTime(); - writeFn.flush(); - updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs); - } catch (Exception e) { - String errMsg = "Failed to flush remote store"; - 
logger.error(errMsg, e); - throw new SamzaException(errMsg, e); - } - } - - @Override - public void close() { - writeFn.close(); - super.close(); - } - - /** - * Execute an async request given a table record (key+value) - * @param rateLimiter helper for rate limiting - * @param key key of the table record - * @param value value of the table record - * @param method method to be executed - * @param timer latency metric to be updated - * @return CompletableFuture of the operation - */ - protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter, - K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) { - incCounter(counter); - final long startNs = clock.nanoTime(); - CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() - ? CompletableFuture - .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor) - .thenCompose((r) -> method.apply(key, value)) - : method.apply(key, value); - return completeExecution(ioFuture, startNs, timer); - } - - /** - * Execute an async request given a collection of table records - * @param rateLimiter helper for rate limiting - * @param records list of records - * @param method method to be executed - * @param timer latency metric to be updated - * @return CompletableFuture of the operation - */ - protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter, - Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, - Counter counter, Timer timer) { - incCounter(counter); - final long startNs = clock.nanoTime(); - CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() - ? 
CompletableFuture - .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor) - .thenCompose((r) -> method.apply(records)) - : method.apply(records); - return completeExecution(ioFuture, startNs, timer); - } - - @VisibleForTesting - public TableWriteFunction<K, V> getWriteFn() { - return writeFn; - } - - @VisibleForTesting - public TableRateLimiter getWriteRateLimiter() { - return writeRateLimiter; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java deleted file mode 100644 index 84a05b8..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.table.remote; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Objects; -import org.apache.samza.SamzaException; -import org.apache.samza.config.MetricsConfig; -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.BaseReadableTable; -import org.apache.samza.table.utils.TableMetricsUtil; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.function.Function; - -import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; -import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; - -/** - * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service. - * <p> - * Many stream-processing applications require to look-up data from remote data sources eg: databases, - * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be - * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}. - * <p> - * Example use-cases include: - * <ul> - * <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li> - * <li> Scoring page views with impressions services. </li> - * <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li> - * </ul> - * <p> - * A {@link RemoteReadableTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction} - * which encapsulate the functionality of reading and writing data to the remote service. These provide a - * pluggable means to specify I/O operations on the table. 
While the base implementation merely delegates to - * these reader and writer functions, sub-classes of {@link RemoteReadableTable} may provide rich functionality like - * caching or throttling on top of them. - * - * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter. - * Optionally, an executor can be specified for invoking the future callbacks which otherwise are - * executed on the threads of the underlying native data store client. This could be useful when - * application might execute long-running operations upon future completions; another use case is to increase - * throughput with more parallelism in the callback executions. - * - * @param <K> the type of the key in this table - * @param <V> the type of the value in this table - */ -public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> { - - protected final ExecutorService callbackExecutor; - protected final ExecutorService tableExecutor; - protected final TableReadFunction<K, V> readFn; - protected final TableRateLimiter<K, V> readRateLimiter; - - /** - * Construct a RemoteReadableTable instance - * @param tableId table id - * @param readFn {@link TableReadFunction} for read operations - * @param rateLimiter helper for rate limiting - * @param tableExecutor executor for issuing async requests - * @param callbackExecutor executor for invoking async callbacks - */ - public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn, - TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) { - super(tableId); - Preconditions.checkNotNull(readFn, "null read function"); - this.readFn = readFn; - this.readRateLimiter = rateLimiter; - this.callbackExecutor = callbackExecutor; - this.tableExecutor = tableExecutor; - } - - @Override - public void init(Context context) { - super.init(context); - MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); - if 
(metricsConfig.getMetricsTimerEnabled()) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); - readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns")); - } - } - - @Override - public V get(K key) { - try { - return getAsync(key).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<V> getAsync(K key) { - Preconditions.checkNotNull(key); - return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs) - .handle((result, e) -> { - if (e != null) { - throw new SamzaException("Failed to get the records for " + key, e); - } - if (result == null) { - incCounter(readMetrics.numMissedLookups); - } - return result; - }); - } - - @Override - public Map<K, V> getAll(List<K> keys) { - try { - return getAllAsync(keys).get(); - } catch (Exception e) { - throw new SamzaException(e); - } - } - - @Override - public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { - Preconditions.checkNotNull(keys); - if (keys.isEmpty()) { - return CompletableFuture.completedFuture(Collections.EMPTY_MAP); - } - return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs) - .handle((result, e) -> { - if (e != null) { - throw new SamzaException("Failed to get the records for " + keys, e); - } - result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups)); - return result; - }); - } - - /** - * Execute an async request given a table key - * @param rateLimiter helper for rate limiting - * @param key key of the table record - * @param method method to be executed - * @param timer latency metric to be updated - * @param <T> return type - * @return CompletableFuture of the operation - */ - protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, - K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) { - 
incCounter(counter); - final long startNs = clock.nanoTime(); - CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() - ? CompletableFuture - .runAsync(() -> rateLimiter.throttle(key), tableExecutor) - .thenCompose((r) -> method.apply(key)) - : method.apply(key); - return completeExecution(ioFuture, startNs, timer); - } - - /** - * Execute an async request given a collection of table keys - * @param rateLimiter helper for rate limiting - * @param keys collection of keys - * @param method method to be executed - * @param timer latency metric to be updated - * @param <T> return type - * @return CompletableFuture of the operation - */ - protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, - Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) { - incCounter(counter); - final long startNs = clock.nanoTime(); - CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() - ? CompletableFuture - .runAsync(() -> rateLimiter.throttle(keys), tableExecutor) - .thenCompose((r) -> method.apply(keys)) - : method.apply(keys); - return completeExecution(ioFuture, startNs, timer); - } - - /** - * Complete the pending execution and update timer - * @param ioFuture the future to be executed - * @param startNs start time in nanosecond - * @param timer latency metric to be updated - * @param <T> return type - * @return CompletableFuture of the operation - */ - protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) { - if (callbackExecutor != null) { - ioFuture.thenApplyAsync(r -> { - updateTimer(timer, clock.nanoTime() - startNs); - return r; - }, callbackExecutor); - } else { - ioFuture.thenApply(r -> { - updateTimer(timer, clock.nanoTime() - startNs); - return r; - }); - } - return ioFuture; - } - - @Override - public void close() { - readFn.close(); - } - - @VisibleForTesting - public ExecutorService getCallbackExecutor() { - return 
callbackExecutor; - } - - @VisibleForTesting - public ExecutorService getTableExecutor() { - return tableExecutor; - } - - @VisibleForTesting - public TableReadFunction<K, V> getReadFn() { - return readFn; - } - - @VisibleForTesting - public TableRateLimiter<K, V> getReadRateLimiter() { - return readRateLimiter; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java new file mode 100644 index 0000000..5b9b289 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.table.remote; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.samza.SamzaException; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.BaseReadWriteTable; +import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.utils.TableMetricsUtil; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; + + +/** + * A Samza {@link ReadWriteTable} backed by a remote data-store or service. + * <p> + * Many stream-processing applications require to look-up data from remote data sources eg: databases, + * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be + * naturally modeled as a join between the incoming stream and a table. + * <p> + * Example use-cases include: + * <ul> + * <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li> + * <li> Scoring page views with impressions services. </li> + * <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li> + * </ul> + * <p> + * A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction} + * which encapsulate the functionality of reading and writing data to the remote service. 
These provide a + * pluggable means to specify I/O operations on the table. + * + * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter. + * Optionally, an executor can be specified for invoking the future callbacks which otherwise are + * executed on the threads of the underlying native data store client. This could be useful when + * application might execute long-running operations upon future completions; another use case is to increase + * throughput with more parallelism in the callback executions. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class RemoteTable<K, V> extends BaseReadWriteTable<K, V> + implements ReadWriteTable<K, V> { + + protected final ExecutorService callbackExecutor; + protected final ExecutorService tableExecutor; + protected final TableReadFunction<K, V> readFn; + protected final TableWriteFunction<K, V> writeFn; + protected final TableRateLimiter<K, V> readRateLimiter; + protected final TableRateLimiter writeRateLimiter; + + /** + * Construct a RemoteTable instance + * @param tableId table id + * @param readFn {@link TableReadFunction} for read operations + * @param writeFn {@link TableWriteFunction} for read operations + * @param readRateLimiter helper for read rate limiting + * @param writeRateLimiter helper for write rate limiting + * @param tableExecutor executor for issuing async requests + * @param callbackExecutor executor for invoking async callbacks + */ + public RemoteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, + TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter, + ExecutorService tableExecutor, ExecutorService callbackExecutor) { + super(tableId); + Preconditions.checkNotNull(readFn, "null read function"); + this.readFn = readFn; + this.writeFn = writeFn; + this.readRateLimiter = readRateLimiter; + this.writeRateLimiter = writeRateLimiter; 
+ this.tableExecutor = tableExecutor; + this.callbackExecutor = callbackExecutor; + } + + @Override + public void init(Context context) { + super.init(context); + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + if (metricsConfig.getMetricsTimerEnabled()) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); + readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns")); + if (writeRateLimiter != null) { + writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns")); + } + } + } + + @Override + public V get(K key) { + try { + return getAsync(key).get(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public CompletableFuture<V> getAsync(K key) { + Preconditions.checkNotNull(key); + return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, metrics.getNs) + .handle((result, e) -> { + if (e != null) { + throw new SamzaException("Failed to get the records for " + key, e); + } + if (result == null) { + incCounter(metrics.numMissedLookups); + } + return result; + }); + } + + @Override + public Map<K, V> getAll(List<K> keys) { + try { + return getAllAsync(keys).get(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { + Preconditions.checkNotNull(keys); + if (keys.isEmpty()) { + return CompletableFuture.completedFuture(Collections.EMPTY_MAP); + } + return execute(readRateLimiter, keys, readFn::getAllAsync, metrics.numGetAlls, metrics.getAllNs) + .handle((result, e) -> { + if (e != null) { + throw new SamzaException("Failed to get the records for " + keys, e); + } + result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups)); + return result; + }); + } + + @Override + public void put(K key, V value) { + try { + putAsync(key, value).get(); + } catch (Exception e) { + throw new 
SamzaException(e); + } + } + + @Override + public CompletableFuture<Void> putAsync(K key, V value) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(key); + if (value == null) { + return deleteAsync(key); + } + + return execute(writeRateLimiter, key, value, writeFn::putAsync, metrics.numPuts, metrics.putNs) + .exceptionally(e -> { + throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e); + }); + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + try { + putAllAsync(entries).get(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(records); + if (records.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + List<K> deleteKeys = records.stream() + .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList()); + + CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty() + ? 
CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys); + + List<Entry<K, V>> putRecords = records.stream() + .filter(e -> e.getValue() != null).collect(Collectors.toList()); + + // Return the combined future + return CompletableFuture.allOf( + deleteFuture, + executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, metrics.numPutAlls, metrics.putAllNs)) + .exceptionally(e -> { + String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(",")); + throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e); + }); + } + + @Override + public void delete(K key) { + try { + deleteAsync(key).get(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public CompletableFuture<Void> deleteAsync(K key) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(key); + return execute(writeRateLimiter, key, writeFn::deleteAsync, metrics.numDeletes, metrics.deleteNs) + .exceptionally(e -> { + throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e); + }); + } + + @Override + public void deleteAll(List<K> keys) { + try { + deleteAllAsync(keys).get(); + } catch (Exception e) { + throw new SamzaException(e); + } + } + + @Override + public CompletableFuture<Void> deleteAllAsync(List<K> keys) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(keys); + if (keys.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, metrics.numDeleteAlls, metrics.deleteAllNs) + .exceptionally(e -> { + throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e); + }); + } + + @Override + public void flush() { + if (writeFn != null) { + try { + incCounter(metrics.numFlushes); + long startNs = clock.nanoTime(); + writeFn.flush(); + updateTimer(metrics.flushNs, 
clock.nanoTime() - startNs); + } catch (Exception e) { + String errMsg = "Failed to flush remote store"; + logger.error(errMsg, e); + throw new SamzaException(errMsg, e); + } + } + } + + @Override + public void close() { + readFn.close(); + if (writeFn != null) { + writeFn.close(); + } + } + + /** + * Execute an async request given a table key + * @param rateLimiter helper for rate limiting + * @param key key of the table record + * @param method method to be executed + * @param counter count metric to be updated + * @param timer latency metric to be updated + * @param <T> return type + * @return CompletableFuture of the operation + */ + protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, + K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) { + incCounter(counter); + final long startNs = clock.nanoTime(); + CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() + ? CompletableFuture + .runAsync(() -> rateLimiter.throttle(key), tableExecutor) + .thenCompose((r) -> method.apply(key)) + : method.apply(key); + return completeExecution(ioFuture, startNs, timer); + } + + /** + * Execute an async request given a collection of table keys + * @param rateLimiter helper for rate limiting + * @param keys collection of keys + * @param method method to be executed + * @param counter count metric to be updated + * @param timer latency metric to be updated + * @param <T> return type + * @return CompletableFuture of the operation + */ + protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, + Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) { + incCounter(counter); + final long startNs = clock.nanoTime(); + CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() + ? 
CompletableFuture + .runAsync(() -> rateLimiter.throttle(keys), tableExecutor) + .thenCompose((r) -> method.apply(keys)) + : method.apply(keys); + return completeExecution(ioFuture, startNs, timer); + } + + /** + * Complete the pending execution and update timer + * @param ioFuture the future to be executed + * @param startNs start time in nanosecond + * @param timer latency metric to be updated + * @param <T> return type + * @return CompletableFuture of the operation + */ + protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) { + if (callbackExecutor != null) { + ioFuture.thenApplyAsync(r -> { + updateTimer(timer, clock.nanoTime() - startNs); + return r; + }, callbackExecutor); + } else { + ioFuture.thenApply(r -> { + updateTimer(timer, clock.nanoTime() - startNs); + return r; + }); + } + return ioFuture; + } + + /** + * Execute an async request given a table record (key+value) + * @param rateLimiter helper for rate limiting + * @param key key of the table record + * @param value value of the table record + * @param method method to be executed + * @param counter count metric to be updated + * @param timer latency metric to be updated + * @return CompletableFuture of the operation + */ + protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter, + K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) { + incCounter(counter); + final long startNs = clock.nanoTime(); + CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() + ? 
CompletableFuture + .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor) + .thenCompose((r) -> method.apply(key, value)) + : method.apply(key, value); + return completeExecution(ioFuture, startNs, timer); + } + + /** + * Execute an async request given a collection of table records + * @param rateLimiter helper for rate limiting + * @param records list of records + * @param method method to be executed + * @param counter count metric to be updated + * @param timer latency metric to be updated + * @return CompletableFuture of the operation + */ + protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter, + Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, + Counter counter, Timer timer) { + incCounter(counter); + final long startNs = clock.nanoTime(); + CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() + ? CompletableFuture + .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor) + .thenCompose((r) -> method.apply(records)) + : method.apply(records); + return completeExecution(ioFuture, startNs, timer); + } + + @VisibleForTesting + public ExecutorService getCallbackExecutor() { + return callbackExecutor; + } + + @VisibleForTesting + public ExecutorService getTableExecutor() { + return tableExecutor; + } + + @VisibleForTesting + public TableReadFunction<K, V> getReadFn() { + return readFn; + } + + @VisibleForTesting + public TableRateLimiter<K, V> getReadRateLimiter() { + return readRateLimiter; + } + + @VisibleForTesting + public TableWriteFunction<K, V> getWriteFn() { + return writeFn; + } + + @VisibleForTesting + public TableRateLimiter getWriteRateLimiter() { + return writeRateLimiter; + } +}
