Repository: samza Updated Branches: refs/heads/master 02192052f -> e4719b448
SAMZA-1795: table: add common retry for IO functions Add common retry functionality to table IO functions for data stores that do not have native retry support. We use failsafe as the retry library. Author: Peng Du <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #618 from pdu-mn1/retry-support Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4719b44 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4719b44 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4719b44 Branch: refs/heads/master Commit: e4719b448c4ac0bbdf7c6832e5e7027dd8c49da0 Parents: 0219205 Author: Peng Du <[email protected]> Authored: Mon Sep 17 12:16:39 2018 -0700 Committer: xiliu <[email protected]> Committed: Mon Sep 17 12:16:39 2018 -0700 ---------------------------------------------------------------------- build.gradle | 1 + gradle/dependency-versions.gradle | 1 + .../table/remote/RemoteReadWriteTable.java | 2 +- .../samza/table/remote/RemoteReadableTable.java | 2 +- .../table/remote/RemoteTableDescriptor.java | 46 ++- .../samza/table/remote/RemoteTableProvider.java | 42 ++- .../samza/table/remote/TableReadFunction.java | 7 + .../samza/table/remote/TableWriteFunction.java | 7 + .../samza/table/retry/FailsafeAdapter.java | 103 ++++++ .../table/retry/RetriableReadFunction.java | 102 ++++++ .../table/retry/RetriableWriteFunction.java | 120 +++++++ .../apache/samza/table/retry/RetryMetrics.java | 59 ++++ .../samza/table/retry/TableRetryPolicy.java | 257 +++++++++++++++ .../samza/table/remote/TestRemoteTable.java | 116 +++++-- .../table/remote/TestRemoteTableDescriptor.java | 10 +- .../retry/TestRetriableTableFunctions.java | 316 +++++++++++++++++++ .../samza/table/retry/TestTableRetryPolicy.java | 82 +++++ .../samza/test/table/TestRemoteTable.java | 27 +- .../table/TestTableDescriptorsProvider.java | 18 +- 19 files changed, 1274 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 7d23717..8b68205 100644 --- a/build.gradle +++ b/build.gradle @@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") { compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" + compile "net.jodah:failsafe:$failsafeVersion" testCompile project(":samza-api").sourceSets.test.output testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 3c4c3a9..b33ab82 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -45,4 +45,5 @@ yarnVersion = "2.6.1" zkClientVersion = "0.8" zookeeperVersion = "3.4.6" + failsafeVersion = "1.1.0" } http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index 88bc7df..9ef4c1b 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -43,11 +43,11 @@ import com.google.common.base.Preconditions; * @param <V> the type of the value in this table */ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> { - private final TableWriteFunction<K, V> writeFn; private DefaultTableWriteMetrics writeMetrics; @VisibleForTesting + final TableWriteFunction<K, V> writeFn; final TableRateLimiter writeRateLimiter; public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index 3186fee..b3d82f3 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -80,10 +80,10 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { protected final ExecutorService callbackExecutor; protected final ExecutorService tableExecutor; - private final TableReadFunction<K, V> readFn; private DefaultTableReadMetrics readMetrics; @VisibleForTesting + final TableReadFunction<K, V> readFn; final TableRateLimiter<K, V> readRateLimiter; /** http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java index a8d419d..537ff87 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -70,6 +71,9 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot private TableRateLimiter.CreditFunction<K, V> readCreditFn; private TableRateLimiter.CreditFunction<K, V> writeCreditFn; + private TableRetryPolicy readRetryPolicy; + private TableRetryPolicy writeRetryPolicy; + // By default execute future callbacks on the native client threads // ie. no additional thread pool for callbacks. private int asyncCallbackPoolSize = -1; @@ -115,13 +119,23 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot "write credit function", writeCreditFn)); } + if (readRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize( + "read retry policy", readRetryPolicy)); + } + + if (writeRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize( + "write retry policy", writeRetryPolicy)); + } + tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize)); return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig); } /** - * Use specified TableReadFunction with remote table. + * Use specified TableReadFunction with remote table and a retry policy. * @param readFn read function instance * @return this table descriptor instance */ @@ -132,7 +146,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot } /** - * Use specified TableWriteFunction with remote table. + * Use specified TableWriteFunction with remote table and a retry policy. * @param writeFn write function instance * @return this table descriptor instance */ @@ -143,6 +157,34 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot } /** + * Use specified TableReadFunction with remote table. + * @param readFn read function instance + * @param retryPolicy retry policy for the read function + * @return this table descriptor instance + */ + public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(readFn, "null read function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.readFn = readFn; + this.readRetryPolicy = retryPolicy; + return this; + } + + /** + * Use specified TableWriteFunction with remote table. + * @param writeFn write function instance + * @param retryPolicy retry policy for the write function + * @return this table descriptor instance + */ + public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.writeFn = writeFn; + this.writeRetryPolicy = retryPolicy; + return this; + } + + /** * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount * of credits to be charged from the rate limiter for table read and write operations. * This is an advanced API that provides greater flexibility to throttle each record in the table http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index 6c5d9b3..cae0bbd 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -25,11 +25,16 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.util.RateLimiter; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; @@ -47,6 +52,8 @@ public class RemoteTableProvider extends BaseTableProvider { static final String READ_CREDIT_FN = "io.read.credit.func"; static final String WRITE_CREDIT_FN = "io.write.credit.func"; static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size"; + static final String READ_RETRY_POLICY = "io.read.retry.policy"; + static final String WRITE_RETRY_POLICY = "io.write.retry.policy"; private final boolean readOnly; private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>(); @@ -58,6 +65,7 @@ public class RemoteTableProvider extends BaseTableProvider { */ private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>(); private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>(); + private static ScheduledExecutorService retryExecutor; public RemoteTableProvider(TableSpec tableSpec) { super(tableSpec); @@ -72,7 +80,7 @@ public class RemoteTableProvider extends BaseTableProvider { RemoteReadableTable table; String tableId = tableSpec.getId(); - TableReadFunction<?, ?> readFn = getReadFn(); + TableReadFunction readFn = getReadFn(); RateLimiter rateLimiter = deserializeObject(RATE_LIMITER); if (rateLimiter != null) { rateLimiter.init(containerContext.config, taskContext); @@ -83,11 +91,33 @@ public class RemoteTableProvider extends BaseTableProvider { TableRateLimiter.CreditFunction<?, ?> writeCreditFn; TableRateLimiter writeRateLimiter = null; + TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY); + TableRetryPolicy writeRetryPolicy = null; + + if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) { + retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-retry-executor"); + thread.setDaemon(true); + return thread; + }); + } + + if (readRetryPolicy != null) { + readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor); + } + + TableWriteFunction writeFn = getWriteFn(); + boolean isRateLimited = readRateLimiter.isRateLimited(); if (!readOnly) { writeCreditFn = deserializeObject(WRITE_CREDIT_FN); writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG); isRateLimited |= writeRateLimiter.isRateLimited(); + writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY); + if (writeRetryPolicy != null) { + writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor); + } } // Optional executor for future callback/completion. Shared by both read and write operations. @@ -116,10 +146,18 @@ public class RemoteTableProvider extends BaseTableProvider { table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } else { - table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter, + table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } + TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + if (readRetryPolicy != null) { + ((RetriableReadFunction) readFn).setMetrics(metricsUtil); + } + if (writeRetryPolicy != null) { + ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil); + } + table.init(containerContext, taskContext); tables.add(table); return table; http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java index 5d0f963..4791779 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java @@ -100,5 +100,12 @@ public interface TableReadFunction<K, V> extends Serializable, InitableFunction, .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join()))); } + /** + * Determine whether the current operation can be retried with the last thrown exception. + * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + boolean isRetriable(Throwable exception); + // optionally implement readObject() to initialize transient states } http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java index 0ac3a0c..d9d619f 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java @@ -143,6 +143,13 @@ public interface TableWriteFunction<K, V> extends Serializable, InitableFunction } /** + * Determine whether the current operation can be retried with the last thrown exception. + * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + boolean isRetriable(Throwable exception); + + /** * Flush the remote store (optional) */ default void flush() { http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java new file mode 100644 index 0000000..b2eccd8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.samza.SamzaException; + +import net.jodah.failsafe.AsyncFailsafe; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; + + +/** + * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and + * creating failsafe retryer instances with proper metrics management. + */ +class FailsafeAdapter { + /** + * Convert the {@link TableRetryPolicy} to failsafe {@link RetryPolicy}. + * @return this policy instance + */ + static RetryPolicy valueOf(TableRetryPolicy policy) { + RetryPolicy failSafePolicy = new RetryPolicy(); + + switch (policy.getBackoffType()) { + case NONE: + break; + + case FIXED: + failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS); + break; + + case RANDOM: + failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS); + break; + + case EXPONENTIAL: + failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS, + policy.getExponentialFactor()); + break; + + default: + throw new SamzaException("Unknown retry policy type."); + } + + if (policy.getMaxDuration() != null) { + failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS); + } + if (policy.getMaxAttempts() != null) { + failSafePolicy.withMaxRetries(policy.getMaxAttempts()); + } + if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) { + failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS); + } + + failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e)); + + return failSafePolicy; + } + + /** + * Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service. + * @param retryPolicy retry policy + * @param metrics retry metrics + * @param retryExec executor service for scheduling async retries + * @return {@link net.jodah.failsafe.AsyncFailsafe} instance + */ + static AsyncFailsafe<?> failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) { + long startMs = System.currentTimeMillis(); + return Failsafe.with(retryPolicy).with(retryExec) + .onRetry(e -> metrics.retryCount.inc()) + .onRetriesExceeded(e -> { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + metrics.permFailureCount.inc(); + }) + .onSuccess((e, ctx) -> { + if (ctx.getExecutions() > 1) { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + } else { + metrics.successCount.inc(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java new file mode 100644 index 0000000..1adddc0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.RetryPolicy; + +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + + +/** + * Wrapper for a {@link TableReadFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableReadFunction}. + * + * Actual retry mechanism is provided by the failsafe library. Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class RetriableReadFunction<K, V> implements TableReadFunction<K, V> { + private final RetryPolicy retryPolicy; + private final TableReadFunction<K, V> readFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction<K, V> readFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(readFn); + Preconditions.checkNotNull(retryExecutor); + + this.readFn = readFn; + this.retryExecutor = retryExecutor; + Predicate<Throwable> retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || retryPredicate.test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); + } + + @Override + public CompletableFuture<V> getAsync(K key) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> readFn.getAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> readFn.getAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the records for " + keys + " after retries.", e); + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return readFn.isRetriable(exception); + } + + /** + * Initialize retry-related metrics + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("reader", metricsUtil); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java new file mode 100644 index 0000000..2f3f062 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.RetryPolicy; + +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + + +/** + * Wrapper for a {@link TableWriteFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableWriteFunction}. + * + * Actual retry mechanism is provided by the failsafe library. Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +public class RetriableWriteFunction<K, V> implements TableWriteFunction<K, V> { + private final RetryPolicy retryPolicy; + private final TableWriteFunction<K, V> writeFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction<K, V> writeFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(writeFn); + Preconditions.checkNotNull(retryExecutor); + + this.writeFn = writeFn; + this.retryExecutor = retryExecutor; + Predicate<Throwable> retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || retryPredicate.test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); + } + + @Override + public CompletableFuture<Void> putAsync(K key, V record) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.putAsync(key, record)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture<Void> putAllAsync(Collection<Entry<K, V>> records) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.putAllAsync(records)) + .exceptionally(e -> { + throw new SamzaException("Failed to put records after retries.", e); + }); + } + + @Override + public CompletableFuture<Void> deleteAsync(K key) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.deleteAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture<Void> deleteAllAsync(Collection<K> keys) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.deleteAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e); + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return writeFn.isRetriable(exception); + } + + /** + * Initialize retry-related metrics. + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("writer", metricsUtil); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java new file mode 100644 index 0000000..fbc511c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.utils.TableMetricsUtil; + + +/** + * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and + * {@link RetriableWriteFunction}. + */ +class RetryMetrics { + /** + * Number of retries executed (excluding the first attempt) + */ + final Counter retryCount; + + /** + * Number of successes with only the first attempt + */ + final Counter successCount; + + /** + * Number of operations that failed permanently and exhausted all retries + */ + final Counter permFailureCount; + + /** + * Total time spent in each IO; this is updated only + * when at least one retries have been attempted. + */ + final Timer retryTimer; + + public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) { + retryCount = metricsUtil.newCounter(prefix + "-retry-count"); + successCount = metricsUtil.newCounter(prefix + "-success-count"); + permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count"); + retryTimer = metricsUtil.newTimer(prefix + "-retry-timer"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java new file mode 100644 index 0000000..162eb07 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.io.Serializable; +import java.time.Duration; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; + + +/** + * Common retry policy parameters for table IO. This serves as an abstraction on top of + * retry libraries. This common policy supports below features: + * - backoff modes: fixed, random, exponential + * - termination modes: by attempts, by duration + * - jitter + * + * Retry libraries can implement a subset or all features as described by this common policy. + */ +public class TableRetryPolicy implements Serializable { + enum BackoffType { + /** + * No backoff in between two retry attempts. + */ + NONE, + + /** + * Backoff by a fixed duration {@code sleepTime}. + */ + FIXED, + + /** + * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}. + */ + RANDOM, + + /** + * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}. + */ + EXPONENTIAL + } + + // Backoff parameters + private Duration sleepTime; + private Duration randomMin; + private Duration randomMax; + private double exponentialFactor; + private Duration exponentialMaxSleep; + private Duration jitter; + + // By default no early termination + private Integer maxAttempts = null; + private Duration maxDuration = null; + + // By default no backoff during retries + private BackoffType backoffType = BackoffType.NONE; + + /** + * Serializable adapter interface for {@link java.util.function.Predicate}. + * This is needed because TableRetryPolicy needs to be serializable as part of the + * table config whereas {@link java.util.function.Predicate} is not serializable. + */ + public interface RetryPredicate extends Predicate<Throwable>, Serializable { + } + + // By default no custom retry predicate so retry decision is made solely by the table functions + private RetryPredicate retryPredicate = (ex) -> false; + + /** + * Set the sleepTime time for the fixed backoff policy. + * @param sleepTime sleepTime time + * @return this policy instance + */ + public TableRetryPolicy withFixedBackoff(Duration sleepTime) { + Preconditions.checkNotNull(sleepTime); + this.sleepTime = sleepTime; + this.backoffType = BackoffType.FIXED; + return this; + } + + /** + * Set the sleepTime time for the random backoff policy. The actual sleepTime time + * before each attempt is randomly selected between {@code [minSleep, maxSleep]} + * @param minSleep lower bound sleepTime time + * @param maxSleep upper bound sleepTime time + * @return this policy instance + */ + public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) { + Preconditions.checkNotNull(minSleep); + Preconditions.checkNotNull(maxSleep); + this.randomMin = minSleep; + this.randomMax = maxSleep; + this.backoffType = BackoffType.RANDOM; + return this; + } + + /** + * Set the parameters for the exponential backoff policy. The actual sleepTime time + * is exponentially incremented up to the {@code maxSleep} and multiplying + * successive delays by the {@code factor}. + * @param sleepTime initial sleepTime time + * @param maxSleep upper bound sleepTime time + * @param factor exponential factor for backoff + * @return this policy instance + */ + public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) { + Preconditions.checkNotNull(sleepTime); + Preconditions.checkNotNull(maxSleep); + this.sleepTime = sleepTime; + this.exponentialMaxSleep = maxSleep; + this.exponentialFactor = factor; + this.backoffType = BackoffType.EXPONENTIAL; + return this; + } + + /** + * Set the jitter for the backoff policy to provide additional randomness. + * If this is set, a random value between {@code [0, jitter]} will be added + * to each sleepTime time. This applies to {@code FIXED} and {@code EXPONENTIAL} + * modes only. + * @param jitter initial sleepTime time + * @return this policy instance + */ + public TableRetryPolicy withJitter(Duration jitter) { + Preconditions.checkNotNull(jitter); + if (backoffType != BackoffType.RANDOM) { + this.jitter = jitter; + } + return this; + } + + /** + * Set maximum number of attempts before terminating the operation. + * @param maxAttempts number of attempts + * @return this policy instance + */ + public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { + Preconditions.checkArgument(maxAttempts >= 0); + this.maxAttempts = maxAttempts; + return this; + } + + /** + * Set maximum total delay (sleepTime + execution) before terminating the operation. + * @param maxDelay delay time + * @return this policy instance + */ + public TableRetryPolicy withStopAfterDelay(Duration maxDelay) { + Preconditions.checkNotNull(maxDelay); + this.maxDuration = maxDelay; + return this; + } + + /** + * Set the predicate to use for identifying retriable exceptions. If specified, table + * retry logic will consult both such predicate and table function and retry will be + * attempted if either option returns true. + * @param retryPredicate predicate for retriable exception identification + * @return this policy instance + */ + public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) { + Preconditions.checkNotNull(retryPredicate); + this.retryPredicate = retryPredicate; + return this; + } + + /** + * @return initial/fixed sleep time. + */ + public Duration getSleepTime() { + return sleepTime; + } + + /** + * @return lower sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}. + */ + public Duration getRandomMin() { + return randomMin; + } + + /** + * @return upper sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}. + */ + public Duration getRandomMax() { + return randomMax; + } + + /** + * @return exponential factor for exponential backoff. + */ + public double getExponentialFactor() { + return exponentialFactor; + } + + /** + * @return maximum sleepTime time for exponential backoff or null if {@code policyType} is not {@code EXPONENTIAL}. + */ + public Duration getExponentialMaxSleep() { + return exponentialMaxSleep; + } + + /** + * Introduce randomness to the sleepTime time. + * @return jitter to add on to each backoff or null if not set. + */ + public Duration getJitter() { + return jitter; + } + + /** + * Termination after a fix number of attempts. + * @return maximum number of attempts without success before giving up the operation or null if not set. + */ + public Integer getMaxAttempts() { + return maxAttempts; + } + + /** + * Termination after a fixed duration. + * @return maximum duration without success before giving up the operation or null if not set. + */ + public Duration getMaxDuration() { + return maxDuration; + } + + /** + * @return type of the backoff. + */ + public BackoffType getBackoffType() { + return backoffType; + } + + /** + * @return Custom predicate for retriable exception identification or null if not specified. + */ + public RetryPredicate getRetryPredicate() { + return retryPredicate; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java index 21fc6a5..3e844c3 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.metrics.Counter; @@ -34,6 +35,9 @@ import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -43,6 +47,7 @@ import junit.framework.Assert; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -50,6 +55,18 @@ import static org.mockito.Mockito.verify; public class TestRemoteTable { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public static TaskContext getMockTaskContext() { + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); + doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); + doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); + TaskContext taskContext = mock(TaskContext.class); + doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + return taskContext; + } + private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId, TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) { return getTable(tableId, readFn, writeFn, null); @@ -72,12 +89,7 @@ public class TestRemoteTable { table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); } - TaskContext taskContext = mock(TaskContext.class); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); - doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); - doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + TaskContext taskContext = getMockTaskContext(); SamzaContainerContext containerContext = mock(SamzaContainerContext.class); @@ -86,35 +98,53 @@ public class TestRemoteTable { return (T) table; } - private void doTestGet(boolean sync, boolean error) throws Exception { + private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception { + String tableId = "testGet-" + sync + error + retry; TableReadFunction<String, String> readFn = mock(TableReadFunction.class); // Sync is backed by async so needs to mock the async method CompletableFuture<String> future; if (error) { future = new CompletableFuture(); future.completeExceptionally(new RuntimeException("Test exception")); + if (!retry) { + doReturn(future).when(readFn).getAsync(anyString()); + } else { + final int [] times = new int[] {0}; + doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar")) + .when(readFn).getAsync(anyString()); + } } else { future = CompletableFuture.completedFuture("bar"); + doReturn(future).when(readFn).getAsync(anyString()); } - doReturn(future).when(readFn).getAsync(anyString()); - RemoteReadableTable<String, String> table = getTable("testGet-" + sync + error, readFn, null); + if (retry) { + doReturn(true).when(readFn).isRetriable(any()); + TableRetryPolicy policy = new TableRetryPolicy(); + readFn = new RetriableReadFunction<>(policy, readFn, schedExec); + } + RemoteReadableTable<String, String> table = getTable(tableId, readFn, null); Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get()); verify(table.readRateLimiter, times(1)).throttle(anyString()); } @Test public void testGet() throws Exception { - doTestGet(true, false); + doTestGet(true, false, false); } @Test public void testGetAsync() throws Exception { - doTestGet(false, false); + doTestGet(false, false, false); } @Test(expected = ExecutionException.class) public void testGetAsyncError() throws Exception { - doTestGet(false, true); + doTestGet(false, true, false); + } + + @Test + public void testGetAsyncErrorRetried() throws Exception { + doTestGet(false, true, true); } @Test @@ -139,23 +169,36 @@ public class TestRemoteTable { }); } - private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exception { - TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable<String, String> table = getTable("testPut-" + sync + error + isDelete, - mock(TableReadFunction.class), writeFn); - CompletableFuture<Void> future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); + private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception { + String tableId = "testPut-" + sync + error + isDelete + retry; + TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class); + TableWriteFunction<String, String> writeFn = mockWriteFn; + CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null); + CompletableFuture<Void> failureFuture = new CompletableFuture(); + failureFuture.completeExceptionally(new RuntimeException("Test exception")); + if (!error) { + if (isDelete) { + doReturn(successFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(successFuture).when(writeFn).putAsync(any(), any()); + } + } else if (!retry) { + if (isDelete) { + doReturn(failureFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(failureFuture).when(writeFn).putAsync(any(), any()); + } } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - if (isDelete) { - doReturn(future).when(writeFn).deleteAsync(any()); - } else { - doReturn(future).when(writeFn).putAsync(any(), any()); + doReturn(true).when(writeFn).isRetriable(any()); + final int [] times = new int[] {0}; + if (isDelete) { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any()); + } else { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any()); + } + writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec); } + RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn); if (sync) { table.put("foo", isDelete ? null : "bar"); } else { @@ -164,9 +207,9 @@ public class TestRemoteTable { ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class); if (isDelete) { - verify(writeFn, times(1)).deleteAsync(keyCaptor.capture()); + verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture()); } else { - verify(writeFn, times(1)).putAsync(keyCaptor.capture(), valCaptor.capture()); + verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture()); Assert.assertEquals("bar", valCaptor.getValue()); } Assert.assertEquals("foo", keyCaptor.getValue()); @@ -179,27 +222,32 @@ public class TestRemoteTable { @Test public void testPut() throws Exception { - doTestPut(true, false, false); + doTestPut(true, false, false, false); } @Test public void testPutDelete() throws Exception { - doTestPut(true, false, true); + doTestPut(true, false, true, false); } @Test public void testPutAsync() throws Exception { - doTestPut(false, false, false); + doTestPut(false, false, false, false); } @Test public void testPutAsyncDelete() throws Exception { - doTestPut(false, false, true); + doTestPut(false, false, true, false); } @Test(expected = ExecutionException.class) public void testPutAsyncError() throws Exception { - doTestPut(false, true, false); + doTestPut(false, true, false, false); + } + + @Test + public void testPutAsyncErrorRetried() throws Exception { + doTestPut(false, true, false, true); } private void doTestDelete(boolean sync, boolean error) throws Exception { http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index e30da12..efe1acf 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -31,6 +31,9 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -138,7 +141,9 @@ public class TestRemoteTableDescriptor { private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) { int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1"); - desc.withReadFunction(mock(TableReadFunction.class)); + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryPredicate((ex) -> false); + desc.withReadFunction(mock(TableReadFunction.class), retryPolicy); desc.withWriteFunction(mock(TableWriteFunction.class)); desc.withAsyncCallbackExecutorPoolSize(10); @@ -178,6 +183,9 @@ public class TestRemoteTableDescriptor { ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.callbackExecutor; Assert.assertEquals(10, callbackExecutor.getCorePoolSize()); + + Assert.assertNotNull(rwTable.readFn instanceof RetriableReadFunction); + Assert.assertNotNull(!(rwTable.writeFn instanceof RetriableWriteFunction)); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java new file mode 100644 index 0000000..9dd5a74 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.Table; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.remote.TestRemoteTable; +import org.apache.samza.table.utils.TableMetricsUtil; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import junit.framework.Assert; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class TestRetriableTableFunctions { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public TableMetricsUtil getMetricsUtil(String tableId) { + Table table = mock(Table.class); + SamzaContainerContext cntCtx = mock(SamzaContainerContext.class); + TaskContext taskCtx = TestRemoteTable.getMockTaskContext(); + return new TableMetricsUtil(cntCtx, taskCtx, table, tableId); + } + + @Test + public void testFirstTimeSuccessGet() throws Exception { + String tableId = "testFirstTimeSuccessGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); + RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + verify(readFn, times(1)).getAsync(anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedGet() throws Exception { + String tableId = "testRetryEngagedGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(10)); + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + int [] times = new int[] {0}; + Map<String, String> map = new HashMap<>(); + map.put("foo1", "bar1"); + map.put("foo2", "bar2"); + doAnswer(invocation -> { + CompletableFuture<Map<String, String>> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(map); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(readFn).getAllAsync(any()); + + RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get()); + verify(readFn, times(2)).getAllAsync(any()); + + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimeGet() throws Exception { + String tableId = "testRetryExhaustedTime"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture<String> future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAsync(anyString()); + + RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(readFn, atLeast(3)).getAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsGet() throws Exception { + String tableId = "testRetryExhaustedAttempts"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterAttempts(10); + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture<String> future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAllAsync(any()); + + RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(readFn, times(11)).getAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testFirstTimeSuccessPut() throws Exception { + String tableId = "testFirstTimeSuccessPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString()); + RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAsync("foo", "bar").get(); + verify(writeFn, times(1)).putAsync(anyString(), anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedPut() throws Exception { + String tableId = "testRetryEngagedPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(10)); + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any()); + doReturn(true).when(writeFn).isRetriable(any()); + + int [] times = new int[] {0}; + List<Entry<String, String>> records = new ArrayList<>(); + records.add(new Entry<>("foo1", "bar1")); + records.add(new Entry<>("foo2", "bar2")); + doAnswer(invocation -> { + CompletableFuture<Map<String, String>> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(null); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(writeFn).putAllAsync(any()); + + RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAllAsync(records).get(); + verify(writeFn, times(2)).putAllAsync(any()); + + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimePut() throws Exception { + String tableId = "testRetryExhaustedTimePut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture<String> future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAsync(anyString()); + + RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(writeFn, atLeast(3)).deleteAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsPut() throws Exception { + String tableId = "testRetryExhaustedAttemptsPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterAttempts(10); + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture<String> future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAllAsync(any()); + + RetriableWriteFunction<String, String> retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(writeFn, times(11)).deleteAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testMixedIsRetriablePredicates() throws Exception { + String tableId = "testMixedIsRetriablePredicates"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + + // Retry should be attempted based on the custom classification, ie. retry on NPE + policy.withRetryPredicate(ex -> ex instanceof NullPointerException); + + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + + // Table fn classification only retries on IllegalArgumentException + doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof IllegalArgumentException).when(readFn).isRetriable(any()); + + int [] times = new int[1]; + doAnswer(arg -> { + if (times[0]++ == 0) { + CompletableFuture<String> future = new CompletableFuture(); + future.completeExceptionally(new NullPointerException("test exception")); + return future; + } else { + return CompletableFuture.completedFuture("bar"); + } + }).when(readFn).getAsync(any()); + + RetriableReadFunction<String, String> retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + + verify(readFn, times(2)).getAsync(anyString()); + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java new file mode 100644 index 0000000..c343d63 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.time.Duration; + +import org.junit.Assert; +import org.junit.Test; + +import net.jodah.failsafe.RetryPolicy; + + +public class TestTableRetryPolicy { + @Test + public void testNoRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, retryPolicy.getBackoffType()); + } + + @Test + public void testFixedRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withFixedBackoff(Duration.ofMillis(1000)); + retryPolicy.withJitter(Duration.ofMillis(100)); + retryPolicy.withStopAfterAttempts(4); + Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + Assert.assertEquals(4, fsRetry.getMaxRetries()); + Assert.assertNotNull(retryPolicy.getRetryPredicate()); + } + + @Test + public void testRandomRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000)); + retryPolicy.withJitter(Duration.ofMillis(100)); // no-op + Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis()); + Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis()); + } + + @Test + public void testExponentialRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5); + retryPolicy.withJitter(Duration.ofMillis(100)); + Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis()); + Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + } + + @Test + public void testCustomRetryPredicate() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryPredicate((e) -> e instanceof IllegalArgumentException); + Assert.assertTrue(retryPolicy.getRetryPredicate().test(new IllegalArgumentException())); + Assert.assertFalse(retryPolicy.getRetryPredicate().test(new NullPointerException())); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index d79683e..4cf99ff 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -92,6 +92,11 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { return CompletableFuture.completedFuture(profileMap.get(key)); } + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) { return new InMemoryReadFunction(serializedProfiles); } @@ -124,6 +129,11 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { records.remove(key); return CompletableFuture.completedFuture(null); } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } } private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { @@ -142,6 +152,18 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { return appDesc.getTable(cachingDesc); } + static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception { final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName); @@ -166,9 +188,12 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); + // dummy reader + TableReadFunction readFn = new MyReadFunction(); + RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); outputTableDesc - .withReadFunction(key -> null) // dummy reader + .withReadFunction(readFn) .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); http://git-wip-us.apache.org/repos/asf/samza/blob/e4719b44/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 41b6509..34ffbd4 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; + import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; @@ -116,15 +118,27 @@ public class TestTableDescriptorsProvider { public static class MySampleNonTableDescriptorsProvider { } + static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider { @Override public List<TableDescriptor> getTableDescriptors(Config config) { List<TableDescriptor> tableDescriptors = new ArrayList<>(); final RateLimiter readRateLimiter = mock(RateLimiter.class); - final TableReadFunction readRemoteTable = (TableReadFunction) key -> null; + final MyReadFunction readFn = new MyReadFunction(); tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") - .withReadFunction(readRemoteTable) + .withReadFunction(readFn) .withRateLimiter(readRateLimiter, null, null) .withSerde(KVSerde.of(new StringSerde(), new LongSerde()))); tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
