SAMZA-1854: Changed caching table descriptor to take table descriptors instead of run-time table objects
As per subject, changed caching table descriptor to take table descriptor instead of run-time objects - Added BaseHybridTableDescriptor, which models a hybrid table that may contain other tables - Modified StreamApplicationDescriptorImpl to also include tables contained within a hybrid table Author: Wei Song <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #645 from weisong44/SAMZA-1854 and squashes the following commits: 2c0d1362 [Wei Song] Updated based on review comments dd18bbee [Wei Song] Merge branch 'master' into SAMZA-1854 a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master' 41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master' 239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master' a87a9b04 [Wei Song] SAMZA-1854: Changed caching table descriptor to take table descriptor instead of run-time objects eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master' 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db6996ed Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db6996ed Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db6996ed Branch: refs/heads/NewKafkaSystemConsumer Commit: db6996ed99bb6a677f588f247b373345077580b0 Parents: d893912 Author: Wei Song <[email 
protected]> Authored: Thu Sep 20 11:19:06 2018 -0700 Committer: Wei Song <[email protected]> Committed: Thu Sep 20 11:19:06 2018 -0700 ---------------------------------------------------------------------- .../StreamApplicationDescriptorImpl.java | 8 ++++ .../table/caching/CachingTableDescriptor.java | 37 +++++++++------ .../table/hybrid/BaseHybridTableDescriptor.java | 50 ++++++++++++++++++++ .../samza/table/caching/TestCachingTable.java | 22 ++++++--- .../kv/LocalStoreBackedReadWriteTable.java | 1 + .../samza/test/table/TestRemoteTable.java | 33 ++++++------- 6 files changed, 112 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java index ae7a45d..d50b0d0 100644 --- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -54,6 +55,7 @@ import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.hybrid.BaseHybridTableDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,6 +173,12 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S @Override 
public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) { + + if (tableDescriptor instanceof BaseHybridTableDescriptor) { + List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors(); + tableDescs.forEach(td -> getTable(td)); + } + String tableId = tableDescriptor.getTableId(); Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(), String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString())); http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java index a1accd8..4896e93 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java @@ -20,28 +20,30 @@ package org.apache.samza.table.caching; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.samza.operators.BaseTableDescriptor; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.TableImpl; -import org.apache.samza.table.Table; +import org.apache.samza.operators.TableDescriptor; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.hybrid.BaseHybridTableDescriptor; import com.google.common.base.Preconditions; + /** * Table descriptor for {@link CachingTable}. 
* @param <K> type of the key in the cache * @param <V> type of the value in the cache */ -public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, CachingTableDescriptor<K, V>> { +public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> { private Duration readTtl; private Duration writeTtl; private long cacheSize; - private Table<KV<K, V>> cache; - private Table<KV<K, V>> table; + private TableDescriptor<K, V, ?> cache; + private TableDescriptor<K, V, ?> table; private boolean isWriteAround; /** @@ -52,6 +54,13 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach } @Override + public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() { + return cache != null + ? Arrays.asList(cache, table) + : Arrays.asList(table); + } + + @Override public TableSpec getTableSpec() { validate(); @@ -59,7 +68,7 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach generateTableSpecConfig(tableSpecConfig); if (cache != null) { - tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((TableImpl) cache).getTableSpec().getId()); + tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId()); } else { if (readTtl != null) { tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis())); @@ -72,31 +81,31 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach } } - tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((TableImpl) table).getTableSpec().getId()); + tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId()); tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround)); return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig); } /** - * Specify a cache instance (as Table abstraction) to be used for caching. 
+ * Specify a cache (as Table descriptor) to be used for caching. * Cache get is not synchronized with put for better parallelism in the read path * of {@link CachingTable}. As such, cache table implementation is expected to be * thread-safe for concurrent accesses. - * @param cache cache instance + * @param cache cache table descriptor * @return this descriptor */ - public CachingTableDescriptor withCache(Table<KV<K, V>> cache) { + public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) { this.cache = cache; return this; } /** - * Specify the table instance for the actual table input/output. - * @param table table instance + * Specify the target table descriptor for the actual table input/output. + * @param table the target table descriptor * @return this descriptor */ - public CachingTableDescriptor withTable(Table<KV<K, V>> table) { + public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) { this.table = table; return this; } http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java new file mode 100644 index 0000000..48efd0c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table.hybrid; + +import java.util.List; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.TableDescriptor; + +/** + * Base class for hybrid table descriptors. A hybrid table consists of one or more + * table descriptors, and it orchestrates operations between them to achieve more advanced + * functionality. + * + * @param <K> the type of the key + * @param <V> the type of the value + * @param <D> the type of this table descriptor + */ +abstract public class BaseHybridTableDescriptor<K, V, D extends BaseHybridTableDescriptor<K, V, D>> + extends BaseTableDescriptor<K, V, D> { + + /** + * {@inheritDoc} + */ + public BaseHybridTableDescriptor(String tableId) { + super(tableId); + } + + /** + * Get tables contained within this table. + * @return list of tables + */ + abstract public List<? 
extends TableDescriptor<K, V, ?>> getTableDescriptors(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index 49c72dc..ec1c915 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -36,11 +36,11 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; -import org.apache.samza.operators.TableImpl; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.TableDescriptor; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; import org.apache.samza.table.caching.guava.GuavaCacheTable; import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor; @@ -79,12 +79,12 @@ public class TestCachingTable { guavaTableDesc.withCache(CacheBuilder.newBuilder().build()); TableSpec spec = guavaTableDesc.getTableSpec(); Assert.assertTrue(spec.getConfig().containsKey(GuavaCacheTableProvider.GUAVA_CACHE)); - doTestSerialize(new TableImpl(guavaTableDesc.getTableSpec())); + doTestSerialize(guavaTableDesc); } - private void doTestSerialize(Table cache) { + private void doTestSerialize(TableDescriptor cache) { CachingTableDescriptor desc = new CachingTableDescriptor("1"); - desc.withTable(new TableImpl(new TableSpec("2", null, null, new HashMap<>()))); + desc.withTable(createDummyTableDescriptor("2")); if (cache == null) { 
desc.withReadTtl(Duration.ofMinutes(3)); desc.withWriteTtl(Duration.ofMinutes(3)); @@ -153,8 +153,8 @@ public class TestCachingTable { private void doTestCacheOps(boolean isWriteAround) { CachingTableDescriptor desc = new CachingTableDescriptor("1"); - desc.withTable(new TableImpl(new TableSpec("realTable", null, null, new HashMap<>()))); - desc.withCache(new TableImpl(new TableSpec("cacheTable", null, null, new HashMap<>()))); + desc.withTable(createDummyTableDescriptor("realTable")); + desc.withCache(createDummyTableDescriptor("cacheTable")); if (isWriteAround) { desc.withWriteAround(); } @@ -363,4 +363,12 @@ public class TestCachingTable { Assert.assertNull(guavaCache.getIfPresent("foo1")); Assert.assertNull(guavaCache.getIfPresent("foo3")); } + + private TableDescriptor createDummyTableDescriptor(String tableId) { + BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class); + when(tableDescriptor.getTableId()).thenReturn(tableId); + when(tableDescriptor.getTableSpec()).thenReturn( + new TableSpec(tableId, null, null, new HashMap<>())); + return tableDescriptor; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java index 98c3e3c..9eeb55e 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java @@ -40,6 +40,7 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab /** * Constructs an instance of {@link LocalStoreBackedReadWriteTable} + * @param tableId the table Id * @param kvStore the backing store */ public 
LocalStoreBackedReadWriteTable(String tableId, KeyValueStore kvStore) { http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index 4cf99ff..e23cb58 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -42,6 +42,7 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; +import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.runtime.LocalApplicationRunner; @@ -67,6 +68,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.withSettings; public class TestRemoteTable extends AbstractIntegrationTestHarness { @@ -136,19 +138,18 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } } - private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { + private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) { CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id); if (defaultCache) { cachingDesc.withReadTtl(Duration.ofMinutes(5)); cachingDesc.withWriteTtl(Duration.ofMinutes(5)); } 
else { - GuavaCacheTableDescriptor<K, V> guavaDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id); - guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build()); - Table<KV<K, V>> guavaTable = appDesc.getTable(guavaDesc); - cachingDesc.withCache(guavaTable); + GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id); + guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build()); + cachingDesc.withCache(guavaTableDesc); } - cachingDesc.withTable(actualTable); + cachingDesc.withTable(actualTableDesc); return appDesc.getTable(cachingDesc); } @@ -180,8 +181,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews)); configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount)); - final RateLimiter readRateLimiter = mock(RateLimiter.class); - final RateLimiter writeRateLimiter = mock(RateLimiter.class); + final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable()); + final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable()); final StreamApplication app = appDesc -> { RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); inputTableDesc @@ -197,17 +198,13 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); - Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc); + Table<KV<Integer, EnrichedPageView>> outputTable = withCache + ? 
getCachingTable(outputTableDesc, defaultCache, "output", appDesc) + : appDesc.getTable(outputTableDesc); - if (withCache) { - outputTable = getCachingTable(outputTable, defaultCache, "output", appDesc); - } - - Table<KV<Integer, Profile>> inputTable = appDesc.getTable(inputTableDesc); - - if (withCache) { - inputTable = getCachingTable(inputTable, defaultCache, "input", appDesc); - } + Table<KV<Integer, Profile>> inputTable = withCache + ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc) + : appDesc.getTable(inputTableDesc); DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); GenericInputDescriptor<TestTableData.PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
