Repository: samza Updated Branches: refs/heads/master 63d33fa06 -> 1eb4c2663
SAMZA-1948: Updated hybrid table descriptors to take underlying table descriptors. Hybrid table descriptors, e.g. CachingTableDescriptor, should take both tables as constructor parameters instead of using withXYZ methods. This is better because both tables are required for the hybrid descriptor to work, and it helps with type inference for the descriptor. Author: Wei Song <[email protected]> Reviewers: Peng Du <[email protected]> Closes #706 from weisong44/SAMZA-1948 and squashes the following commits: 53444419 [Wei Song] Updated based on review comments 39d9ab00 [Wei Song] SAMZA-1948 Updated hybrid table descriptors to take underlying table descriptors a56c28dc [Wei Song] Merge remote-tracking branch 'upstream/master' 097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master' 05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master' f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master' 7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master' f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master' 1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master' c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master' 242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master' ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master' e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master' 8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master' 1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master' a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master' 41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master' 239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master' eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master' 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d 
[Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb4c266 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb4c266 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb4c266 Branch: refs/heads/master Commit: 1eb4c266339693edf6f13a0da036cc7ca6ff3fc3 Parents: 63d33fa Author: Wei Song <[email protected]> Authored: Thu Oct 11 14:53:54 2018 -0700 Committer: Wei Song <[email protected]> Committed: Thu Oct 11 14:53:54 2018 -0700 ---------------------------------------------------------------------- .../samza/table/TableDescriptorsProvider.java | 2 +- .../table/caching/CachingTableDescriptor.java | 47 +++++++++----------- .../samza/table/caching/TestCachingTable.java | 13 +++--- .../samza/test/table/TestRemoteTable.java | 6 +-- 4 files changed, 33 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java index 5f8d766..7b8754a 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java @@ -91,7 +91,7 @@ import 
org.apache.samza.operators.TableDescriptor; public interface TableDescriptorsProvider { /** * Constructs instances of the table descriptors - * @param config + * @param config the job config * @return list of table descriptors */ List<TableDescriptor> getTableDescriptors(Config config); http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java index 4896e93..f9d4007 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java @@ -47,10 +47,30 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V private boolean isWriteAround; /** - * {@inheritDoc} + * Constructs a table descriptor instance with internal cache + * + * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ } + * @param table target table descriptor */ - public CachingTableDescriptor(String tableId) { + public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table) { super(tableId); + this.table = table; + } + + /** + * Constructs a table descriptor instance and specify a cache (as Table descriptor) + * to be used for caching. Cache get is not synchronized with put for better parallelism + * in the read path of {@link CachingTable}. As such, cache table implementation is + * expected to be thread-safe for concurrent accesses. 
+ * + * @param tableId Id of the table, it must conform to pattern { @literal [\\d\\w-_]+ } + * @param table target table descriptor + * @param cache cache table descriptor + */ + public CachingTableDescriptor(String tableId, TableDescriptor<K, V, ?> table, + TableDescriptor<K, V, ?> cache) { + this(tableId, table); + this.cache = cache; } @Override @@ -88,29 +108,6 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V } /** - * Specify a cache (as Table descriptor) to be used for caching. - * Cache get is not synchronized with put for better parallelism in the read path - * of {@link CachingTable}. As such, cache table implementation is expected to be - * thread-safe for concurrent accesses. - * @param cache cache table descriptor - * @return this descriptor - */ - public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) { - this.cache = cache; - return this; - } - - /** - * Specify the target table descriptor for the actual table input/output. - * @param table the target table descriptor - * @return this descriptor - */ - public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) { - this.table = table; - return this; - } - - /** * Specify the TTL for each read access, ie. record is expired after * the TTL duration since last read access of each key. 
* @param readTtl read TTL http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index dc13d00..128b938 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -82,14 +82,15 @@ public class TestCachingTable { } private void doTestSerialize(TableDescriptor cache) { - CachingTableDescriptor desc = new CachingTableDescriptor("1"); - desc.withTable(createDummyTableDescriptor("2")); + CachingTableDescriptor desc; + TableDescriptor table = createDummyTableDescriptor("2"); if (cache == null) { + desc = new CachingTableDescriptor("1", table); desc.withReadTtl(Duration.ofMinutes(3)); desc.withWriteTtl(Duration.ofMinutes(3)); desc.withCacheSize(1000); } else { - desc.withCache(cache); + desc = new CachingTableDescriptor("1", table, cache); } desc.withWriteAround(); @@ -150,9 +151,9 @@ public class TestCachingTable { } private void doTestCacheOps(boolean isWriteAround) { - CachingTableDescriptor desc = new CachingTableDescriptor("1"); - desc.withTable(createDummyTableDescriptor("realTable")); - desc.withCache(createDummyTableDescriptor("cacheTable")); + CachingTableDescriptor desc = new CachingTableDescriptor("1", + createDummyTableDescriptor("realTable"), + createDummyTableDescriptor("cacheTable")); if (isWriteAround) { desc.withWriteAround(); } http://git-wip-us.apache.org/repos/asf/samza/blob/1eb4c266/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index a48bb7f..a42f2e6 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -144,17 +144,17 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { - CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id); + CachingTableDescriptor<K, V> cachingDesc; if (defaultCache) { + cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc); cachingDesc.withReadTtl(Duration.ofMinutes(5)); cachingDesc.withWriteTtl(Duration.ofMinutes(5)); } else { GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id); guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build()); - cachingDesc.withCache(guavaTableDesc); + cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc); } - cachingDesc.withTable(actualTableDesc); return appDesc.getTable(cachingDesc); }
