Repository: samza Updated Branches: refs/heads/master cc314be3b -> c7e5dcba4
SAMZA-2026: Refactor remote table API to separate retry policy settings As per subject, the goal is to make configuration of retry policies consistent with other API's. Author: Wei Song <[email protected]> Reviewers: Aditya Toomula <[email protected]> Closes #842 from weisong44/SAMZA-2026 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c7e5dcba Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c7e5dcba Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c7e5dcba Branch: refs/heads/master Commit: c7e5dcba4c398937de7805b2755e1ac65485cf96 Parents: cc314be Author: Wei Song <[email protected]> Authored: Tue Dec 4 15:51:58 2018 -0800 Committer: Wei Song <[email protected]> Committed: Tue Dec 4 15:51:58 2018 -0800 ---------------------------------------------------------------------- .../table/descriptors/RemoteTableDescriptor.java | 14 +++++--------- .../descriptors/TestRemoteTableDescriptor.java | 15 +++++++-------- 2 files changed, 12 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c7e5dcba/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java index 5bed7d6..7286004 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java @@ -125,29 +125,25 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot } /** - * Use specified TableReadFunction with remote table. - * @param readFn read function instance - * @param retryPolicy retry policy for the read function + * Use specified {@link TableRetryPolicy} with the {@link TableReadFunction}. + * @param retryPolicy retry policy for the write function * @return this table descriptor instance */ - public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) { + public RemoteTableDescriptor<K, V> withReadRetryPolicy(TableRetryPolicy retryPolicy) { Preconditions.checkNotNull(readFn, "null read function"); Preconditions.checkNotNull(retryPolicy, "null retry policy"); - this.readFn = readFn; this.readRetryPolicy = retryPolicy; return this; } /** - * Use specified TableWriteFunction with remote table. - * @param writeFn write function instance + * Use specified {@link TableRetryPolicy} with the {@link TableWriteFunction}. * @param retryPolicy retry policy for the write function * @return this table descriptor instance */ - public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) { + public RemoteTableDescriptor<K, V> withWriteRetryPolicy(TableRetryPolicy retryPolicy) { Preconditions.checkNotNull(writeFn, "null write function"); Preconditions.checkNotNull(retryPolicy, "null retry policy"); - this.writeFn = writeFn; this.writeRetryPolicy = retryPolicy; return this; } http://git-wip-us.apache.org/repos/asf/samza/blob/c7e5dcba/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java index 7a75c90..3d8e36f 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java @@ -175,12 +175,11 @@ public class TestRemoteTableDescriptor { private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) { int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); - RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1"); - TableRetryPolicy retryPolicy = new TableRetryPolicy(); - retryPolicy.withRetryPredicate((ex) -> false); - desc.withReadFunction(createMockTableReadFunction(), retryPolicy); - desc.withWriteFunction(createMockTableWriteFunction()); - desc.withAsyncCallbackExecutorPoolSize(10); + RemoteTableDescriptor<String, String> desc = new RemoteTableDescriptor("1") + .withReadFunction(createMockTableReadFunction()) + .withReadRetryPolicy(new TableRetryPolicy().withRetryPredicate((ex) -> false)) + .withWriteFunction(createMockTableWriteFunction()) + .withAsyncCallbackExecutorPoolSize(10); if (rateOnly) { if (rlGets) { @@ -218,8 +217,8 @@ public class TestRemoteTableDescriptor { ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.getCallbackExecutor(); Assert.assertEquals(10, callbackExecutor.getCorePoolSize()); - Assert.assertNotNull(rwTable.getReadFn() instanceof RetriableReadFunction); - Assert.assertNotNull(!(rwTable.getWriteFn() instanceof RetriableWriteFunction)); + Assert.assertTrue(rwTable.getReadFn() instanceof RetriableReadFunction); + Assert.assertFalse(rwTable.getWriteFn() instanceof RetriableWriteFunction); } @Test
