Repository: samza
Updated Branches:
  refs/heads/master 02192052f -> e4719b448


SAMZA-1795: table: add common retry for IO functions

Add common retry functionality to table IO functions for data stores
that do not have native retry support. We use failsafe as the retry
library.

Author: Peng Du <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #618 from pdu-mn1/retry-support


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4719b44
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4719b44
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4719b44

Branch: refs/heads/master
Commit: e4719b448c4ac0bbdf7c6832e5e7027dd8c49da0
Parents: 0219205
Author: Peng Du <[email protected]>
Authored: Mon Sep 17 12:16:39 2018 -0700
Committer: xiliu <[email protected]>
Committed: Mon Sep 17 12:16:39 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 gradle/dependency-versions.gradle               |   1 +
 .../table/remote/RemoteReadWriteTable.java      |   2 +-
 .../samza/table/remote/RemoteReadableTable.java |   2 +-
 .../table/remote/RemoteTableDescriptor.java     |  46 ++-
 .../samza/table/remote/RemoteTableProvider.java |  42 ++-
 .../samza/table/remote/TableReadFunction.java   |   7 +
 .../samza/table/remote/TableWriteFunction.java  |   7 +
 .../samza/table/retry/FailsafeAdapter.java      | 103 ++++++
 .../table/retry/RetriableReadFunction.java      | 102 ++++++
 .../table/retry/RetriableWriteFunction.java     | 120 +++++++
 .../apache/samza/table/retry/RetryMetrics.java  |  59 ++++
 .../samza/table/retry/TableRetryPolicy.java     | 257 +++++++++++++++
 .../samza/table/remote/TestRemoteTable.java     | 116 +++++--
 .../table/remote/TestRemoteTableDescriptor.java |  10 +-
 .../retry/TestRetriableTableFunctions.java      | 316 +++++++++++++++++++
 .../samza/table/retry/TestTableRetryPolicy.java |  82 +++++
 .../samza/test/table/TestRemoteTable.java       |  27 +-
 .../table/TestTableDescriptorsProvider.java     |  18 +-
 19 files changed, 1274 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 7d23717..8b68205 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "net.jodah:failsafe:$failsafeVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 3c4c3a9..b33ab82 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -45,4 +45,5 @@
   yarnVersion = "2.6.1"
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
+  failsafeVersion = "1.1.0"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 88bc7df..9ef4c1b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -43,11 +43,11 @@ import com.google.common.base.Preconditions;
  * @param <V> the type of the value in this table
  */
 public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> 
implements ReadWriteTable<K, V> {
-  private final TableWriteFunction<K, V> writeFn;
 
   private DefaultTableWriteMetrics writeMetrics;
 
   @VisibleForTesting
+  final TableWriteFunction<K, V> writeFn;
   final TableRateLimiter writeRateLimiter;
 
   public RemoteReadWriteTable(String tableId, TableReadFunction readFn, 
TableWriteFunction writeFn,

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index 3186fee..b3d82f3 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -80,10 +80,10 @@ public class RemoteReadableTable<K, V> implements 
ReadableTable<K, V> {
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;
 
-  private final TableReadFunction<K, V> readFn;
   private DefaultTableReadMetrics readMetrics;
 
   @VisibleForTesting
+  final TableReadFunction<K, V> readFn;
   final TableRateLimiter<K, V> readRateLimiter;
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
index a8d419d..537ff87 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.utils.SerdeUtils;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
@@ -70,6 +71,9 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
   private TableRateLimiter.CreditFunction<K, V> readCreditFn;
   private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
 
+  private TableRetryPolicy readRetryPolicy;
+  private TableRetryPolicy writeRetryPolicy;
+
   // By default execute future callbacks on the native client threads
   // ie. no additional thread pool for callbacks.
   private int asyncCallbackPoolSize = -1;
@@ -115,13 +119,23 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
           "write credit function", writeCreditFn));
     }
 
+    if (readRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, 
SerdeUtils.serialize(
+          "read retry policy", readRetryPolicy));
+    }
+
+    if (writeRetryPolicy != null) {
+      tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, 
SerdeUtils.serialize(
+          "write retry policy", writeRetryPolicy));
+    }
+
     tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, 
String.valueOf(asyncCallbackPoolSize));
 
     return new TableSpec(tableId, serde, 
RemoteTableProviderFactory.class.getName(), tableSpecConfig);
   }
 
   /**
-   * Use specified TableReadFunction with remote table.
+   * Use specified TableReadFunction with remote table and a retry policy.
    * @param readFn read function instance
    * @return this table descriptor instance
    */
@@ -132,7 +146,7 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
   }
 
   /**
-   * Use specified TableWriteFunction with remote table.
+   * Use specified TableWriteFunction with remote table and a retry policy.
    * @param writeFn write function instance
    * @return this table descriptor instance
    */
@@ -143,6 +157,34 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
   }
 
   /**
+   * Use specified TableReadFunction with remote table and a retry policy.
+   * @param readFn read function instance
+   * @param retryPolicy retry policy for the read function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> 
readFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(readFn, "null read function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.readFn = readFn;
+    this.readRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Use specified TableWriteFunction with remote table and a retry policy.
+   * @param writeFn write function instance
+   * @param retryPolicy retry policy for the write function
+   * @return this table descriptor instance
+   */
+  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, 
V> writeFn, TableRetryPolicy retryPolicy) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(retryPolicy, "null retry policy");
+    this.writeFn = writeFn;
+    this.writeRetryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
    * Specify a rate limiter along with credit functions to map a table record 
(as KV) to the amount
    * of credits to be charged from the rate limiter for table read and write 
operations.
    * This is an advanced API that provides greater flexibility to throttle 
each record in the table

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 6c5d9b3..cae0bbd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -25,11 +25,16 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.utils.BaseTableProvider;
 import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.util.RateLimiter;
 
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
@@ -47,6 +52,8 @@ public class RemoteTableProvider extends BaseTableProvider {
   static final String READ_CREDIT_FN = "io.read.credit.func";
   static final String WRITE_CREDIT_FN = "io.write.credit.func";
   static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
+  static final String READ_RETRY_POLICY = "io.read.retry.policy";
+  static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
 
   private final boolean readOnly;
   private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
@@ -58,6 +65,7 @@ public class RemoteTableProvider extends BaseTableProvider {
    */
   private static Map<String, ExecutorService> tableExecutors = new 
ConcurrentHashMap<>();
   private static Map<String, ExecutorService> callbackExecutors = new 
ConcurrentHashMap<>();
+  private static ScheduledExecutorService retryExecutor;
 
   public RemoteTableProvider(TableSpec tableSpec) {
     super(tableSpec);
@@ -72,7 +80,7 @@ public class RemoteTableProvider extends BaseTableProvider {
     RemoteReadableTable table;
     String tableId = tableSpec.getId();
 
-    TableReadFunction<?, ?> readFn = getReadFn();
+    TableReadFunction readFn = getReadFn();
     RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
     if (rateLimiter != null) {
       rateLimiter.init(containerContext.config, taskContext);
@@ -83,11 +91,33 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
     TableRateLimiter writeRateLimiter = null;
 
+    TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
+    TableRetryPolicy writeRetryPolicy = null;
+
+    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor 
== null) {
+      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setName("table-retry-executor");
+          thread.setDaemon(true);
+          return thread;
+        });
+    }
+
+    if (readRetryPolicy != null) {
+      readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, 
retryExecutor);
+    }
+
+    TableWriteFunction writeFn = getWriteFn();
+
     boolean isRateLimited = readRateLimiter.isRateLimited();
     if (!readOnly) {
       writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
       writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, 
writeCreditFn, RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
+      writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY);
+      if (writeRetryPolicy != null) {
+        writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, 
retryExecutor);
+      }
     }
 
     // Optional executor for future callback/completion. Shared by both read 
and write operations.
@@ -116,10 +146,18 @@ public class RemoteTableProvider extends 
BaseTableProvider {
       table = new RemoteReadableTable(tableSpec.getId(), readFn, 
readRateLimiter,
           tableExecutors.get(tableId), callbackExecutors.get(tableId));
     } else {
-      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, 
getWriteFn(), readRateLimiter,
+      table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, 
readRateLimiter,
           writeRateLimiter, tableExecutors.get(tableId), 
callbackExecutors.get(tableId));
     }
 
+    TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, 
taskContext, table, tableId);
+    if (readRetryPolicy != null) {
+      ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
+    }
+    if (writeRetryPolicy != null) {
+      ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
+    }
+
     table.init(containerContext, taskContext);
     tables.add(table);
     return table;

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
index 5d0f963..4791779 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
@@ -100,5 +100,12 @@ public interface TableReadFunction<K, V> extends 
Serializable, InitableFunction,
             .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue().join())));
   }
 
+  /**
+   * Determine whether the current operation can be retried with the last 
thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
   // optionally implement readObject() to initialize transient states
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
index 0ac3a0c..d9d619f 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
@@ -143,6 +143,13 @@ public interface TableWriteFunction<K, V> extends 
Serializable, InitableFunction
   }
 
   /**
+   * Determine whether the current operation can be retried with the last 
thrown exception.
+   * @param exception exception thrown by a table operation
+   * @return whether the operation can be retried
+   */
+  boolean isRetriable(Throwable exception);
+
+  /**
    * Flush the remote store (optional)
    */
   default void flush() {

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java 
b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
new file mode 100644
index 0000000..b2eccd8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.SamzaException;
+
+import net.jodah.failsafe.AsyncFailsafe;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
+
+
+/**
+ * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe
+ * {@link RetryPolicy} and creating failsafe retryer instances that report
+ * retry outcomes to a {@link RetryMetrics} instance.
+ */
+class FailsafeAdapter {
+  /**
+   * Convert the {@link TableRetryPolicy} to a failsafe {@link RetryPolicy}.
+   * The backoff type selects the delay settings (none, fixed delay, random
+   * delay range, or exponential backoff). Max duration, max attempts and
+   * jitter are applied only when they are non-null, and jitter is skipped
+   * for the RANDOM backoff type. The retry predicate is looked up on
+   * {@code policy} each time a failure is tested, not copied at conversion time.
+   * @param policy table retry policy to convert
+   * @return a new failsafe {@link RetryPolicy} configured from {@code policy}
+   * @throws SamzaException if the backoff type is not recognized
+   */
+  static RetryPolicy valueOf(TableRetryPolicy policy) {
+    RetryPolicy failSafePolicy = new RetryPolicy();
+
+    switch (policy.getBackoffType()) {
+      case NONE:
+        break;
+
+      case FIXED:
+        failSafePolicy.withDelay(policy.getSleepTime().toMillis(), 
TimeUnit.MILLISECONDS);
+        break;
+
+      case RANDOM:
+        failSafePolicy.withDelay(policy.getRandomMin().toMillis(), 
policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS);
+        break;
+
+      case EXPONENTIAL:
+        failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), 
policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS,
+            policy.getExponentialFactor());
+        break;
+
+      default:
+        throw new SamzaException("Unknown retry policy type.");
+    }
+
+    if (policy.getMaxDuration() != null) {
+      failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), 
TimeUnit.MILLISECONDS);
+    }
+    if (policy.getMaxAttempts() != null) {
+      failSafePolicy.withMaxRetries(policy.getMaxAttempts());
+    }
+    if (policy.getJitter() != null && policy.getBackoffType() != 
TableRetryPolicy.BackoffType.RANDOM) {
+      failSafePolicy.withJitter(policy.getJitter().toMillis(), 
TimeUnit.MILLISECONDS);
+    }
+
+    failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e));
+
+    return failSafePolicy;
+  }
+
+  /**
+   * Obtain an async failsafe retryer instance with the specified policy,
+   * metrics, and executor service. The start time used for the timer is
+   * captured when this method is called. The returned instance increments
+   * the retry counter on each retry; when retries are exceeded it records
+   * the elapsed time in the retry timer and increments the permanent failure
+   * counter; on success it records the elapsed time if the execution context
+   * reports more than one execution, and increments the success counter
+   * otherwise.
+   * @param retryPolicy retry policy
+   * @param metrics retry metrics; dereferenced inside the registered listeners
+   * @param retryExec executor service for scheduling async retries
+   * @return {@link net.jodah.failsafe.AsyncFailsafe} instance
+   */
+  static AsyncFailsafe<?> failsafe(RetryPolicy retryPolicy, RetryMetrics 
metrics, ScheduledExecutorService retryExec) {
+    long startMs = System.currentTimeMillis();
+    return Failsafe.with(retryPolicy).with(retryExec)
+        .onRetry(e -> metrics.retryCount.inc())
+        .onRetriesExceeded(e -> {
+            metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+            metrics.permFailureCount.inc();
+          })
+        .onSuccess((e, ctx) -> {
+            if (ctx.getExecutions() > 1) {
+              metrics.retryTimer.update(System.currentTimeMillis() - startMs);
+            } else {
+              metrics.successCount.inc();
+            }
+          });
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
 
b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
new file mode 100644
index 0000000..1adddc0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableReadFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableReadFunction}.
+ *
+ * Actual retry mechanism is provided by the failsafe library. Retry is
+ * attempted in an async way with a {@link ScheduledExecutorService}.
+ *
+ * The constructor replaces the retry predicate of the supplied policy with
+ * one that retries when either the wrapped function's
+ * {@link TableReadFunction#isRetriable(Throwable)} or the policy's previous
+ * predicate accepts the exception; the policy object passed in is modified.
+ *
+ * The retry metrics are only assigned by {@link #setMetrics(TableMetricsUtil)},
+ * so that method is expected to be called before any read is issued.
+ *
+ * Failures that remain after retries are rethrown wrapped in a
+ * {@link SamzaException} whose message names the key(s) involved.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableReadFunction<K, V> implements TableReadFunction<K, V> {
+  private final RetryPolicy retryPolicy;
+  private final TableReadFunction<K, V> readFn;
+  private final ScheduledExecutorService retryExecutor;
+
+  @VisibleForTesting
+  RetryMetrics retryMetrics;
+
+  public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction<K, 
V> readFn,
+      ScheduledExecutorService retryExecutor) {
+    Preconditions.checkNotNull(policy);
+    Preconditions.checkNotNull(readFn);
+    Preconditions.checkNotNull(retryExecutor);
+
+    this.readFn = readFn;
+    this.retryExecutor = retryExecutor;
+    Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+    policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || 
retryPredicate.test(ex));
+    this.retryPolicy = FailsafeAdapter.valueOf(policy);
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> readFn.getAsync(key))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to get the record for " + key + " 
after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> readFn.getAllAsync(keys))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to get the records for " + keys + 
" after retries.", e);
+          });
+  }
+
+  @Override
+  public boolean isRetriable(Throwable exception) {
+    return readFn.isRetriable(exception);
+  }
+
+  /**
+   * Initialize retry-related metrics. Creates a new {@link RetryMetrics}
+   * registered under the "reader" prefix and stores it for use by
+   * subsequent read operations.
+   * @param metricsUtil metrics util used to create the counters and timer
+   */
+  public void setMetrics(TableMetricsUtil metricsUtil) {
+    this.retryMetrics = new RetryMetrics("reader", metricsUtil);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
 
b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
new file mode 100644
index 0000000..2f3f062
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import net.jodah.failsafe.RetryPolicy;
+
+import static org.apache.samza.table.retry.FailsafeAdapter.failsafe;
+
+
+/**
+ * Wrapper for a {@link TableWriteFunction} instance to add common retry
+ * support with a {@link TableRetryPolicy}. This wrapper is created by
+ * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry
+ * policy is specified together with the {@link TableWriteFunction}.
+ *
+ * Actual retry mechanism is provided by the failsafe library. Retry is
+ * attempted in an async way with a {@link ScheduledExecutorService}.
+ *
+ * The retry metrics are only assigned by {@link #setMetrics(TableMetricsUtil)},
+ * so that method is expected to be called before any write is issued.
+ *
+ * Failures that remain after retries are rethrown wrapped in a
+ * {@link SamzaException} whose message names the failed operation.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RetriableWriteFunction<K, V> implements TableWriteFunction<K, V> {
+  private final RetryPolicy retryPolicy;
+  private final TableWriteFunction<K, V> writeFn;
+  private final ScheduledExecutorService retryExecutor;
+
+  @VisibleForTesting
+  RetryMetrics retryMetrics;
+
+  /**
+   * Wrap a write function with retry support. The retry predicate of the
+   * supplied policy is replaced with one that retries when either
+   * {@code writeFn.isRetriable} or the policy's previous predicate accepts
+   * the exception; the policy object passed in is modified.
+   * @param policy retry policy, must not be null
+   * @param writeFn write function to delegate to, must not be null
+   * @param retryExecutor executor used to schedule retries, must not be null
+   */
+  public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction<K, 
V> writeFn,
+      ScheduledExecutorService retryExecutor)  {
+    Preconditions.checkNotNull(policy);
+    Preconditions.checkNotNull(writeFn);
+    Preconditions.checkNotNull(retryExecutor);
+
+    this.writeFn = writeFn;
+    this.retryExecutor = retryExecutor;
+    Predicate<Throwable> retryPredicate = policy.getRetryPredicate();
+    policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || 
retryPredicate.test(ex));
+    this.retryPolicy = FailsafeAdapter.valueOf(policy);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V record) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.putAsync(key, record))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put the record for " + key + " 
after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.putAllAsync(records))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put records after retries.", 
e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.deleteAsync(key))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to delete the record for " + key 
+ " after retries.", e);
+          });
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
+    return failsafe(retryPolicy, retryMetrics, retryExecutor)
+        .future(() -> writeFn.deleteAllAsync(keys))
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to delete the records for " + 
keys + " after retries.", e);
+          });
+  }
+
+  @Override
+  public boolean isRetriable(Throwable exception) {
+    return writeFn.isRetriable(exception);
+  }
+
+  /**
+   * Initialize retry-related metrics. Creates a new {@link RetryMetrics}
+   * registered under the "writer" prefix and stores it for use by
+   * subsequent write operations.
+   * @param metricsUtil metrics util used to create the counters and timer
+   */
+  public void setMetrics(TableMetricsUtil metricsUtil) {
+    this.retryMetrics = new RetryMetrics("writer", metricsUtil);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java 
b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
new file mode 100644
index 0000000..fbc511c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * Wrapper of retry-related metrics common to both {@link 
RetriableReadFunction} and
+ * {@link RetriableWriteFunction}.
+ */
+class RetryMetrics {
+  /**
+   * Number of retries executed (excluding the first attempt)
+   */
+  final Counter retryCount;
+
+  /**
+   * Number of successes with only the first attempt
+   */
+  final Counter successCount;
+
+  /**
+   * Number of operations that failed permanently and exhausted all retries
+   */
+  final Counter permFailureCount;
+
+  /**
+   * Total time spent in each IO; this is updated only
+   * when at least one retry has been attempted.
+   */
+  final Timer retryTimer;
+
+  public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) {
+    retryCount = metricsUtil.newCounter(prefix + "-retry-count");
+    successCount = metricsUtil.newCounter(prefix + "-success-count");
+    permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count");
+    retryTimer = metricsUtil.newTimer(prefix + "-retry-timer");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java 
b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
new file mode 100644
index 0000000..162eb07
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Common retry policy parameters for table IO. This serves as an abstraction 
on top of
+ * retry libraries. This common policy supports the features below:
+ *  - backoff modes: fixed, random, exponential
+ *  - termination modes: by attempts, by duration
+ *  - jitter
+ *
+ * Retry libraries can implement a subset or all features as described by this 
common policy.
+ */
+public class TableRetryPolicy implements Serializable {
+  enum BackoffType {
+    /**
+     * No backoff in between two retry attempts.
+     */
+    NONE,
+
+    /**
+     * Backoff by a fixed duration {@code sleepTime}.
+     */
+    FIXED,
+
+    /**
+     * Backoff by a randomly selected duration between {@code minSleep} and 
{@code maxSleep}.
+     */
+    RANDOM,
+
+    /**
+     * Backoff by exponentially increasing durations by {@code 
exponentialFactor} starting from {@code sleepTime}.
+     */
+    EXPONENTIAL
+  }
+
+  // Backoff parameters
+  private Duration sleepTime;
+  private Duration randomMin;
+  private Duration randomMax;
+  private double exponentialFactor;
+  private Duration exponentialMaxSleep;
+  private Duration jitter;
+
+  // By default no early termination
+  private Integer maxAttempts = null;
+  private Duration maxDuration = null;
+
+  // By default no backoff during retries
+  private BackoffType backoffType = BackoffType.NONE;
+
+  /**
+   * Serializable adapter interface for {@link java.util.function.Predicate}.
+   * This is needed because TableRetryPolicy needs to be serializable as part 
of the
+   * table config whereas {@link java.util.function.Predicate} is not 
serializable.
+   */
+  public interface RetryPredicate extends Predicate<Throwable>, Serializable {
+  }
+
+  // By default no custom retry predicate so retry decision is made solely by 
the table functions
+  private RetryPredicate retryPredicate = (ex) -> false;
+
+  /**
+   * Set the sleep time for the fixed backoff policy.
+   * @param sleepTime sleep time between attempts
+   * @return this policy instance
+   */
+  public TableRetryPolicy withFixedBackoff(Duration sleepTime) {
+    Preconditions.checkNotNull(sleepTime);
+    this.sleepTime = sleepTime;
+    this.backoffType = BackoffType.FIXED;
+    return this;
+  }
+
+  /**
+   * Set the sleep time for the random backoff policy. The actual 
sleep time
+   * before each attempt is randomly selected between {@code [minSleep, 
maxSleep]}.
+   * @param minSleep lower bound of the sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration 
maxSleep) {
+    Preconditions.checkNotNull(minSleep);
+    Preconditions.checkNotNull(maxSleep);
+    this.randomMin = minSleep;
+    this.randomMax = maxSleep;
+    this.backoffType = BackoffType.RANDOM;
+    return this;
+  }
+
+  /**
+   * Set the parameters for the exponential backoff policy. The actual 
sleep time
+   * starts at {@code sleepTime} and grows by multiplying successive delays
+   * by the {@code factor}, up to {@code maxSleep}.
+   * @param sleepTime initial sleep time
+   * @param maxSleep upper bound of the sleep time
+   * @param factor exponential factor for backoff
+   * @return this policy instance
+   */
+  public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration 
maxSleep, double factor) {
+    Preconditions.checkNotNull(sleepTime);
+    Preconditions.checkNotNull(maxSleep);
+    this.sleepTime = sleepTime;
+    this.exponentialMaxSleep = maxSleep;
+    this.exponentialFactor = factor;
+    this.backoffType = BackoffType.EXPONENTIAL;
+    return this;
+  }
+
+  /**
+   * Set the jitter for the backoff policy to provide additional randomness.
+   * If this is set, a random value between {@code [0, jitter]} will be added
+   * to each sleep time. This applies to {@code FIXED} and {@code 
EXPONENTIAL}
+   * modes only; the value is dropped if {@code RANDOM} backoff is already selected.
+   * @param jitter upper bound of the random value added to each sleep time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withJitter(Duration jitter) {
+    Preconditions.checkNotNull(jitter);
+    if (backoffType != BackoffType.RANDOM) {
+      this.jitter = jitter;
+    }
+    return this;
+  }
+
+  /**
+   * Set maximum number of retry attempts before terminating the operation.
+   * @param maxAttempts number of retries allowed after the initial attempt
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterAttempts(int maxAttempts) {
+    Preconditions.checkArgument(maxAttempts >= 0);
+    this.maxAttempts = maxAttempts;
+    return this;
+  }
+
+  /**
+   * Set maximum total delay (sleep time + execution) before terminating the 
operation.
+   * @param maxDelay delay time
+   * @return this policy instance
+   */
+  public TableRetryPolicy withStopAfterDelay(Duration maxDelay) {
+    Preconditions.checkNotNull(maxDelay);
+    this.maxDuration = maxDelay;
+    return this;
+  }
+
+  /**
+   * Set the predicate to use for identifying retriable exceptions. If 
specified, table
+   * retry logic will consult both such predicate and table function and retry 
will be
+   * attempted if either option returns true.
+   * @param retryPredicate predicate for retriable exception identification
+   * @return this policy instance
+   */
+  public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) {
+    Preconditions.checkNotNull(retryPredicate);
+    this.retryPredicate = retryPredicate;
+    return this;
+  }
+
+  /**
+   * @return initial/fixed sleep time.
+   */
+  public Duration getSleepTime() {
+    return sleepTime;
+  }
+
+  /**
+   * @return lower bound of sleep time for random backoff or null if {@code 
backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMin() {
+    return randomMin;
+  }
+
+  /**
+   * @return upper bound of sleep time for random backoff or null if {@code 
backoffType} is not {@code RANDOM}.
+   */
+  public Duration getRandomMax() {
+    return randomMax;
+  }
+
+  /**
+   * @return exponential factor for exponential backoff.
+   */
+  public double getExponentialFactor() {
+    return exponentialFactor;
+  }
+
+  /**
+   * @return maximum sleep time for exponential backoff or null if {@code 
backoffType} is not {@code EXPONENTIAL}.
+   */
+  public Duration getExponentialMaxSleep() {
+    return exponentialMaxSleep;
+  }
+
+  /**
+   * Introduce randomness to the sleep time.
+   * @return jitter to add on to each backoff or null if not set.
+   */
+  public Duration getJitter() {
+    return jitter;
+  }
+
+  /**
+   * Termination after a fixed number of attempts.
+   * @return maximum number of attempts without success before giving up the 
operation or null if not set.
+   */
+  public Integer getMaxAttempts() {
+    return maxAttempts;
+  }
+
+  /**
+   * Termination after a fixed duration.
+   * @return maximum duration without success before giving up the operation 
or null if not set.
+   */
+  public Duration getMaxDuration() {
+    return maxDuration;
+  }
+
+  /**
+   * @return type of the backoff.
+   */
+  public BackoffType getBackoffType() {
+    return backoffType;
+  }
+
+  /**
+   * @return Custom predicate for retriable exception identification; 
defaults to a predicate that always returns false.
+   */
+  public RetryPredicate getRetryPredicate() {
+    return retryPredicate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 21fc6a5..3e844c3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.metrics.Counter;
@@ -34,6 +35,9 @@ import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -43,6 +47,7 @@ import junit.framework.Assert;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -50,6 +55,18 @@ import static org.mockito.Mockito.verify;
 
 
 public class TestRemoteTable {
+  private final ScheduledExecutorService schedExec = 
Executors.newSingleThreadScheduledExecutor();
+
+  public static TaskContext getMockTaskContext() {
+    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+    doAnswer(args -> new Timer((String) 
args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), 
anyString());
+    doAnswer(args -> new Counter((String) 
args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), 
anyString());
+    doAnswer(args -> new Gauge((String) args.getArguments()[0], 
0)).when(metricsRegistry).newGauge(anyString(), any());
+    TaskContext taskContext = mock(TaskContext.class);
+    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+    return taskContext;
+  }
+
   private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String 
tableId,
       TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
     return getTable(tableId, readFn, writeFn, null);
@@ -72,12 +89,7 @@ public class TestRemoteTable {
       table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, 
readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
     }
 
-    TaskContext taskContext = mock(TaskContext.class);
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), 
anyString());
-    
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), 
anyString());
-    doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), 
any());
-    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
+    TaskContext taskContext = getMockTaskContext();
 
     SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
 
@@ -86,35 +98,53 @@ public class TestRemoteTable {
     return (T) table;
   }
 
-  private void doTestGet(boolean sync, boolean error) throws Exception {
+  private void doTestGet(boolean sync, boolean error, boolean retry) throws 
Exception {
+    String tableId = "testGet-" + sync + error + retry;
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     // Sync is backed by async so needs to mock the async method
     CompletableFuture<String> future;
     if (error) {
       future = new CompletableFuture();
       future.completeExceptionally(new RuntimeException("Test exception"));
+      if (!retry) {
+        doReturn(future).when(readFn).getAsync(anyString());
+      } else {
+        final int [] times = new int[] {0};
+        doAnswer(args -> times[0]++ == 0 ? future : 
CompletableFuture.completedFuture("bar"))
+            .when(readFn).getAsync(anyString());
+      }
     } else {
       future = CompletableFuture.completedFuture("bar");
+      doReturn(future).when(readFn).getAsync(anyString());
     }
-    doReturn(future).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGet-" + sync + 
error, readFn, null);
+    if (retry) {
+      doReturn(true).when(readFn).isRetriable(any());
+      TableRetryPolicy policy = new TableRetryPolicy();
+      readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+    }
+    RemoteReadableTable<String, String> table = getTable(tableId, readFn, 
null);
     Assert.assertEquals("bar", sync ? table.get("foo") : 
table.getAsync("foo").get());
     verify(table.readRateLimiter, times(1)).throttle(anyString());
   }
 
   @Test
   public void testGet() throws Exception {
-    doTestGet(true, false);
+    doTestGet(true, false, false);
   }
 
   @Test
   public void testGetAsync() throws Exception {
-    doTestGet(false, false);
+    doTestGet(false, false, false);
   }
 
   @Test(expected = ExecutionException.class)
   public void testGetAsyncError() throws Exception {
-    doTestGet(false, true);
+    doTestGet(false, true, false);
+  }
+
+  @Test
+  public void testGetAsyncErrorRetried() throws Exception {
+    doTestGet(false, true, true);
   }
 
   @Test
@@ -139,23 +169,36 @@ public class TestRemoteTable {
           });
   }
 
-  private void doTestPut(boolean sync, boolean error, boolean isDelete) throws 
Exception {
-    TableWriteFunction<String, String> writeFn = 
mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPut-" + sync + 
error + isDelete,
-        mock(TableReadFunction.class), writeFn);
-    CompletableFuture<Void> future;
-    if (error) {
-      future = new CompletableFuture();
-      future.completeExceptionally(new RuntimeException("Test exception"));
+  private void doTestPut(boolean sync, boolean error, boolean isDelete, 
boolean retry) throws Exception {
+    String tableId = "testPut-" + sync + error + isDelete + retry;
+    TableWriteFunction<String, String> mockWriteFn = 
mock(TableWriteFunction.class);
+    TableWriteFunction<String, String> writeFn = mockWriteFn;
+    CompletableFuture<Void> successFuture = 
CompletableFuture.completedFuture(null);
+    CompletableFuture<Void> failureFuture = new CompletableFuture();
+    failureFuture.completeExceptionally(new RuntimeException("Test 
exception"));
+    if (!error) {
+      if (isDelete) {
+        doReturn(successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(successFuture).when(writeFn).putAsync(any(), any());
+      }
+    } else if (!retry) {
+      if (isDelete) {
+        doReturn(failureFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+      }
     } else {
-      future = CompletableFuture.completedFuture(null);
-    }
-    // Sync is backed by async so needs to mock the async method
-    if (isDelete) {
-      doReturn(future).when(writeFn).deleteAsync(any());
-    } else {
-      doReturn(future).when(writeFn).putAsync(any(), any());
+      doReturn(true).when(writeFn).isRetriable(any());
+      final int [] times = new int[] {0};
+      if (isDelete) {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : 
successFuture).when(writeFn).deleteAsync(any());
+      } else {
+        doAnswer(args -> times[0]++ == 0 ? failureFuture : 
successFuture).when(writeFn).putAsync(any(), any());
+      }
+      writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, 
schedExec);
     }
+    RemoteReadWriteTable<String, String> table = getTable(tableId, 
mock(TableReadFunction.class), writeFn);
     if (sync) {
       table.put("foo", isDelete ? null : "bar");
     } else {
@@ -164,9 +207,9 @@ public class TestRemoteTable {
     ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
     ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
     if (isDelete) {
-      verify(writeFn, times(1)).deleteAsync(keyCaptor.capture());
+      verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
     } else {
-      verify(writeFn, times(1)).putAsync(keyCaptor.capture(), 
valCaptor.capture());
+      verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), 
valCaptor.capture());
       Assert.assertEquals("bar", valCaptor.getValue());
     }
     Assert.assertEquals("foo", keyCaptor.getValue());
@@ -179,27 +222,32 @@ public class TestRemoteTable {
 
   @Test
   public void testPut() throws Exception {
-    doTestPut(true, false, false);
+    doTestPut(true, false, false, false);
   }
 
   @Test
   public void testPutDelete() throws Exception {
-    doTestPut(true, false, true);
+    doTestPut(true, false, true, false);
   }
 
   @Test
   public void testPutAsync() throws Exception {
-    doTestPut(false, false, false);
+    doTestPut(false, false, false, false);
   }
 
   @Test
   public void testPutAsyncDelete() throws Exception {
-    doTestPut(false, false, true);
+    doTestPut(false, false, true, false);
   }
 
   @Test(expected = ExecutionException.class)
   public void testPutAsyncError() throws Exception {
-    doTestPut(false, true, false);
+    doTestPut(false, true, false, false);
+  }
+
+  @Test
+  public void testPutAsyncErrorRetried() throws Exception {
+    doTestPut(false, true, false, true);
   }
 
   private void doTestDelete(boolean sync, boolean error) throws Exception {

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
index e30da12..efe1acf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -31,6 +31,9 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
@@ -138,7 +141,9 @@ public class TestRemoteTableDescriptor {
   private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, 
boolean rlGets, boolean rlPuts) {
     int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0);
     RemoteTableDescriptor<String, String> desc = new 
RemoteTableDescriptor("1");
-    desc.withReadFunction(mock(TableReadFunction.class));
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRetryPredicate((ex) -> false);
+    desc.withReadFunction(mock(TableReadFunction.class), retryPolicy);
     desc.withWriteFunction(mock(TableWriteFunction.class));
     desc.withAsyncCallbackExecutorPoolSize(10);
 
@@ -178,6 +183,9 @@ public class TestRemoteTableDescriptor {
 
     ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) 
rwTable.callbackExecutor;
     Assert.assertEquals(10, callbackExecutor.getCorePoolSize());
+
+    // Only the read function was registered with a retry policy, so only it should be wrapped.
+    // assertNotNull on a boolean expression always passes; assert the boolean itself instead.
+    Assert.assertTrue(rwTable.readFn instanceof RetriableReadFunction);
+    Assert.assertFalse(rwTable.writeFn instanceof RetriableWriteFunction);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
 
b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
new file mode 100644
index 0000000..9dd5a74
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.table.remote.TestRemoteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRetriableTableFunctions {
+  private final ScheduledExecutorService schedExec = 
Executors.newSingleThreadScheduledExecutor();
+
+  public TableMetricsUtil getMetricsUtil(String tableId) {
+    Table table = mock(Table.class);
+    SamzaContainerContext cntCtx = mock(SamzaContainerContext.class);
+    TaskContext taskCtx = TestRemoteTable.getMockTaskContext();
+    return new TableMetricsUtil(cntCtx, taskCtx, table, tableId);
+  }
+
+  @Test
+  public void testFirstTimeSuccessGet() throws Exception {
+    String tableId = "testFirstTimeSuccessGet";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+    
doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+    RetriableReadFunction<String, String> retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    Assert.assertEquals("bar", retryIO.getAsync("foo").get());
+    verify(readFn, times(1)).getAsync(anyString());
+
+    Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertEquals(0, 
retryIO.retryMetrics.retryTimer.getSnapshot().getMax());
+  }
+
+  @Test
+  public void testRetryEngagedGet() throws Exception {
+    String tableId = "testRetryEngagedGet";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(10));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    int [] times = new int[] {0};
+    Map<String, String> map = new HashMap<>();
+    map.put("foo1", "bar1");
+    map.put("foo2", "bar2");
+    doAnswer(invocation -> {
+        CompletableFuture<Map<String, String>> future = new 
CompletableFuture();
+        if (times[0] > 0) {
+          future.complete(map);
+        } else {
+          times[0]++;
+          future.completeExceptionally(new RuntimeException("test exception"));
+        }
+        return future;
+      }).when(readFn).getAllAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", 
"foo2")).get());
+    verify(readFn, times(2)).getAllAsync(any());
+
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0);
+  }
+
+  @Test
+  public void testRetryExhaustedTimeGet() throws Exception {
+    String tableId = "testRetryExhaustedTime";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterDelay(Duration.ofMillis(100));
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(readFn).getAsync(anyString());
+
+    RetriableReadFunction<String, String> retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.getAsync("foo").get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms 
maxDelay
+    verify(readFn, atLeast(3)).getAsync(anyString());
+    Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3);
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0);
+  }
+
+  @Test
+  public void testRetryExhaustedAttemptsGet() throws Exception {
+    String tableId = "testRetryExhaustedAttempts";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterAttempts(10);
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+    doReturn(true).when(readFn).isRetriable(any());
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception"));
+    doReturn(future).when(readFn).getAllAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get();
+      Assert.fail();
+    } catch (ExecutionException e) {
+    }
+
+    // 1 initial try + 10 retries
+    verify(readFn, times(11)).getAllAsync(any());
+    Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0);
+  }
+
+  @Test
+  public void testFirstTimeSuccessPut() throws Exception { // a put that succeeds on its first call must not trigger any retry
+    String tableId = "testFirstTimeSuccessPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+    TableWriteFunction<String, String> writeFn = 
mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any()); // the mock reports every exception as retriable
+    
doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(),
 anyString()); // putAsync is stubbed with an already-completed future
+    RetriableWriteFunction<String, String> retryIO = new 
RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    retryIO.putAsync("foo", "bar").get();
+    verify(writeFn, times(1)).putAsync(anyString(), anyString()); // exactly one call reached the wrapped function
+
+    Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); // no retry was recorded
+    Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); // NOTE(review): the retry tests in this file expect 0 here, so presumably this counts only first-attempt successes - confirm against RetryMetrics
+    Assert.assertEquals(0, 
retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); // retry timer max stays at 0 when nothing was retried
+  }
+
+  /**
+   * Verifies that a single transient failure of {@code putAllAsync} is retried and that the
+   * retried call completes the future handed back to the caller.
+   * <p>
+   * The wrapped function fails its first invocation with a {@link RuntimeException} and
+   * succeeds on every later one, so exactly two invocations and one recorded retry are expected.
+   *
+   * @throws Exception if the retried put unexpectedly fails
+   */
+  @Test
+  public void testRetryEngagedPut() throws Exception {
+    String tableId = "testRetryEngagedPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(10));
+    TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+    // Classify every failure as retriable so the injected exception engages the retry path.
+    doReturn(true).when(writeFn).isRetriable(any());
+
+    List<Entry<String, String>> records = new ArrayList<>();
+    records.add(new Entry<>("foo1", "bar1"));
+    records.add(new Entry<>("foo2", "bar2"));
+
+    // One-element array: a mutable counter the stubbing lambda is allowed to update.
+    // Named to avoid confusion with Mockito's statically imported times(...) used below.
+    int[] failuresInjected = new int[] {0};
+    // This is the only stubbing of putAllAsync; an earlier doReturn stub for the same call
+    // would simply be overridden by it, so none is installed.
+    doAnswer(invocation -> {
+        CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
+        if (failuresInjected[0] > 0) {
+          future.complete(null);
+        } else {
+          failuresInjected[0]++;
+          future.completeExceptionally(new RuntimeException("test exception"));
+        }
+        return future;
+      }).when(writeFn).putAllAsync(any());
+
+    RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    retryIO.putAllAsync(records).get();
+    // One failed call plus one successful retry.
+    verify(writeFn, times(2)).putAllAsync(any());
+
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    // NOTE(review): successCount is expected to stay 0 although the call eventually succeeded;
+    // presumably it counts only first-attempt successes - confirm against RetryMetrics.
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount());
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0);
+  }
+
+  @Test
+  public void testRetryExhaustedTimePut() throws Exception { // every call fails; the caller must see the failure after several retries. NOTE(review): named "...Put" but exercises deleteAsync
+    String tableId = "testRetryExhaustedTimePut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterDelay(Duration.ofMillis(100)); // presumably bounds the total time spent retrying - confirm against TableRetryPolicy
+    TableWriteFunction<String, String> writeFn = 
mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any()); // the mock reports every exception as retriable
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception")); // already failed before it is handed out
+    doReturn(future).when(writeFn).deleteAsync(anyString()); // the same failed future instance is returned on every call
+
+    RetriableWriteFunction<String, String> retryIO = new 
RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.deleteAsync("foo").get();
+      Assert.fail(); // reaching this line means no failure was propagated
+    } catch (ExecutionException e) { // expected outcome; intentionally left empty
+    }
+
+    // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms 
maxDelay
+    verify(writeFn, atLeast(3)).deleteAsync(anyString());
+    Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); // lower bound only; the exact count depends on timing
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); // nothing ever succeeded
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0); // the retry timer recorded a non-zero value
+  }
+
+  @Test
+  public void testRetryExhaustedAttemptsPut() throws Exception { // every call fails; retrying must stop after the configured 10 attempts. NOTE(review): named "...Put" but exercises deleteAllAsync
+    String tableId = "testRetryExhaustedAttemptsPut";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(5));
+    policy.withStopAfterAttempts(10); // the assertions below treat this as 10 retries on top of the initial call
+    TableWriteFunction<String, String> writeFn = 
mock(TableWriteFunction.class);
+    doReturn(true).when(writeFn).isRetriable(any()); // the mock reports every exception as retriable
+
+    CompletableFuture<String> future = new CompletableFuture();
+    future.completeExceptionally(new RuntimeException("test exception")); // already failed before it is handed out
+    doReturn(future).when(writeFn).deleteAllAsync(any()); // the same failed future instance is returned on every call
+
+    RetriableWriteFunction<String, String> retryIO = new 
RetriableWriteFunction<>(policy, writeFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+    try {
+      retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
+      Assert.fail(); // reaching this line means no failure was propagated
+    } catch (ExecutionException e) { // expected outcome; intentionally left empty
+    }
+
+    // 1 initial try + 10 retries
+    verify(writeFn, times(11)).deleteAllAsync(any());
+    Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); // one retry recorded per attempt after the first
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); // nothing ever succeeded
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0); // the retry timer recorded a non-zero value
+  }
+
+  @Test
+  public void testMixedIsRetriablePredicates() throws Exception { // policy predicate and table-function classification disagree; the test expects the NPE to be retried anyway
+    String tableId = "testMixedIsRetriablePredicates";
+    TableRetryPolicy policy = new TableRetryPolicy();
+    policy.withFixedBackoff(Duration.ofMillis(100));
+
+    // Retry should be attempted based on the custom classification, ie. retry 
on NPE
+    policy.withRetryPredicate(ex -> ex instanceof NullPointerException);
+
+    TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+
+    // Table fn classification only retries on IllegalArgumentException
+    doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof 
IllegalArgumentException).when(readFn).isRetriable(any());
+
+    int [] times = new int[1]; // one-element array used as a call counter the lambda below can mutate
+    doAnswer(arg -> {
+        if (times[0]++ == 0) { // first call only: fail with a NullPointerException
+          CompletableFuture<String> future = new CompletableFuture();
+          future.completeExceptionally(new NullPointerException("test 
exception"));
+          return future;
+        } else { // every later call succeeds with "bar"
+          return CompletableFuture.completedFuture("bar");
+        }
+      }).when(readFn).getAsync(any());
+
+    RetriableReadFunction<String, String> retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec);
+    retryIO.setMetrics(getMetricsUtil(tableId));
+
+    Assert.assertEquals("bar", retryIO.getAsync("foo").get()); // the value comes from the second, successful call
+
+    verify(readFn, times(2)).getAsync(anyString()); // one failed call plus one retry, although readFn.isRetriable rejects an NPE
+    Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount());
+    Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); // NOTE(review): stays 0 despite the eventual success; presumably counts only first-attempt successes - confirm against RetryMetrics
+    Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 
0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
 
b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
new file mode 100644
index 0000000..c343d63
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.retry;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import net.jodah.failsafe.RetryPolicy;
+
+
+public class TestTableRetryPolicy { // unit tests for TableRetryPolicy settings and their conversion by FailsafeAdapter.valueOf into a failsafe RetryPolicy
+  @Test
+  public void testNoRetry() { // a freshly constructed policy reports backoff type NONE
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, 
retryPolicy.getBackoffType());
+  }
+
+  @Test
+  public void testFixedRetry() { // fixed backoff: delay, jitter and attempt limit must carry over to the failsafe policy
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withFixedBackoff(Duration.ofMillis(1000));
+    retryPolicy.withJitter(Duration.ofMillis(100));
+    retryPolicy.withStopAfterAttempts(4);
+    Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, 
retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+    Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+    Assert.assertEquals(4, fsRetry.getMaxRetries()); // stop-after-attempts is expected to surface as failsafe's max retries
+    Assert.assertNotNull(retryPolicy.getRetryPredicate()); // a predicate is present even though this test never set one
+  }
+
+  @Test
+  public void testRandomRetry() { // random backoff: the min/max delay bounds must carry over to the failsafe policy
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRandomBackoff(Duration.ofMillis(1000), 
Duration.ofMillis(2000));
+    retryPolicy.withJitter(Duration.ofMillis(100)); // no-op
+    Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, 
retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis());
+    Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis()); // the jitter is not asserted for this backoff type
+  }
+
+  @Test
+  public void testExponentialRetry() { // exponential backoff: initial delay, max delay, factor and jitter must carry over
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), 
Duration.ofMillis(2000), 1.5);
+    retryPolicy.withJitter(Duration.ofMillis(100));
+    Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, 
retryPolicy.getBackoffType());
+    RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy);
+    Assert.assertEquals(1000, fsRetry.getDelay().toMillis());
+    Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis());
+    Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001); // floating-point comparison with a 0.001 tolerance
+    Assert.assertEquals(100, fsRetry.getJitter().toMillis());
+  }
+
+  @Test
+  public void testCustomRetryPredicate() { // getRetryPredicate must behave like the predicate supplied through withRetryPredicate
+    TableRetryPolicy retryPolicy = new TableRetryPolicy();
+    retryPolicy.withRetryPredicate((e) -> e instanceof 
IllegalArgumentException);
+    Assert.assertTrue(retryPolicy.getRetryPredicate().test(new 
IllegalArgumentException()));
+    Assert.assertFalse(retryPolicy.getRetryPredicate().test(new 
NullPointerException()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index d79683e..4cf99ff 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -92,6 +92,11 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
       return CompletableFuture.completedFuture(profileMap.get(key));
     }
 
+    @Override
+    public boolean isRetriable(Throwable exception) { // classification hook consulted with the failure of a read
+      return false; // this test read function never reports an exception as retriable
+    }
+
     static InMemoryReadFunction getInMemoryReadFunction(String 
serializedProfiles) {
       return new InMemoryReadFunction(serializedProfiles);
     }
@@ -124,6 +129,11 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
       records.remove(key);
       return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public boolean isRetriable(Throwable exception) { // classification hook consulted with the failure of a write
+      return false; // this test write function never reports an exception as retriable
+    }
   }
 
   private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, 
boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
@@ -142,6 +152,18 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     return appDesc.getTable(cachingDesc);
   }
 
+  static class MyReadFunction implements TableReadFunction { // dummy reader; takes the place of the former "key -> null" lambda in this test
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null; // returns null itself, not a future completed with null
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false; // never reports an exception as retriable
+    }
+  }
+
   private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean 
defaultCache, String testName) throws Exception {
     final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
 
@@ -166,9 +188,12 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
           
.withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
           .withRateLimiter(readRateLimiter, null, null);
 
+      // dummy reader
+      TableReadFunction readFn = new MyReadFunction();
+
       RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new 
RemoteTableDescriptor<>("enriched-page-view-table-1");
       outputTableDesc
-          .withReadFunction(key -> null) // dummy reader
+          .withReadFunction(readFn)
           .withWriteFunction(writer)
           .withRateLimiter(writeRateLimiter, null, null);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
index 41b6509..34ffbd4 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.ConfigRewriter;
@@ -116,15 +118,27 @@ public class TestTableDescriptorsProvider {
   public static class MySampleNonTableDescriptorsProvider {
   }
 
+  static class MyReadFunction implements TableReadFunction { // dummy reader; takes the place of the former "(TableReadFunction) key -> null" lambda in this test
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null; // returns null itself, not a future completed with null
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false; // never reports an exception as retriable
+    }
+  }
+
   public static class MySampleTableDescriptorsProvider implements 
TableDescriptorsProvider {
     @Override
     public List<TableDescriptor> getTableDescriptors(Config config) {
       List<TableDescriptor> tableDescriptors = new ArrayList<>();
       final RateLimiter readRateLimiter = mock(RateLimiter.class);
-      final TableReadFunction readRemoteTable = (TableReadFunction) key -> 
null;
+      final MyReadFunction readFn = new MyReadFunction();
 
       tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
-          .withReadFunction(readRemoteTable)
+          .withReadFunction(readFn)
           .withRateLimiter(readRateLimiter, null, null)
           .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
       tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")

Reply via email to