Repository: samza Updated Branches: refs/heads/master 43a501b49 -> 58f18117c
SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join. Author: Aditya Toomula <[email protected]> Reviewers: Srinivasulu Punuru <[email protected]> Closes #676 from atoomula/dsl3 and squashes the following commits: e86ef83c [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name. 14450264 [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name. c3289673 [Aditya Toomula] Adding changelogstreamname prefix config. This will be used to reset the state by changing the prefix name. 804b07a1 [Aditya Toomula] SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/58f18117 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/58f18117 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/58f18117 Branch: refs/heads/master Commit: 58f18117ccddc62cf4c74325faa8dfa6ad9ec8e3 Parents: 43a501b Author: Aditya Toomula <[email protected]> Authored: Thu Oct 25 14:16:24 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Thu Oct 25 14:16:24 2018 -0700 ---------------------------------------------------------------------- .../sql/impl/ConfigBasedIOResolverFactory.java | 13 +++- .../sql/runner/SamzaSqlApplicationConfig.java | 20 +++++- .../samza/sql/translator/JoinTranslator.java | 18 +++++- .../translator/LogicalAggregateTranslator.java | 7 +- .../samza/sql/translator/QueryTranslator.java | 6 +- .../sql/testutil/TestIOResolverFactory.java | 12 +++- .../sql/translator/TestJoinTranslator.java | 2 +- .../sql/translator/TestQueryTranslator.java | 67 ++++++++++++++------ 8 files changed, 111 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java index 7faff17..2514d30 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java @@ -25,14 +25,17 @@ import org.apache.samza.table.descriptors.TableDescriptor; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.sql.data.SamzaSqlCompositeKey; -import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX; +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX; + /** * Source Resolver implementation that uses static config to return a config corresponding to a system stream. @@ -54,9 +57,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory { private class ConfigBasedIOResolver implements SqlIOResolver { private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table"; private final Config config; + private final String changeLogStorePrefix; public ConfigBasedIOResolver(Config config) { this.config = config; + String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX); + this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_"); } @Override @@ -100,9 +106,10 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory { TableDescriptor tableDescriptor = null; if (isTable) { - tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of( + String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-"); + tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of( new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new JsonSerdeV2<>(SamzaSqlRelMessage.class))); + new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled(); } return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor); http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java index 745c934..dcb5043 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java @@ -19,6 +19,7 @@ package org.apache.samza.sql.runner; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -82,10 +83,13 @@ public class SamzaSqlApplicationConfig { public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver"; public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s."; + public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix"; public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms"; public static final String SAMZA_SYSTEM_LOG = "log"; + public static final String DEFAULT_METADATA_TOPIC_PREFIX = ""; + private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins. private final Map<String, RelSchemaProvider> relSchemaProvidersBySource; @@ -100,6 +104,7 @@ public class SamzaSqlApplicationConfig { private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource; private final Map<String, SqlIOConfig> systemStreamConfigsBySource; + private final String metadataTopicPrefix; private final long windowDurationMs; public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams, @@ -133,6 +138,8 @@ public class SamzaSqlApplicationConfig { udfResolver = createUdfResolver(staticConfig); udfMetadata = udfResolver.getUdfs(); + metadataTopicPrefix = + staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX); windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS); // remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource @@ -168,9 +175,14 @@ public class SamzaSqlApplicationConfig { public static SqlIOResolver createIOResolver(Config config) { String sourceResolveValue = config.get(CFG_IO_RESOLVER); + Map<String, String> metadataPrefixProperties = new HashMap<>(); + metadataPrefixProperties.put( + String.format(CFG_FMT_SOURCE_RESOLVER_DOMAIN, sourceResolveValue) + CFG_METADATA_TOPIC_PREFIX, + config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX)); + Config newConfig = new MapConfig(Arrays.asList(config, metadataPrefixProperties)); Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty"); - return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN, - (o, c) -> ((SqlIOResolverFactory) o).create(c, config)); + return initializePlugin("SqlIOResolver", sourceResolveValue, newConfig, CFG_FMT_SOURCE_RESOLVER_DOMAIN, + (o, c) -> ((SqlIOResolverFactory) o).create(c, newConfig)); } private UdfResolver createUdfResolver(Map<String, String> config) { @@ -283,6 +295,10 @@ public class SamzaSqlApplicationConfig { return ioResolver; } + public String getMetadataTopicPrefix() { + return metadataTopicPrefix; + } + public long getWindowDurationMs() { return windowDurationMs; } http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java index ac2c64d..0761898 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java @@ -76,10 +76,12 @@ class JoinTranslator { private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class); private int joinId; private SqlIOResolver ioResolver; + private final String intermediateStreamPrefix; - JoinTranslator(int joinId, SqlIOResolver ioResolver) { + JoinTranslator(int joinId, SqlIOResolver ioResolver, String intermediateStreamPrefix) { this.joinId = joinId; this.ioResolver = ioResolver; + this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_"); } void translate(final LogicalJoin join, final TranslatorContext context) { @@ -124,7 +126,7 @@ class JoinTranslator { .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds), m -> m, KVSerde.of(keySerde, valueSerde), - "stream_" + joinId) + intermediateStreamPrefix + "stream_" + joinId) .map(KV::getValue) .join(table, joinFn); // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn); @@ -288,8 +290,18 @@ class JoinTranslator { Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table = context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get()); + Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class); + SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde = + (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null); + + // Let's always repartition by the join fields as key before sending the key and value to the table. + // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined + // have the same partitioning scheme and partition key. relOutputStream - .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m)) + .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds), + m -> m, + KVSerde.of(keySerde, valueSerde), + intermediateStreamPrefix + "table_" + joinId) .sendTo(table); return table; http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java index 216ecea..40a08ff 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java @@ -45,9 +45,11 @@ class LogicalAggregateTranslator { private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class); private int windowId; + private String changeLogStorePrefix; - LogicalAggregateTranslator(int windowId) { + LogicalAggregateTranslator(int windowId, String changeLogStorePrefix) { this.windowId = windowId; + this.changeLogStorePrefix = changeLogStorePrefix + (changeLogStorePrefix.isEmpty() ? "" : "_"); } void translate(final LogicalAggregate aggregate, final TranslatorContext context) { @@ -69,7 +71,8 @@ class LogicalAggregateTranslator { foldCountFn, new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(), new LongSerde()) - .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow_" + windowId) + .setAccumulationMode( + AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + windowId) .map(windowPane -> { List<String> fieldNames = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames(); List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues(); http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index a1e7adb..4c5f11c 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -177,7 +177,8 @@ public class QueryTranslator { public RelNode visit(LogicalJoin join) { RelNode node = super.visit(join); joinId++; - new JoinTranslator(joinId, ioResolver).translate(join, translatorContext); + new JoinTranslator(joinId, ioResolver, sqlConfig.getMetadataTopicPrefix()) + .translate(join, translatorContext); return node; } @@ -185,7 +186,8 @@ public class QueryTranslator { public RelNode visit(LogicalAggregate aggregate) { RelNode node = super.visit(aggregate); windowId++; - new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext); + new LogicalAggregateTranslator(windowId, sqlConfig.getMetadataTopicPrefix()) + .translate(aggregate, translatorContext); return node; } }); http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java index 14314c8..818e33d 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java @@ -32,10 +32,10 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlCompositeKey; -import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SqlIOConfig; import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOResolverFactory; +import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory; import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.Table; @@ -44,6 +44,9 @@ import org.apache.samza.table.descriptors.TableProviderFactory; import org.apache.samza.table.TableSpec; import org.apache.samza.table.utils.descriptors.BaseTableProvider; +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX; +import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX; + public class TestIOResolverFactory implements SqlIOResolverFactory { public static final String TEST_DB_SYSTEM = "testDb"; @@ -179,9 +182,12 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table"; private final Config config; private final Map<String, TableDescriptor> tableDescMap = new HashMap<>(); + private final String changeLogStorePrefix; public TestIOResolver(Config config) { this.config = config; + String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX); + this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_"); } private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) { @@ -200,10 +206,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory { if (isSink) { tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size()); } else { - String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-"); + String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-"); tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of( new JsonSerdeV2<>(SamzaSqlCompositeKey.class), - new JsonSerdeV2<>(SamzaSqlRelMessage.class))); + new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled(); } tableDescMap.put(ioName, tableDescriptor); } http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java index dcd7023..33c6d02 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java @@ -164,7 +164,7 @@ public class TestJoinTranslator extends TranslatorTestBase { when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc)); // Apply translate() method to verify that we are getting the correct map operator constructed - JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver); + JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver, ""); joinTranslator.translate(mockJoin, mockContext); // make sure that context has been registered with LogicFilter and output message streams verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3)); http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index d0dc23f..250253e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -495,6 +495,7 @@ public class TestQueryTranslator { + " join testavro.PROFILE.`$table` as p" + " on p.id = pv.profileId"; config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + config.put(SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX, "sampleAppv1"); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); List<String> sqlStmts = fetchSqlFromConfig(config); @@ -520,26 +521,36 @@ public class TestQueryTranslator { String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); String input3System = streamConfig.getSystem(input3StreamId); String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get(); + String input4System = streamConfig.getSystem(input4StreamId); + String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId); String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); String output1System = streamConfig.getSystem(output1StreamId); String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); String output2System = streamConfig.getSystem(output2StreamId); String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get(); + String output3System = streamConfig.getSystem(output3StreamId); + String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId); - Assert.assertEquals(2, specGraph.getOutputStreams().size()); + Assert.assertEquals(3, specGraph.getOutputStreams().size()); Assert.assertEquals("kafka", output1System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); - Assert.assertEquals("testavro", output2System); - Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", output1PhysicalName); + Assert.assertEquals("kafka", output2System); + Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", output2PhysicalName); + Assert.assertEquals("testavro", output3System); + Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName); - Assert.assertEquals(3, specGraph.getInputOperators().size()); + Assert.assertEquals(4, specGraph.getInputOperators().size()); Assert.assertEquals("testavro", input1System); Assert.assertEquals("PAGEVIEW", input1PhysicalName); Assert.assertEquals("testavro", input2System); Assert.assertEquals("PROFILE", input2PhysicalName); Assert.assertEquals("kafka", input3System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", input3PhysicalName); + Assert.assertEquals("kafka", input4System); + Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", input4PhysicalName); } @Test @@ -578,26 +589,36 @@ public class TestQueryTranslator { String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); String input3System = streamConfig.getSystem(input3StreamId); String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get(); + String input4System = streamConfig.getSystem(input4StreamId); + String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId); String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); String output1System = streamConfig.getSystem(output1StreamId); String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); String output2System = streamConfig.getSystem(output2StreamId); String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get(); + String output3System = streamConfig.getSystem(output3StreamId); + String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId); - Assert.assertEquals(2, specGraph.getOutputStreams().size()); + Assert.assertEquals(3, specGraph.getOutputStreams().size()); Assert.assertEquals("kafka", output1System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); - Assert.assertEquals("testavro", output2System); - Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName); + Assert.assertEquals("kafka", output2System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName); + Assert.assertEquals("testavro", output3System); + Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName); - Assert.assertEquals(3, specGraph.getInputOperators().size()); + Assert.assertEquals(4, specGraph.getInputOperators().size()); Assert.assertEquals("testavro", input1System); Assert.assertEquals("PAGEVIEW", input1PhysicalName); Assert.assertEquals("testavro", input2System); Assert.assertEquals("PROFILE", input2PhysicalName); Assert.assertEquals("kafka", input3System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName); + Assert.assertEquals("kafka", input4System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName); } @Test @@ -635,26 +656,36 @@ public class TestQueryTranslator { String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); String input3System = streamConfig.getSystem(input3StreamId); String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get(); + String input4System = streamConfig.getSystem(input4StreamId); + String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId); String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); String output1System = streamConfig.getSystem(output1StreamId); String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); String output2System = streamConfig.getSystem(output2StreamId); String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get(); + String output3System = streamConfig.getSystem(output3StreamId); + String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId); - Assert.assertEquals(2, specGraph.getOutputStreams().size()); + Assert.assertEquals(3, specGraph.getOutputStreams().size()); Assert.assertEquals("kafka", output1System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); - Assert.assertEquals("testavro", output2System); - Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName); + Assert.assertEquals("kafka", output2System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName); + Assert.assertEquals("testavro", output3System); + Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName); - Assert.assertEquals(3, specGraph.getInputOperators().size()); + Assert.assertEquals(4, specGraph.getInputOperators().size()); Assert.assertEquals("testavro", input1System); Assert.assertEquals("PROFILE", input1PhysicalName); Assert.assertEquals("testavro", input2System); Assert.assertEquals("PAGEVIEW", input2PhysicalName); Assert.assertEquals("kafka", input3System); - Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); + Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName); + Assert.assertEquals("kafka", input4System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName); } @Test
