This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 19c977c SAMZA-2309: Remove readFn requirement for remote tables (#1144)
19c977c is described below
commit 19c977ce67ea560f669c790e2f855bfc2323cbcb
Author: Daniel Chen <[email protected]>
AuthorDate: Fri Aug 23 14:29:46 2019 -0700
SAMZA-2309: Remove readFn requirement for remote tables (#1144)
---
.../table/descriptors/RemoteTableDescriptor.java | 12 +++++---
.../samza/table/remote/AsyncRemoteTable.java | 10 +++++--
.../org/apache/samza/table/remote/RemoteTable.java | 7 +++--
.../samza/table/remote/TestAsyncRemoteTable.java | 4 +--
.../apache/samza/table/remote/TestRemoteTable.java | 15 +++++++++-
.../descriptors/TestRemoteTableDescriptor.java | 33 +++++++++++++++++++++-
6 files changed, 69 insertions(+), 12 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index b0590c5..3eed914 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -77,7 +77,7 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
public static final String BATCH_PROVIDER = "io.batch.provider";
- // Input support for a specific remote store (required)
+ // Input support for a specific remote store (optional)
private TableReadFunction<K, V> readFn;
// Output support for a specific remote store (optional)
@@ -86,6 +86,7 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
// Rate limiter for client-side throttling; it is set by withRateLimiter()
private RateLimiter rateLimiter;
+ // Indicate whether read rate limiter is enabled or not
private boolean enableReadRateLimiter = true;
// Indicate whether write rate limiter is enabled or not
@@ -327,8 +328,10 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
addTableConfig(ASYNC_CALLBACK_POOL_SIZE,
String.valueOf(asyncCallbackPoolSize), tableConfig);
// Handle table reader function
- addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn),
tableConfig);
- addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
+ if (readFn != null) {
+ addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn),
tableConfig);
+ addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
+ }
// Handle table write function
if (writeFn != null) {
@@ -345,7 +348,8 @@ public class RemoteTableDescriptor<K, V> extends
BaseTableDescriptor<K, V, Remot
@Override
protected void validate() {
- Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+ Preconditions.checkArgument(writeFn != null || readFn != null,
+ "Must have one of TableReadFunction or TableWriteFunction");
Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
"Only one of rateLimiter instance or read/write limits can be specified");
// Assume callback executor pool should have no more than 20 threads
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
index 4b1851b..ebe4858 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
@@ -42,13 +42,15 @@ public class AsyncRemoteTable<K, V> implements
AsyncReadWriteTable<K, V> {
private final TableWriteFunction<K, V> writeFn;
public AsyncRemoteTable(TableReadFunction<K, V> readFn,
TableWriteFunction<K, V> writeFn) {
- Preconditions.checkNotNull(readFn, "null readFn");
+ Preconditions.checkArgument(writeFn != null || readFn != null,
+ "Must have one of TableReadFunction or TableWriteFunction");
this.readFn = readFn;
this.writeFn = writeFn;
}
@Override
public CompletableFuture<V> getAsync(K key, Object ... args) {
+ Preconditions.checkNotNull(readFn, "null readFn");
return args.length > 0
? readFn.getAsync(key, args)
: readFn.getAsync(key);
@@ -56,6 +58,7 @@ public class AsyncRemoteTable<K, V> implements
AsyncReadWriteTable<K, V> {
@Override
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object ...
args) {
+ Preconditions.checkNotNull(readFn, "null readFn");
return args.length > 0
? readFn.getAllAsync(keys, args)
: readFn.getAllAsync(keys);
@@ -63,6 +66,7 @@ public class AsyncRemoteTable<K, V> implements
AsyncReadWriteTable<K, V> {
@Override
public <T> CompletableFuture<T> readAsync(int opId, Object... args) {
+ Preconditions.checkNotNull(readFn, "null readFn");
return readFn.readAsync(opId, args);
}
@@ -119,7 +123,9 @@ public class AsyncRemoteTable<K, V> implements
AsyncReadWriteTable<K, V> {
@Override
public void close() {
- readFn.close();
+ if (readFn != null) {
+ readFn.close();
+ }
if (writeFn != null) {
writeFn.close();
}
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
index 85f612c..6d6c23a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -130,7 +130,8 @@ public final class RemoteTable<K, V> extends
BaseReadWriteTable<K, V>
ExecutorService callbackExecutor) {
super(tableId);
- Preconditions.checkNotNull(readFn, "null readFn");
+ Preconditions.checkArgument(writeFn != null || readFn != null,
+ "Must have one of TableReadFunction or TableWriteFunction");
this.readFn = readFn;
this.writeFn = writeFn;
@@ -348,7 +349,9 @@ public final class RemoteTable<K, V> extends
BaseReadWriteTable<K, V>
public void init(Context context) {
super.init(context);
asyncTable.init(context);
- readFn.init(context, this);
+ if (readFn != null) {
+ readFn.init(context, this);
+ }
if (writeFn != null) {
writeFn.init(context, this);
}
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
index d557c31..20706dc 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
@@ -176,8 +176,8 @@ public class TestAsyncRemoteTable {
verify(writeFn, times(1)).flush();
}
- @Test(expected = NullPointerException.class)
- public void testFailOnNullReadFn() {
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailOnNullReadFnAndWriteFn() {
new AsyncRemoteTable(null, null);
}
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 7a98504..718aa2c 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -88,7 +88,9 @@ public class TestRemoteTable {
readRateLimiter, writeRateLimiter, rateLimitingExecutor,
readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
table.init(getMockContext());
- verify(readFn, times(1)).init(any(), any());
+ if (readFn != null) {
+ verify(readFn, times(1)).init(any(), any());
+ }
if (writeFn != null) {
verify(writeFn, times(1)).init(any(), any());
}
@@ -122,6 +124,17 @@ public class TestRemoteTable {
verify(table.readRateLimiter, times(error && retry ? 2 :
1)).throttle(anyString());
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailOnNullReadFnAndWriteFn() {
+ getTable("id", null, null, false);
+ }
+
+ @Test
+ public void testSucceedValidationOnNullReadFn() {
+ RemoteTable<String, String> table = getTable("tableId", null,
mock(TableWriteFunction.class), false);
+ Assert.assertNotNull(table);
+ }
+
@Test
public void testInit() {
String tableId = "testInit";
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index ce89c5a..cf03599 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -100,6 +100,37 @@ public class TestRemoteTableDescriptor {
}
@Test
+ public void testValidateOnlyReadOrWriteFn() {
+ // Only read defined
+ String tableId = "1";
+ RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId)
+ .withReadFunction(createMockTableReadFunction())
+ .withReadRateLimiterDisabled();
+ Map<String, String> tableConfig = desc.toConfig(new MapConfig());
+ Assert.assertNotNull(tableConfig);
+
+ // Only write defined
+ String tableId2 = "2";
+ RemoteTableDescriptor desc2 = new RemoteTableDescriptor(tableId2)
+ .withWriteFunction(createMockTableWriteFunction())
+ .withWriteRateLimiterDisabled();
+ tableConfig = desc2.toConfig(new MapConfig());
+ Assert.assertNotNull(tableConfig);
+
+ // Neither read or write defined (Failure case)
+ String tableId3 = "3";
+ RemoteTableDescriptor desc3 = new RemoteTableDescriptor(tableId3);
+ try {
+ desc3.toConfig(new MapConfig());
+ Assert.fail("Should not allow neither readFn or writeFn defined");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
+ Assert.assertTrue(e.getMessage().contains("Must have one of TableReadFunction or TableWriteFunction"));
+ }
+ }
+
+
+ @Test
public void testSerializeSimple() {
doTestSerialize(null, null, null);
}
@@ -135,7 +166,7 @@ public class TestRemoteTableDescriptor {
assertEquals(null, RemoteTableDescriptor.WRITE_FN, tableId, tableConfig);
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testSerializeNullReadFunction() {
RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
Map<String, String> tableConfig = desc.toConfig(new MapConfig());