SAMZA-2043: Consolidate ReadableTable and ReadWriteTable

So far we've not seen a lot of use in maintaining separate implementations for 
ReadableTable and ReadWriteTable, which adds quite a bit of complexity. Hence 
we are consolidating them.

Author: Wei Song <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #861 from weisong44/SAMZA-2043


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a75503d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a75503d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a75503d

Branch: refs/heads/master
Commit: 6a75503d74ae65b30e2dcf760bba6e1d8050cdba
Parents: c5348bf
Author: Wei Song <[email protected]>
Authored: Mon Dec 17 15:11:27 2018 -0800
Committer: Wei Song <[email protected]>
Committed: Mon Dec 17 15:11:27 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/context/TaskContext.java   |  16 +-
 .../org/apache/samza/table/ReadWriteTable.java  |  53 ++-
 .../org/apache/samza/table/ReadableTable.java   |  86 ----
 .../main/java/org/apache/samza/table/Table.java |   2 -
 .../org/apache/samza/table/TableProvider.java   |   4 +-
 .../table/descriptors/BaseTableDescriptor.java  |   1 +
 .../table/descriptors/LocalTableDescriptor.java |   1 +
 .../descriptors/RemoteTableDescriptor.java      |   1 +
 .../apache/samza/table/utils/SerdeUtils.java    |   1 +
 .../table/remote/TestTableRateLimiter.java      |   2 +-
 .../apache/samza/context/TaskContextImpl.java   |   4 +-
 .../operators/impl/SendToTableOperatorImpl.java |   2 +-
 .../impl/StreamTableJoinOperatorImpl.java       |   6 +-
 .../apache/samza/table/BaseReadWriteTable.java  |  69 +++
 .../apache/samza/table/BaseReadableTable.java   |  75 ---
 .../org/apache/samza/table/TableManager.java    |   4 +-
 .../samza/table/caching/CachingTable.java       | 108 ++---
 .../table/caching/CachingTableProvider.java     |   8 +-
 .../table/caching/guava/GuavaCacheTable.java    |   4 +-
 .../caching/guava/GuavaCacheTableProvider.java  |   4 +-
 .../table/remote/RemoteReadWriteTable.java      | 244 ----------
 .../samza/table/remote/RemoteReadableTable.java | 246 ----------
 .../apache/samza/table/remote/RemoteTable.java  | 436 ++++++++++++++++++
 .../samza/table/remote/RemoteTableProvider.java |  68 ++-
 .../apache/samza/table/utils/TableMetrics.java  |  77 ++++
 .../samza/table/utils/TableReadMetrics.java     |  54 ---
 .../samza/table/utils/TableWriteMetrics.java    |  60 ---
 .../impl/TestStreamTableJoinOperatorImpl.java   |   4 +-
 .../apache/samza/table/TestTableManager.java    |   4 +-
 .../samza/table/caching/TestCachingTable.java   |  15 +-
 .../descriptors/TestLocalTableDescriptor.java   |   4 +-
 .../table/remote/TestRemoteReadWriteTable.java  | 458 -------------------
 .../samza/table/remote/TestRemoteTable.java     | 456 ++++++++++++++++++
 .../descriptors/TestRemoteTableDescriptor.java  |   8 +-
 .../retry/TestRetriableTableFunctions.java      |   4 +-
 .../inmemory/TestInMemoryTableDescriptor.java   |   4 +-
 .../descriptors/TestRocksDbTableDescriptor.java |   4 +-
 .../samza/storage/kv/LocalReadWriteTable.java   | 154 -------
 .../samza/storage/kv/LocalReadableTable.java    | 108 -----
 .../org/apache/samza/storage/kv/LocalTable.java | 213 +++++++++
 .../samza/storage/kv/LocalTableProvider.java    |   7 +-
 .../storage/kv/TestLocalReadWriteTable.java     | 247 ----------
 .../storage/kv/TestLocalReadableTable.java      | 155 -------
 .../storage/kv/TestLocalTableProvider.java      |   2 +-
 .../samza/storage/kv/TestLocalTableRead.java    | 155 +++++++
 .../samza/storage/kv/TestLocalTableWrite.java   | 247 ++++++++++
 .../framework/StreamTaskIntegrationTest.java    |   2 +-
 .../apache/samza/test/table/TestLocalTable.java | 362 ---------------
 .../test/table/TestLocalTableEndToEnd.java      | 361 +++++++++++++++
 .../samza/test/table/TestRemoteTable.java       | 288 ------------
 .../test/table/TestRemoteTableEndToEnd.java     | 310 +++++++++++++
 51 files changed, 2509 insertions(+), 2699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java 
b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
index cdf7404..8adfcea 100644
--- a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
@@ -25,8 +25,6 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 
 
 /**
@@ -63,17 +61,15 @@ public interface TaskContext {
   KeyValueStore<?, ?> getStore(String storeName);
 
   /**
-   * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+   * Gets the {@link ReadWriteTable} corresponding to the {@code tableId} for 
this task.
    *
-   * The returned table should be cast with the concrete type parameters based 
on the configured table serdes, and
-   * whether it is {@link ReadWriteTable} or {@link ReadableTable}. E.g., if 
using string key and integer value
-   * serde for a writable table, it should be cast to a {@code 
ReadWriteTable<String, Integer>}.
-   *
-   * @param tableId id of the {@link Table} to get
-   * @return the {@link Table} associated with {@code tableId} for this task
+   * @param tableId id of the {@link ReadWriteTable} to get
+   * @param <K> the type of the key in this table
+   * @param <V> the type of the value in this table
+   * @return the {@link ReadWriteTable} associated with {@code tableId} for 
this task
    * @throws IllegalArgumentException if there is no table associated with 
{@code tableId}
    */
-  Table<?> getTable(String tableId);
+  <K, V> ReadWriteTable<K, V> getTable(String tableId);
 
   /**
    * Gets the {@link CallbackScheduler} for this task, which can be used to 
schedule a callback to be executed

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
index 083a1b5..ffb87a4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -19,9 +19,11 @@
 package org.apache.samza.table;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 
 /**
@@ -32,7 +34,51 @@ import org.apache.samza.storage.kv.Entry;
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+public interface ReadWriteTable<K, V> extends Table {
+
+  /**
+   * Initializes the table during container initialization.
+   * Guaranteed to be invoked as the first operation on the table.
+   * @param context {@link Context} corresponding to this table
+   */
+  default void init(Context context) {
+  }
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; 
otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Asynchronously gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return completableFuture for the requested value
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Asynchronously gets the values with which the specified {@code keys} are 
associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return completableFuture for the requested entries
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+   */
+  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
 
   /**
    * Updates the mapping of the specified key-value pair;
@@ -114,4 +160,9 @@ public interface ReadWriteTable<K, V> extends 
ReadableTable<K, V> {
    * Flushes the underlying store of this table, if applicable.
    */
   void flush();
+
+  /**
+   * Close the table and release any resources acquired
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
deleted file mode 100644
index 6c88fd3..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-
-
-/**
- *
- * A table that supports get by one or more keys
- *
- * @param <K> the type of the record key in this table
- * @param <V> the type of the record value in this table
- */
[email protected]
-public interface ReadableTable<K, V> extends Table<KV<K, V>> {
-  /**
-   * Initializes the table during container initialization.
-   * Guaranteed to be invoked as the first operation on the table.
-   * @param context {@link Context} corresponding to this table
-   */
-  default void init(Context context) {
-  }
-
-  /**
-   * Gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return if found, the value associated with the specified {@code key}; 
otherwise, {@code null}.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  V get(K key);
-
-  /**
-   * Asynchronously gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return completableFuture for the requested value
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  CompletableFuture<V> getAsync(K key);
-
-  /**
-   * Gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return a map of the keys that were found and their respective values.
-   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
-   */
-  Map<K, V> getAll(List<K> keys);
-
-  /**
-   * Asynchronously gets the values with which the specified {@code keys} are 
associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return completableFuture for the requested entries
-   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
-   */
-  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
-
-  /**
-   * Close the table and release any resources acquired
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/Table.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java 
b/samza-api/src/main/java/org/apache/samza/table/Table.java
index 234d15b..c454012 100644
--- a/samza-api/src/main/java/org/apache/samza/table/Table.java
+++ b/samza-api/src/main/java/org/apache/samza/table/Table.java
@@ -36,8 +36,6 @@ import org.apache.samza.task.InitableTask;
  * hybrid tables. For remote data sources, a {@code RemoteTable} provides 
optimized access with caching, rate-limiting,
  * and retry support.
  * <p>
- * Depending on the implementation, a {@link Table} can be a {@link 
ReadableTable} or a {@link ReadWriteTable}.
- * <p>
  * Use a {@link TableDescriptor} to specify the properties of a {@link Table}. 
For High Level API
  * {@link StreamApplication}s, use {@link 
StreamApplicationDescriptor#getTable} to obtain the {@link Table} instance for
  * the descriptor that can be used with the {@link MessageStream} operators 
like {@link MessageStream#sendTo(Table)}.

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java 
b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 2dec989..36cad2e 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -34,10 +34,10 @@ public interface TableProvider {
   void init(Context context);
 
   /**
-   * Get an instance of the table for read/write operations
+   * Get an instance of the {@link ReadWriteTable}
    * @return the underlying table
    */
-  Table getTable();
+  ReadWriteTable getTable();
 
   /**
    * Shutdown the underlying table

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index 26c2ae3..52eca5f 100644
--- 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -56,6 +56,7 @@ abstract public class BaseTableDescriptor<K, V, D extends 
BaseTableDescriptor<K,
    * @param value the value
    * @return this table descriptor instance
    */
+  @SuppressWarnings("unchecked")
   public D withConfig(String key, String value) {
     config.put(key, value);
     return (D) this;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index 1ebb580..1623710 100644
--- 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.samza.table.utils.SerdeUtils;
  * @param <V> the type of the value in this table
  * @param <D> the type of the concrete table descriptor
  */
+@SuppressWarnings("unchecked")
 abstract public class LocalTableDescriptor<K, V, D extends 
LocalTableDescriptor<K, V, D>>
     extends BaseTableDescriptor<K, V, D> {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 7286004..4b15c47 100644
--- 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -235,6 +235,7 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
     if (!tagCreditsMap.isEmpty()) {
       RateLimiter defaultRateLimiter;
       try {
+        @SuppressWarnings("unchecked")
         Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) 
Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
         Constructor<? extends RateLimiter> ctor = 
clazz.getConstructor(Map.class);
         defaultRateLimiter = ctor.newInstance(tagCreditsMap);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java 
b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
index a7b66e5..338baf4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
+++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
@@ -54,6 +54,7 @@ public final class SerdeUtils {
    * @return deserialized object instance
    * @param <T> type of the object
    */
+  @SuppressWarnings("unchecked")
   public static <T> T deserialize(String name, String strObject) {
     try {
       byte [] bytes = Base64.getDecoder().decode(strObject);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
 
b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index ea9acbd..3235d5a 100644
--- 
a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ 
b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -25,9 +25,9 @@ import java.util.Collections;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
 
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyMap;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java 
b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index ec52f8a..a29c2b3 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
 import java.util.HashMap;
@@ -83,7 +83,7 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public Table getTable(String tableId) {
+  public <K, V> ReadWriteTable<K, V> getTable(String tableId) {
     return this.tableManager.getTable(tableId);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 0d39c1b..6d84b17 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends 
OperatorImpl<KV<K, V>, Void>
 
   SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, 
Context context) {
     this.sendToTableOpSpec = sendToTableOpSpec;
-    this.table = (ReadWriteTable) 
context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
+    this.table = 
context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 96f07d1..e3fc266 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -22,7 +22,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
@@ -42,11 +42,11 @@ import java.util.Collections;
 class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends 
OperatorImpl<M, JM> {
 
   private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
-  private final ReadableTable<K, ?> table;
+  private final ReadWriteTable<K, ?> table;
 
   StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> 
joinOpSpec, Context context) {
     this.joinOpSpec = joinOpSpec;
-    this.table = (ReadableTable) 
context.getTaskContext().getTable(joinOpSpec.getTableId());
+    this.table = context.getTaskContext().getTable(joinOpSpec.getTableId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java 
b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
new file mode 100644
index 0000000..cef224d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.utils.TableMetrics;
+import org.apache.samza.util.HighResolutionClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for a concrete table implementation
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> 
{
+
+  protected final Logger logger;
+
+  protected final String tableId;
+
+  protected TableMetrics metrics;
+
+  protected HighResolutionClock clock;
+
+  /**
+   * Construct an instance
+   * @param tableId Id of the table
+   */
+  public BaseReadWriteTable(String tableId) {
+    Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
+        String.format("Invalid table Id: %s", tableId));
+    this.tableId = tableId;
+    this.logger = LoggerFactory.getLogger(getClass().getName() + "." + 
tableId);
+  }
+
+  @Override
+  public void init(Context context) {
+    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
+    clock = metricsConfig.getMetricsTimerEnabled()
+        ? () -> System.nanoTime()
+        : () -> 0L;
+    metrics = new TableMetrics(context, this, tableId);
+  }
+
+  public String getTableId() {
+    return tableId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java 
b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
deleted file mode 100644
index 1dfd54c..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import com.google.common.base.Preconditions;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.utils.TableReadMetrics;
-import org.apache.samza.table.utils.TableWriteMetrics;
-import org.apache.samza.util.HighResolutionClock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Base class for all readable tables
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
-
-  protected final Logger logger;
-
-  protected final String tableId;
-
-  protected TableReadMetrics readMetrics;
-  protected TableWriteMetrics writeMetrics;
-
-  protected HighResolutionClock clock;
-
-  /**
-   * Construct an instance
-   * @param tableId Id of the table
-   */
-  public BaseReadableTable(String tableId) {
-    Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
-        String.format("Invalid table Id: %s", tableId));
-    this.tableId = tableId;
-    this.logger = LoggerFactory.getLogger(getClass().getName() + "." + 
tableId);
-  }
-
-  @Override
-  public void init(Context context) {
-    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
-    clock = metricsConfig.getMetricsTimerEnabled()
-        ? () -> System.nanoTime()
-        : () -> 0L;
-
-    readMetrics = new TableReadMetrics(context, this, tableId);
-    if (this instanceof ReadWriteTable) {
-      writeMetrics = new TableWriteMetrics(context, this, tableId);
-    }
-  }
-
-  public String getTableId() {
-    return tableId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java 
b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d3ba771..5a3777e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -54,7 +54,7 @@ public class TableManager {
 
   static class TableCtx {
     private TableProvider tableProvider;
-    private Table table;
+    private ReadWriteTable table;
   }
 
   private final Logger logger = 
LoggerFactory.getLogger(TableManager.class.getName());
@@ -110,7 +110,7 @@ public class TableManager {
    * @param tableId Id of the table
    * @return table instance
    */
-  public Table getTable(String tableId) {
+  public ReadWriteTable getTable(String tableId) {
     Preconditions.checkState(initialized, "TableManager has not been 
initialized.");
 
     TableCtx ctx = tableContexts.get(tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java 
b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index e63bf61..2fde79a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -23,9 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
 import java.util.ArrayList;
@@ -41,34 +40,32 @@ import static 
org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 
 /**
- * A composite table incorporating a cache with a Samza table. The cache is
+ * A hybrid table incorporating a cache with a Samza table. The cache is
  * represented as a {@link ReadWriteTable}.
  *
- * The intented use case is to optimize the latency of accessing the actual 
table, eg.
+ * The intended use case is to optimize the latency of accessing the actual 
table, e.g.
  * remote tables, when eventual consistency between cache and table is 
acceptable.
  * The cache is expected to support TTL such that the values can be refreshed 
at some
  * point.
  *
- * If the actual table is read-write table, CachingTable supports both 
write-through
- * and write-around (writes bypassing cache) policies. For write-through 
policy, it
- * supports read-after-write semantics because the value is cached after 
written to
- * the table.
+ * {@link CachingTable} supports write-through and write-around (writes 
bypassing cache) policies.
+ * For write-through policy, it supports read-after-write semantics because 
the value is
+ * cached after written to the table.
  *
- * Note that there is no synchronization in CachingTable because it is 
impossible to
+ * Note that there is no synchronization in {@link CachingTable} because it is 
impossible to
  * implement a critical section between table read/write and cache update in 
the async
  * code paths without serializing all async operations for the same keys. 
Given stale
- * data is a presumed trade off for using a cache for table, it should be 
acceptable
- * for the data in table and cache are out-of-sync. Moreover, unsynchronized 
operations
- * in CachingTable also deliver higher performance when there is contention.
+ * data is a presumed trade-off for using a cache with table, it should be 
acceptable
+ * for the data in table and cache to be temporarily out-of-sync. Moreover, 
unsynchronized
+ * operations in {@link CachingTable} also deliver higher performance when 
there is contention.
  *
  * @param <K> type of the table key
  * @param <V> type of the table value
  */
-public class CachingTable<K, V> extends BaseReadableTable<K, V>
+public class CachingTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
-  private final ReadableTable<K, V> rdTable;
-  private final ReadWriteTable<K, V> rwTable;
+  private final ReadWriteTable<K, V> table;
   private final ReadWriteTable<K, V> cache;
   private final boolean isWriteAround;
 
@@ -76,10 +73,9 @@ public class CachingTable<K, V> extends BaseReadableTable<K, 
V>
   private AtomicLong hitCount = new AtomicLong();
   private AtomicLong missCount = new AtomicLong();
 
-  public CachingTable(String tableId, ReadableTable<K, V> table, 
ReadWriteTable<K, V> cache, boolean isWriteAround) {
+  public CachingTable(String tableId, ReadWriteTable<K, V> table, 
ReadWriteTable<K, V> cache, boolean isWriteAround) {
     super(tableId);
-    this.rdTable = table;
-    this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : 
null;
+    this.table = table;
     this.cache = cache;
     this.isWriteAround = isWriteAround;
   }
@@ -114,16 +110,14 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   public V get(K key) {
     try {
       return getAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<V> getAsync(K key) {
-    incCounter(readMetrics.numGets);
+    incCounter(metrics.numGets);
     V value = cache.get(key);
     if (value != null) {
       hitCount.incrementAndGet();
@@ -133,14 +127,14 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
     long startNs = clock.nanoTime();
     missCount.incrementAndGet();
 
-    return rdTable.getAsync(key).handle((result, e) -> {
+    return table.getAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get the record for " + key, e);
         } else {
           if (result != null) {
             cache.put(key, result);
           }
-          updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getNs, clock.nanoTime() - startNs);
           return result;
         }
       });
@@ -150,16 +144,14 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   public Map<K, V> getAll(List<K> keys) {
     try {
       return getAllAsync(keys).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    incCounter(readMetrics.numGetAlls);
+    incCounter(metrics.numGetAlls);
     // Make a copy of entries which might be immutable
     Map<K, V> getAllResult = new HashMap<>();
     List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -169,7 +161,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
     }
 
     long startNs = clock.nanoTime();
-    return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
+    return table.getAllAsync(missingKeys).handle((records, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get records for " + keys, e);
         } else {
@@ -179,7 +171,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getAllNs, clock.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -189,20 +181,18 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   public void put(K key, V value) {
     try {
       putAsync(key, value).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
-    incCounter(writeMetrics.numPuts);
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " 
+ rdTable);
+    incCounter(metrics.numPuts);
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + 
table);
 
     long startNs = clock.nanoTime();
-    return rwTable.putAsync(key, value).handle((result, e) -> {
+    return table.putAsync(key, value).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException(String.format("Failed to put a record, 
key=%s, value=%s", key, value), e);
         } else if (!isWriteAround) {
@@ -212,7 +202,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
             cache.put(key, value);
           }
         }
-        updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -221,26 +211,24 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   public void putAll(List<Entry<K, V>> records) {
     try {
       putAllAsync(records).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    incCounter(writeMetrics.numPutAlls);
+    incCounter(metrics.numPutAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " 
+ rdTable);
-    return rwTable.putAllAsync(records).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + 
table);
+    return table.putAllAsync(records).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to put records " + records, e);
         } else if (!isWriteAround) {
           cache.putAll(records);
         }
 
-        updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -249,25 +237,23 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   public void delete(K key) {
     try {
       deleteAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
-    incCounter(writeMetrics.numDeletes);
+    incCounter(metrics.numDeletes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: 
" + rdTable);
-    return rwTable.deleteAsync(key).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " 
+ table);
+    return table.deleteAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + key, 
e);
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -283,33 +269,33 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
 
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    incCounter(writeMetrics.numDeleteAlls);
+    incCounter(metrics.numDeleteAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: 
" + rdTable);
-    return rwTable.deleteAllAsync(keys).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " 
+ table);
+    return table.deleteAllAsync(keys).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + keys, 
e);
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
 
   @Override
   public synchronized void flush() {
-    incCounter(writeMetrics.numFlushes);
+    incCounter(metrics.numFlushes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + 
rdTable);
-    rwTable.flush();
-    updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
+    Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + 
table);
+    table.flush();
+    updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
   }
 
   @Override
   public void close() {
-    this.cache.close();
-    this.rdTable.close();
+    cache.close();
+    table.close();
   }
 
   double hitRate() {

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
 
b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index d835809..e533cf4 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.BaseTableProvider;
@@ -47,18 +45,18 @@ public class CachingTableProvider extends BaseTableProvider 
{
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not 
initialized", tableId));
 
     JavaTableConfig tableConfig = new 
JavaTableConfig(context.getJobContext().getConfig());
     String realTableId = tableConfig.getForTable(tableId, 
CachingTableDescriptor.REAL_TABLE_ID);
-    ReadableTable table = (ReadableTable) 
this.context.getTaskContext().getTable(realTableId);
+    ReadWriteTable table = this.context.getTaskContext().getTable(realTableId);
 
     String cacheTableId = tableConfig.getForTable(tableId, 
CachingTableDescriptor.CACHE_TABLE_ID);
     ReadWriteTable cache;
 
     if (cacheTableId != null) {
-      cache = (ReadWriteTable) 
this.context.getTaskContext().getTable(cacheTableId);
+      cache = this.context.getTaskContext().getTable(cacheTableId);
     } else {
       cache = createDefaultCacheTable(realTableId, tableConfig);
       defaultCaches.add(cache);

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index b75a0bc..d8a5d9c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -23,7 +23,7 @@ import com.google.common.cache.Cache;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  * @param <K> type of the key in the cache
  * @param <V> type of the value in the cache
  */
-public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+public class GuavaCacheTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
   private final Cache<K, V> cache;

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index e45719e..042d3c7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -44,7 +44,7 @@ public class GuavaCacheTableProvider extends 
BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not 
initialized", tableId));
     JavaTableConfig tableConfig = new 
JavaTableConfig(context.getJobContext().getConfig());
     Cache guavaCache = 
SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
deleted file mode 100644
index 80c2cac..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * Remote store backed read writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
-
-  protected final TableWriteFunction<K, V> writeFn;
-  protected final TableRateLimiter writeRateLimiter;
-
-  public RemoteReadWriteTable(String tableId, TableReadFunction readFn, 
TableWriteFunction writeFn,
-      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> 
writeRateLimiter,
-      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
-    super(tableId, readFn, readRateLimiter, tableExecutor, callbackExecutor);
-    Preconditions.checkNotNull(writeFn, "null write function");
-    this.writeFn = writeFn;
-    this.writeRateLimiter = writeRateLimiter;
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, 
tableId);
-      
writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
-    }
-  }
-
-  @Override
-  public void put(K key, V value) {
-    try {
-      putAsync(key, value).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAsync(K key, V value) {
-    Preconditions.checkNotNull(key);
-    if (value == null) {
-      return deleteAsync(key);
-    }
-
-    return execute(writeRateLimiter, key, value, writeFn::putAsync, 
writeMetrics.numPuts, writeMetrics.putNs)
-        .exceptionally(e -> {
-            throw new SamzaException("Failed to put a record with key=" + key, 
(Throwable) e);
-          });
-  }
-
-  @Override
-  public void putAll(List<Entry<K, V>> entries) {
-    try {
-      putAllAsync(entries).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    Preconditions.checkNotNull(records);
-    if (records.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    List<K> deleteKeys = records.stream()
-        .filter(e -> e.getValue() == 
null).map(Entry::getKey).collect(Collectors.toList());
-
-    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
-        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
-
-    List<Entry<K, V>> putRecords = records.stream()
-        .filter(e -> e.getValue() != null).collect(Collectors.toList());
-
-    // Return the combined future
-    return CompletableFuture.allOf(
-        deleteFuture,
-        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, 
writeMetrics.numPutAlls, writeMetrics.putAllNs))
-        .exceptionally(e -> {
-            String strKeys = records.stream().map(r -> 
r.getKey().toString()).collect(Collectors.joining(","));
-            throw new SamzaException(String.format("Failed to put records with 
keys=" + strKeys), e);
-          });
-  }
-
-  @Override
-  public void delete(K key) {
-    try {
-      deleteAsync(key).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(writeRateLimiter, key, writeFn::deleteAsync, 
writeMetrics.numDeletes, writeMetrics.deleteNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete the 
record for " + key), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void deleteAll(List<K> keys) {
-    try {
-      deleteAllAsync(keys).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
-    if (keys.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, 
writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete records 
for " + keys), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void flush() {
-    try {
-      incCounter(writeMetrics.numFlushes);
-      long startNs = clock.nanoTime();
-      writeFn.flush();
-      updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
-    } catch (Exception e) {
-      String errMsg = "Failed to flush remote store";
-      logger.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  @Override
-  public void close() {
-    writeFn.close();
-    super.close();
-  }
-
-  /**
-   * Execute an async request given a table record (key+value)
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param value value of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, 
Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
-            .thenCompose((r) -> method.apply(key, value))
-        : method.apply(key, value);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table records
-   * @param rateLimiter helper for rate limiting
-   * @param records list of records
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> 
rateLimiter,
-      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, 
CompletableFuture<Void>> method,
-      Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttleRecords(records), 
tableExecutor)
-            .thenCompose((r) -> method.apply(records))
-        : method.apply(records);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  @VisibleForTesting
-  public TableWriteFunction<K, V> getWriteFn() {
-    return writeFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter getWriteRateLimiter() {
-    return writeRateLimiter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
deleted file mode 100644
index 84a05b8..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Objects;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store 
or service.
- * <p>
- * Many stream-processing applications require to look-up data from remote 
data sources eg: databases,
- * web-services, RPC systems to process messages in the stream. Such access to 
adjunct datasets can be
- * naturally modeled as a join between the incoming stream and a {@link 
RemoteReadableTable}.
- * <p>
- * Example use-cases include:
- * <ul>
- *  <li> Augmenting a stream of "page-views" with information from a database 
of user-profiles; </li>
- *  <li> Scoring page views with impressions services. </li>
- *  <li> A notifications-system that sends out emails may require a query to 
an external database to process its message. </li>
- * </ul>
- * <p>
- * A {@link RemoteReadableTable} is meant to be used with a {@link 
TableReadFunction} and a {@link TableWriteFunction}
- * which encapsulate the functionality of reading and writing data to the 
remote service. These provide a
- * pluggable means to specify I/O operations on the table. While the base 
implementation merely delegates to
- * these reader and writer functions, sub-classes of {@link 
RemoteReadableTable} may provide rich functionality like
- * caching or throttling on top of them.
- *
- * For async IO methods, requests are dispatched by a single-threaded executor 
after invoking the rateLimiter.
- * Optionally, an executor can be specified for invoking the future callbacks 
which otherwise are
- * executed on the threads of the underlying native data store client. This 
could be useful when
- * application might execute long-running operations upon future completions; 
another use case is to increase
- * throughput with more parallelism in the callback executions.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
-
-  protected final ExecutorService callbackExecutor;
-  protected final ExecutorService tableExecutor;
-  protected final TableReadFunction<K, V> readFn;
-  protected final TableRateLimiter<K, V> readRateLimiter;
-
-  /**
-   * Construct a RemoteReadableTable instance
-   * @param tableId table id
-   * @param readFn {@link TableReadFunction} for read operations
-   * @param rateLimiter helper for rate limiting
-   * @param tableExecutor executor for issuing async requests
-   * @param callbackExecutor executor for invoking async callbacks
-   */
-  public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
-      TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, 
ExecutorService callbackExecutor) {
-    super(tableId);
-    Preconditions.checkNotNull(readFn, "null read function");
-    this.readFn = readFn;
-    this.readRateLimiter = rateLimiter;
-    this.callbackExecutor = callbackExecutor;
-    this.tableExecutor = tableExecutor;
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, 
tableId);
-      
readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
-    }
-  }
-
-  @Override
-  public V get(K key) {
-    try {
-      return getAsync(key).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<V> getAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(readRateLimiter, key, readFn::getAsync, 
readMetrics.numGets, readMetrics.getNs)
-        .handle((result, e) -> {
-            if (e != null) {
-              throw new SamzaException("Failed to get the records for " + key, 
e);
-            }
-            if (result == null) {
-              incCounter(readMetrics.numMissedLookups);
-            }
-            return result;
-          });
-  }
-
-  @Override
-  public Map<K, V> getAll(List<K> keys) {
-    try {
-      return getAllAsync(keys).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
-    if (keys.isEmpty()) {
-      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
-    }
-    return execute(readRateLimiter, keys, readFn::getAllAsync, 
readMetrics.numGetAlls, readMetrics.getAllNs)
-        .handle((result, e) -> {
-            if (e != null) {
-              throw new SamzaException("Failed to get the records for " + 
keys, e);
-            }
-            result.values().stream().filter(Objects::isNull).forEach(v -> 
incCounter(readMetrics.numMissedLookups));
-            return result;
-          });
-  }
-
-  /**
-   * Execute an async request given a table key
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
-      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer 
timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
-            .thenCompose((r) -> method.apply(key))
-        : method.apply(key);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table keys
-   * @param rateLimiter helper for rate limiting
-   * @param keys collection of keys
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
-      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> 
method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
-            .thenCompose((r) -> method.apply(keys))
-        : method.apply(keys);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Complete the pending execution and update timer
-   * @param ioFuture the future to be executed
-   * @param startNs start time in nanosecond
-   * @param timer latency metric to be updated
-   * @param <T> return type
-   * @return CompletableFuture of the operation
-   */
-  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> 
ioFuture, long startNs, Timer timer) {
-    if (callbackExecutor != null) {
-      ioFuture.thenApplyAsync(r -> {
-          updateTimer(timer, clock.nanoTime() - startNs);
-          return r;
-        }, callbackExecutor);
-    } else {
-      ioFuture.thenApply(r -> {
-          updateTimer(timer, clock.nanoTime() - startNs);
-          return r;
-        });
-    }
-    return ioFuture;
-  }
-
-  @Override
-  public void close() {
-    readFn.close();
-  }
-
-  @VisibleForTesting
-  public ExecutorService getCallbackExecutor() {
-    return callbackExecutor;
-  }
-
-  @VisibleForTesting
-  public ExecutorService getTableExecutor() {
-    return tableExecutor;
-  }
-
-  @VisibleForTesting
-  public TableReadFunction<K, V> getReadFn() {
-    return readFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter<K, V> getReadRateLimiter() {
-    return readRateLimiter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
new file mode 100644
index 0000000..5b9b289
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A Samza {@link ReadWriteTable} backed by a remote data-store or service.
+ * <p>
+ * Many stream-processing applications require to look-up data from remote 
data sources eg: databases,
+ * web-services, RPC systems to process messages in the stream. Such access to 
adjunct datasets can be
+ * naturally modeled as a join between the incoming stream and a table.
+ * <p>
+ * Example use-cases include:
+ * <ul>
+ *  <li> Augmenting a stream of "page-views" with information from a database 
of user-profiles; </li>
+ *  <li> Scoring page views with impressions services. </li>
+ *  <li> A notifications-system that sends out emails may require a query to 
an external database to process its message. </li>
+ * </ul>
+ * <p>
+ * A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} 
and a {@link TableWriteFunction}
+ * which encapsulate the functionality of reading and writing data to the 
remote service. These provide a
+ * pluggable means to specify I/O operations on the table.
+ *
+ * For async IO methods, requests are dispatched by a single-threaded executor 
after invoking the rateLimiter.
+ * Optionally, an executor can be specified for invoking the future callbacks 
which otherwise are
+ * executed on the threads of the underlying native data store client. This 
could be useful when
+ * application might execute long-running operations upon future completions; 
another use case is to increase
+ * throughput with more parallelism in the callback executions.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
+    implements ReadWriteTable<K, V> {
+
+  protected final ExecutorService callbackExecutor;
+  protected final ExecutorService tableExecutor;
+  protected final TableReadFunction<K, V> readFn;
+  protected final TableWriteFunction<K, V> writeFn;
+  protected final TableRateLimiter<K, V> readRateLimiter;
+  protected final TableRateLimiter writeRateLimiter;
+
+  /**
+   * Construct a RemoteTable instance
+   * @param tableId table id
+   * @param readFn {@link TableReadFunction} for read operations
+   * @param writeFn {@link TableWriteFunction} for read operations
+   * @param readRateLimiter helper for read rate limiting
+   * @param writeRateLimiter helper for write rate limiting
+   * @param tableExecutor executor for issuing async requests
+   * @param callbackExecutor executor for invoking async callbacks
+   */
+  public RemoteTable(String tableId, TableReadFunction readFn, 
TableWriteFunction writeFn,
+      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> 
writeRateLimiter,
+      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+    super(tableId);
+    Preconditions.checkNotNull(readFn, "null read function");
+    this.readFn = readFn;
+    this.writeFn = writeFn;
+    this.readRateLimiter = readRateLimiter;
+    this.writeRateLimiter = writeRateLimiter;
+    this.tableExecutor = tableExecutor;
+    this.callbackExecutor = callbackExecutor;
+  }
+
+  @Override
+  public void init(Context context) {
+    super.init(context);
+    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
+    if (metricsConfig.getMetricsTimerEnabled()) {
+      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, 
tableId);
+      
readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+      if (writeRateLimiter != null) {
+        
writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+      }
+    }
+  }
+
+  @Override
+  public V get(K key) {
+    try {
+      return getAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    Preconditions.checkNotNull(key);
+    return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, 
metrics.getNs)
+        .handle((result, e) -> {
+            if (e != null) {
+              throw new SamzaException("Failed to get the records for " + key, 
e);
+            }
+            if (result == null) {
+              incCounter(metrics.numMissedLookups);
+            }
+            return result;
+          });
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    try {
+      return getAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(keys);
+    if (keys.isEmpty()) {
+      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+    }
+    return execute(readRateLimiter, keys, readFn::getAllAsync, 
metrics.numGetAlls, metrics.getAllNs)
+        .handle((result, e) -> {
+            if (e != null) {
+              throw new SamzaException("Failed to get the records for " + 
keys, e);
+            }
+            result.values().stream().filter(Objects::isNull).forEach(v -> 
incCounter(metrics.numMissedLookups));
+            return result;
+          });
+  }
+
+  @Override
+  public void put(K key, V value) {
+    try {
+      putAsync(key, value).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    if (value == null) {
+      return deleteAsync(key);
+    }
+
+    return execute(writeRateLimiter, key, value, writeFn::putAsync, 
metrics.numPuts, metrics.putNs)
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put a record with key=" + key, 
(Throwable) e);
+          });
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    try {
+      putAllAsync(entries).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(records);
+    if (records.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    List<K> deleteKeys = records.stream()
+        .filter(e -> e.getValue() == 
null).map(Entry::getKey).collect(Collectors.toList());
+
+    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
+        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
+
+    List<Entry<K, V>> putRecords = records.stream()
+        .filter(e -> e.getValue() != null).collect(Collectors.toList());
+
+    // Return the combined future
+    return CompletableFuture.allOf(
+        deleteFuture,
+        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, 
metrics.numPutAlls, metrics.putAllNs))
+        .exceptionally(e -> {
+            String strKeys = records.stream().map(r -> 
r.getKey().toString()).collect(Collectors.joining(","));
+            throw new SamzaException(String.format("Failed to put records with 
keys=" + strKeys), e);
+          });
+  }
+
+  @Override
+  public void delete(K key) {
+    try {
+      deleteAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    return execute(writeRateLimiter, key, writeFn::deleteAsync, 
metrics.numDeletes, metrics.deleteNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete the 
record for " + key), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    try {
+      deleteAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(keys);
+    if (keys.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, 
metrics.numDeleteAlls, metrics.deleteAllNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete records 
for " + keys), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void flush() {
+    if (writeFn != null) {
+      try {
+        incCounter(metrics.numFlushes);
+        long startNs = clock.nanoTime();
+        writeFn.flush();
+        updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
+      } catch (Exception e) {
+        String errMsg = "Failed to flush remote store";
+        logger.error(errMsg, e);
+        throw new SamzaException(errMsg, e);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    readFn.close();
+    if (writeFn != null) {
+      writeFn.close();
+    }
+  }
+
+  /**
+   * Execute an async request given a table key
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
+      K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer 
timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+        .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
+        .thenCompose((r) -> method.apply(key))
+        : method.apply(key);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table keys
+   * @param rateLimiter helper for rate limiting
+   * @param keys collection of keys
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
+      Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> 
method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+        .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
+        .thenCompose((r) -> method.apply(keys))
+        : method.apply(keys);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Complete the pending execution and update timer
+   * @param ioFuture the future to be executed
+   * @param startNs start time in nanosecond
+   * @param timer latency metric to be updated
+   * @param <T> return type
+   * @return CompletableFuture of the operation
+   */
+  protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> 
ioFuture, long startNs, Timer timer) {
+    if (callbackExecutor != null) {
+      ioFuture.thenApplyAsync(r -> {
+          updateTimer(timer, clock.nanoTime() - startNs);
+          return r;
+        }, callbackExecutor);
+    } else {
+      ioFuture.thenApply(r -> {
+          updateTimer(timer, clock.nanoTime() - startNs);
+          return r;
+        });
+    }
+    return ioFuture;
+  }
+
+  /**
+   * Execute an async request given a table record (key+value)
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param value value of the table record
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, 
Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
+            .thenCompose((r) -> method.apply(key, value))
+        : method.apply(key, value);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table records
+   * @param rateLimiter helper for rate limiting
+   * @param records list of records
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> 
rateLimiter,
+      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, 
CompletableFuture<Void>> method,
+      Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttleRecords(records), 
tableExecutor)
+            .thenCompose((r) -> method.apply(records))
+        : method.apply(records);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  @VisibleForTesting
+  public ExecutorService getCallbackExecutor() {
+    return callbackExecutor;
+  }
+
+  @VisibleForTesting
+  public ExecutorService getTableExecutor() {
+    return tableExecutor;
+  }
+
+  @VisibleForTesting
+  public TableReadFunction<K, V> getReadFn() {
+    return readFn;
+  }
+
+  @VisibleForTesting
+  public TableRateLimiter<K, V> getReadRateLimiter() {
+    return readRateLimiter;
+  }
+
+  @VisibleForTesting
+  public TableWriteFunction<K, V> getWriteFn() {
+    return writeFn;
+  }
+
+  @VisibleForTesting
+  public TableRateLimiter getWriteRateLimiter() {
+    return writeRateLimiter;
+  }
+}

Reply via email to