SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed
Changed table descriptor to take serde in constructor, and removed withSerde() from all table descriptors. Author: Wei Song <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #649 from weisong44/SAMZA-1849 and squashes the following commits: a3ba2f70 [Wei Song] Merge branch 'master' into SAMZA-1849 41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master' e7a716c0 [Wei Song] Updated based on review comments 0601566f [Wei Song] SAMZA-1849: Table Descriptors should take Serde at construction time so that descriptors can be typed 239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master' eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master' 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d8939123 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d8939123 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d8939123 Branch: refs/heads/NewKafkaSystemConsumer Commit: d89391231144b1700895673448a13d83b1c92a3a Parents: c48bcd2 Author: Wei Song <[email protected]> Authored: Thu Sep 20 10:17:49 2018 -0700 Committer: Wei Song <[email protected]> Committed: Thu Sep 20 10:17:49 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/operators/TableDescriptor.java | 13 ++----------- .../samza/table/TableDescriptorsProvider.java | 8 +++----- .../samza/operators/BaseTableDescriptor.java | 19 ++++++++++--------- .../table/remote/RemoteTableDescriptor.java | 13 ++++++++++++- .../kv/inmemory/InMemoryTableDescriptor.java | 13 ++++++++++++- .../kv/inmemory/TestInMemoryTableDescriptor.java | 4 ++-- .../samza/storage/kv/RocksDbTableDescriptor.java | 13 ++++++++++++- .../storage/kv/TestRocksDbTableDescriptor.java | 11 +++++------ .../kv/BaseLocalStoreBackedTableDescriptor.java | 13 +++++++++++-- .../sql/impl/ConfigBasedIOResolverFactory.java | 7 +++---- .../sql/testutil/TestIOResolverFactory.java | 7 +++---- .../apache/samza/test/table/TestLocalTable.java | 9 ++++----- .../test/table/TestLocalTableWithSideInputs.java | 6 ++---- .../test/table/TestTableDescriptorsProvider.java | 10 ++++------ 14 files changed, 85 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java index a60b6a9..dbcd65e 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java @@ -19,7 +19,6 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.serializers.KVSerde; /** * User facing class to collect metadata that fully describes a @@ -30,8 +29,8 @@ import org.apache.samza.serializers.KVSerde; * * <pre> * {@code - * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl") - * .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8"))) + * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl", + * KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8"))) * .withBlockSize(1024) * .withConfig("some-key", "some-value"); * } @@ -55,14 +54,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> { String getTableId(); /** - * Set the Serde for this table - * @param serde the serde - * @return this table descriptor instance - * @throws IllegalArgumentException if null is provided - */ - D withSerde(KVSerde<K, V> serde); - - /** * Add a configuration entry for the table * @param key the key * @param value the value http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java index 766a4b4..5f8d766 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java @@ -44,13 +44,11 @@ import org.apache.samza.operators.TableDescriptor; * public List<TableDescriptor> getTableDescriptors() { * List<TableDescriptor> tableDescriptors = new ArrayList<>(); * final TableReadFunction readRemoteTableFn = new MyStoreReadFunction(); - * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") - * .withReadFunction(readRemoteTableFn) - * .withSerde(KVSerde.of(new StringSerde(), new StringSerde()))); + * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde())) + * .withReadFunction(readRemoteTableFn); * - * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1") + * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>())) * .withBlockSize(4096) - * .withSerde(KVSerde.of(new LongSerde(), new StringSerde<>()))); * .withConfig("some-key", "some-value"); * return tableDescriptors; * } http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java index f81f3b8..1e4194a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java @@ -51,18 +51,19 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, this.tableId = tableId; } - @Override - public D withConfig(String key, String value) { - config.put(key, value); - return (D) this; + /** + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } + * @param serde the serde for key and value + */ + protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) { + this.tableId = tableId; + this.serde = serde; } @Override - public D withSerde(KVSerde<K, V> serde) { - if (serde == null) { - throw new IllegalArgumentException("Serde cannot be null"); - } - this.serde = serde; + public D withConfig(String key, String value) { + config.put(key, value); return (D) this; } http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java index 537ff87..c31348f 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.table.TableSpec; import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.SerdeUtils; @@ -79,12 +80,22 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot private int asyncCallbackPoolSize = -1; /** - * {@inheritDoc} + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } */ public RemoteTableDescriptor(String tableId) { super(tableId); } + /** + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } + * @param serde the serde for key and value + */ + public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) { + super(tableId, serde); + } + @Override public TableSpec getTableSpec() { validate(); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java index 8328417..d364234 100644 --- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java @@ -21,6 +21,7 @@ package org.apache.samza.storage.kv.inmemory; import java.util.HashMap; import java.util.Map; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor; import org.apache.samza.table.TableSpec; @@ -34,12 +35,22 @@ import org.apache.samza.table.TableSpec; public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> { /** - * {@inheritDoc} + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } */ public InMemoryTableDescriptor(String tableId) { super(tableId); } + /** + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } + * @param serde the serde for key and value + */ + public InMemoryTableDescriptor(String tableId, KVSerde<K, V> serde) { + super(tableId, serde); + } + @Override protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) { super.generateTableSpecConfig(tableSpecConfig); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java index 840fb70..89bd058 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java @@ -31,8 +31,8 @@ public class TestInMemoryTableDescriptor { @Test public void testTableSpec() { - TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1") - .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + TableSpec tableSpec = new InMemoryTableDescriptor("1", + KVSerde.of(new IntegerSerde(), new StringSerde())) .withConfig("inmemory.abc", "xyz") .getTableSpec(); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java index 9b81605..325d023 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java @@ -21,6 +21,7 @@ package org.apache.samza.storage.kv; import java.util.HashMap; import java.util.Map; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.table.TableSpec; @@ -57,13 +58,23 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr private String compactionStyle; /** - * {@inheritDoc} + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } */ public RocksDbTableDescriptor(String tableId) { super(tableId); } /** + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } + * @param serde the serde for key and value + */ + public RocksDbTableDescriptor(String tableId, KVSerde<K, V> serde) { + super(tableId, serde); + } + + /** * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide * @param writeBatchSize write batch size * @return this table descriptor instance http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java index 50f0920..35a66e8 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java @@ -37,8 +37,8 @@ public class TestRocksDbTableDescriptor { @Test public void testSerde() { - TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1") - .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + TableSpec tableSpec = new RocksDbTableDescriptor("1", + KVSerde.of(new IntegerSerde(), new StringSerde())) .getTableSpec(); Assert.assertNotNull(tableSpec.getSerde()); Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class); @@ -48,8 +48,8 @@ public class TestRocksDbTableDescriptor { @Test public void testTableSpec() { - TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1") - .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1", + KVSerde.of(new IntegerSerde(), new StringSerde())) .withBlockSize(1) .withCacheSize(2) .withCompactionStyle("fifo") @@ -85,8 +85,7 @@ public class TestRocksDbTableDescriptor { @Test public void testTableSpecWithChangelogEnabled() { - TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1") - .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde())) + TableSpec tableSpec = new RocksDbTableDescriptor("1", KVSerde.of(new IntegerSerde(), new StringSerde())) .withChangelogStream("changelog-$tream") .withChangelogReplicationFactor(10) .getTableSpec(); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java index c46f9e1..96057d6 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.storage.SideInputsProcessor; @@ -49,13 +50,21 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo /** * Constructs a table descriptor instance - * - * @param tableId Id of the table + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } */ public BaseLocalStoreBackedTableDescriptor(String tableId) { super(tableId); } + /** + * Constructs a table descriptor instance + * @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ } + * @param serde the serde for key and value + */ + public BaseLocalStoreBackedTableDescriptor(String tableId, KVSerde<K, V> serde) { + super(tableId, serde); + } + public D withSideInputs(List<String> sideInputs) { this.sideInputs = sideInputs; // Disable changelog http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java index 1ada813..a1c1bdd 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java @@ -100,10 +100,9 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory { TableDescriptor tableDescriptor = null; if (isTable) { - tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name) - .withSerde(KVSerde.of( - new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new JsonSerdeV2<>(SamzaSqlRelMessage.class))); + tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of( + new JsonSerdeV2<>(SamzaSqlCompositeKey.class), + new JsonSerdeV2<>(SamzaSqlRelMessage.class))); } return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index bd61afd..8318e8a 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -202,10 +202,9 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size()); } else { String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-"); - tableDescriptor = new RocksDbTableDescriptor(tableId) - .withSerde(KVSerde.of( - new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new JsonSerdeV2<>(SamzaSqlRelMessage.class))); + tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of( + new JsonSerdeV2<>(SamzaSqlCompositeKey.class), + new JsonSerdeV2<>(SamzaSqlRelMessage.class))); } tableDescMap.put(ioName, tableDescriptor); } http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index fbf0539..e1386c8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -96,8 +96,8 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { final StreamApplication app = appDesc -> { - Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1") - .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1", + KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); @@ -134,7 +134,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { final StreamApplication app = appDesc -> { Table<KV<Integer, Profile>> table = appDesc.getTable( - new InMemoryTableDescriptor("t1").withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); appDesc.getInputStream(profileISD) @@ -209,8 +209,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { final StreamApplication app = appDesc -> { - Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1") - .withSerde(profileKVSerde)); + Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde)); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index adcea48..3c22818 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -143,8 +143,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness } protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { - return new InMemoryTableDescriptor<Integer, Profile>(PROFILE_TABLE) - .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) + return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) .withSideInputs(ImmutableList.of(PROFILE_STREAM)) .withSideInputsProcessor((msg, store) -> { Profile profile = (Profile) msg.getMessage(); @@ -158,8 +157,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness static class DurablePageViewProfileJoin extends PageViewProfileJoin { @Override protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { - return new RocksDbTableDescriptor<Integer, Profile>(PROFILE_TABLE) - .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) + return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) .withSideInputs(ImmutableList.of(PROFILE_STREAM)) .withSideInputsProcessor((msg, store) -> { TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); http://git-wip-us.apache.org/repos/asf/samza/blob/d8939123/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 34ffbd4..d123cee 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -137,13 +137,11 @@ public class TestTableDescriptorsProvider { final RateLimiter readRateLimiter = mock(RateLimiter.class); final MyReadFunction readFn = new MyReadFunction(); - tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") + tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new LongSerde())) .withReadFunction(readFn) - .withRateLimiter(readRateLimiter, null, null) - .withSerde(KVSerde.of(new StringSerde(), new LongSerde()))); - tableDescriptors.add(new RocksDbTableDescriptor("local-table-1") - .withBlockSize(4096) - .withSerde(KVSerde.of(new StringSerde(), new StringSerde()))); + .withRateLimiter(readRateLimiter, null, null)); + tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new StringSerde(), new StringSerde())) + .withBlockSize(4096)); return tableDescriptors; } }
