http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java index 04fdec5..5309838 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestQueryTranslator.java @@ -1,5 +1,3 @@ -package org.apache.samza.sql; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,27 +17,40 @@ package org.apache.samza.sql; * under the License. */ +package org.apache.samza.sql; + +import java.util.HashMap; import java.util.Map; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.apache.samza.sql.translator.QueryTranslator; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestQueryTranslator { + private final Map<String, String> configs = new HashMap<>(); /* Base configs handed to SamzaSqlTestConfig.fetchStaticConfigsWithFactories by the join tests in this class. NOTE(review): every join test repeats the same config/translate boilerplate; a shared helper would shrink this class. */ + + @Before + public void setUp() { /* Runs before each test; sets job.default.system=kafka. The join tests expect their partition_by streams on system "kafka", presumably because of this default (confirm). */ + configs.put("job.default.system", "kafka"); + } + @Test public void testTranslate() { Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10); config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, - "Insert into testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1"); + "Insert into 
testavro.outputTopic select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10"); Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); @@ -100,4 +111,357 @@ public class TestQueryTranslator { Assert.assertEquals("COMPLEX1", streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableJoinWithoutJoinOperator() { /* Comma-separated FROM with the key equality in WHERE (no JOIN keyword): translation is expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p" + + " where p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableJoinWithFullJoinOperator() { /* FULL JOIN between the PAGEVIEW stream and the PROFILE table: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " full join testavro.PROFILE.`$table` as p" + + " on p.id = 
pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = IllegalStateException.class) + public void testTranslateStreamTableJoinWithSelfJoinOperator() { /* Joins the PROFILE table with itself: expected to throw IllegalStateException (not SamzaException). */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p1.name as profileName" + + " from testavro.PROFILE.`$table` as p1" + + " join testavro.PROFILE.`$table` as p2" + + " on p1.id = p2.id"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableJoinWithThetaCondition() { /* Join condition uses a non-equality operator (p.id <> pv.profileId): expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join 
testavro.PROFILE.`$table` as p" + + " on p.id <> pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableCrossJoin() { /* Comma join with no join condition at all: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv, testavro.PROFILE.`$table` as p"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableJoinWithAndLiteralCondition() { /* Key equality combined with a literal comparison (p.name = 'John') in the ON clause: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join 
testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId and p.name = 'John'"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableJoinWithSubQuery() { /* Table referenced only through a correlated EXISTS subquery: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " where exists " + + " (select p.id from testavro.PROFILE.`$table` as p" + + " where p.id = pv.profileId)"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateTableTableJoin() { /* Both join inputs are table sources: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select 
p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW.`$table` as pv" + + " join testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamStreamJoin() { /* Neither join input is a table source: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join testavro.PROFILE as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateJoinWithIncorrectLeftJoin() { /* LEFT JOIN with the table on the left and the stream on the right: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into 
testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW.`$table` as pv" + + " left join testavro.PROFILE as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateJoinWithIncorrectRightJoin() { /* RIGHT JOIN with the stream on the left and the table on the right: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 1); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " right join testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableInnerJoinWithMissingStream() { /* Table reference testavro.$table has no stream name; sources resolved via ConfigBasedSourceResolverFactory under the "config" domain; expected to throw SamzaException. */ + Map<String, String> config = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); + String configSourceResolverDomain = + String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); + config.put(configSourceResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY, + ConfigBasedSourceResolverFactory.class.getName()); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join testavro.`$table` as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + + @Test (expected = SamzaException.class) + public void testTranslateStreamTableInnerJoinWithUdf() { /* Join keys wrapped in the MyTest UDF on both sides of the ON clause: expected to throw SamzaException. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join testavro.PROFILE.`$table` as p" + + " on MyTest(p.id) = MyTest(pv.profileId)"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + } + @Test + public void testTranslateStreamTableInnerJoin() { /* Valid stream-table inner join. Expects 2 output streams (a kafka partition_by stream, then testavro enrichedPageViewTopic) and 3 inputs (PAGEVIEW, PROFILE, then the partition_by stream). NOTE(review): the findFirst/skip checks assume a stable iteration order of these key sets; confirm the backing maps are ordered. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + + Assert.assertEquals(2, streamGraph.getOutputStreams().size()); + Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("enrichedPageViewTopic", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + + Assert.assertEquals(3, streamGraph.getInputOperators().size()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("PAGEVIEW", 
+ streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("PROFILE", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + } + + @Test + public void testTranslateStreamTableLeftJoin() { /* Valid stream-table LEFT JOIN; asserts the same graph shape as the inner join test. NOTE(review): same key-set ordering assumption for findFirst/skip. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " left join testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + + Assert.assertEquals(2, streamGraph.getOutputStreams().size()); + Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", + streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", 
streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("enrichedPageViewTopic", + streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + + Assert.assertEquals(3, streamGraph.getInputOperators().size()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("PAGEVIEW", + streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("PROFILE", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + } + + @Test + public void testTranslateStreamTableRightJoin() { /* Valid RIGHT JOIN with the table on the left and the stream on the right; PROFILE is expected as the first input and PAGEVIEW as the second. NOTE(review): same key-set ordering assumption for findFirst/skip. */ + Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 10); + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PROFILE.`$table` as p" + + " right join testavro.PAGEVIEW as pv" + + " on p.id = pv.profileId"; + config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql); + Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config)); + SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); + QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); + SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); + StreamGraphImpl streamGraph = new StreamGraphImpl(new 
LocalApplicationRunner(samzaConfig), samzaConfig); + translator.translate(queryInfo, streamGraph); + + Assert.assertEquals(2, streamGraph.getOutputStreams().size()); + Assert.assertEquals("kafka", streamGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", + streamGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("enrichedPageViewTopic", + streamGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + + Assert.assertEquals(3, streamGraph.getInputOperators().size()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); + Assert.assertEquals("PROFILE", + streamGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); + Assert.assertEquals("PAGEVIEW", + streamGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); + Assert.assertEquals("sql-job-1-partition_by-stream_1", + streamGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java index 1c5fc41..0804a6d 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlApplicationConfig.java @@ -21,13 +21,13 @@ package org.apache.samza.sql; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; +import org.junit.Assert; import org.junit.Test; @@ -69,11 +69,11 @@ public class TestSamzaSqlApplicationConfig { String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config"); String avroSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "testavro"); - testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER); + testWithoutConfigShouldFail(config, avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER); // Configs for the unused system "log" is not mandatory. 
String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", "log"); - testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER); + testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER); } private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) { http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java index 97196e2..0bfd721 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlQueryParser.java @@ -38,6 +38,21 @@ public class TestSamzaSqlQueryParser { } @Test + public void testParseJoinQuery() { /* A stream-table join parses into one output source and two input sources in FROM-clause order; the backquoted $table suffix is reported without the backquotes. */ + String sql = + "Insert into testavro.enrichedPageViewTopic" + + " select p.name as profileName, pv.pageKey" + + " from testavro.PAGEVIEW as pv" + + " join testavro.PROFILE.`$table` as p" + + " on p.id = pv.profileId"; + QueryInfo queryInfo = SamzaSqlQueryParser.parseQuery(sql); + Assert.assertEquals("testavro.enrichedPageViewTopic", queryInfo.getOutputSource()); + Assert.assertEquals(2, queryInfo.getInputSources().size()); + Assert.assertEquals("testavro.PAGEVIEW", queryInfo.getInputSources().get(0)); + Assert.assertEquals("testavro.PROFILE.$table", queryInfo.getInputSources().get(1)); + } + + @Test public void testParseInvalidQuery() { try { @@ -58,13 +73,4 @@ public class TestSamzaSqlQueryParser { } catch (SamzaException e) { } } - - @Test - public void testParseJoin() { - try { - SamzaSqlQueryParser.parseQuery("insert into log.foo select * from tracking.bar1,tracking.bar2"); - Assert.fail("Expected a 
samzaException"); - } catch (SamzaException e) { - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java new file mode 100644 index 0000000..3da004a --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageJoinFunction.java @@ -0,0 +1,116 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+*/ + +package org.apache.samza.sql; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.samza.operators.KV; +import org.apache.samza.sql.data.SamzaSqlCompositeKey; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.apache.samza.sql.translator.SamzaSqlRelMessageJoinFunction; +import org.junit.Assert; +import org.junit.Test; + +/** Tests SamzaSqlRelMessageJoinFunction.apply: field order of the joined message for inner joins with the table on either side, and the null-record case for inner and left outer joins. */ +public class TestSamzaSqlRelMessageJoinFunction { + + private final List<String> streamFieldNames = Arrays.asList("field1", "field2", "field3", "field4"); + private final List<Object> streamFieldValues = Arrays.asList("value1", 1, null, "value4"); + private final List<String> tableFieldNames = Arrays.asList("field11", "field12", "field13", "field14"); + private final List<Object> tableFieldValues = Arrays.asList("value1", 1, null, "value5"); + + @Test + public void testWithInnerJoinWithTableOnRight() { /* The boolean join-function argument is true for the table-on-right case: expect stream fields followed by table fields. */ + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); + JoinRelType joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + List<Integer> tableKeyIds = Arrays.asList(0, 1); + SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); + + List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); + expectedFieldNames.addAll(tableFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); +
 expectedFieldValues.addAll(tableFieldValues); + Assert.assertEquals(expectedFieldNames, outMsg.getFieldNames()); /* names and values are both pinned; together they also imply equal sizes */ + Assert.assertEquals(expectedFieldValues, outMsg.getFieldValues()); + } + + @Test + public void testWithInnerJoinWithTableOnLeft() { /* The boolean join-function argument is false for the table-on-left case: expect table fields followed by stream fields. */ + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + SamzaSqlRelMessage tableMsg = new SamzaSqlRelMessage(tableFieldNames, tableFieldValues); + JoinRelType joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 2); + List<Integer> tableKeyIds = Arrays.asList(0, 2); + SamzaSqlCompositeKey compositeKey = SamzaSqlCompositeKey.createSamzaSqlCompositeKey(tableMsg, tableKeyIds); + KV<SamzaSqlCompositeKey, SamzaSqlRelMessage> record = KV.of(compositeKey, tableMsg); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, false, streamKeyIds, streamFieldNames, tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, record); + + List<String> expectedFieldNames = new ArrayList<>(tableFieldNames); + expectedFieldNames.addAll(streamFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(tableFieldValues); + expectedFieldValues.addAll(streamFieldValues); + Assert.assertEquals(expectedFieldNames, outMsg.getFieldNames()); + Assert.assertEquals(expectedFieldValues, outMsg.getFieldValues()); + } + + @Test + public void testNullRecordWithInnerJoin() { /* Inner join with no table record: apply is expected to return null. */ + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + JoinRelType joinRelType = JoinRelType.INNER; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); + Assert.assertNull(outMsg); + } + + @Test + public void testNullRecordWithLeftOuterJoin() { /* Left outer join with no table record: expect the stream fields followed by the table field names, each with a null value. */ + SamzaSqlRelMessage streamMsg = new SamzaSqlRelMessage(streamFieldNames, streamFieldValues); + JoinRelType joinRelType = 
JoinRelType.LEFT; + List<Integer> streamKeyIds = Arrays.asList(0, 1); + + SamzaSqlRelMessageJoinFunction joinFn = + new SamzaSqlRelMessageJoinFunction(joinRelType, true, streamKeyIds, streamFieldNames, + tableFieldNames); + SamzaSqlRelMessage outMsg = joinFn.apply(streamMsg, null); + + List<String> expectedFieldNames = new ArrayList<>(streamFieldNames); + expectedFieldNames.addAll(tableFieldNames); + List<Object> expectedFieldValues = new ArrayList<>(streamFieldValues); + expectedFieldValues.addAll(Collections.nCopies(tableFieldNames.size(), null)); + Assert.assertEquals(expectedFieldNames, outMsg.getFieldNames()); + Assert.assertEquals(expectedFieldValues, outMsg.getFieldValues()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java new file mode 100644 index 0000000..3416ee1 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java @@ -0,0 +1,43 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License.
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.sql; + +import java.util.Arrays; +import java.util.List; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.sql.data.SamzaSqlRelMessage; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlRelMessageSerde { + + private final List<Object> values = Arrays.asList("value1", 1, null); // includes a null value so null round-tripping is covered + private final List<String> names = Arrays.asList("field1", "field2", "field3"); + + @Test + public void testWithDifferentFields() { // a JsonSerdeV2 round trip must preserve both field names and values + SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values); + JsonSerdeV2<SamzaSqlRelMessage> serde = new JsonSerdeV2<>(SamzaSqlRelMessage.class); + SamzaSqlRelMessage resultMsg = serde.fromBytes(serde.toBytes(message)); + Assert.assertEquals(names, resultMsg.getFieldNames()); + Assert.assertEquals(values, resultMsg.getFieldValues()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc new file mode 100644 index 0000000..a235c28 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.avsc @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +{ + "name": "Company", + "version" : 1, + "namespace": "org.apache.samza.sql.system.avro", + "type": "record", + "fields": [ + { + "name": "id", + "doc": "Company id.", + "type": ["null", "int"], + "default":null + }, + { + "name": "name", + "doc" : "Company name.", + "type": ["null", "string"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java new file mode 100644 index 0000000..9513acf --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Company.java @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; // NOTE(review): Avro-generated record class for the Company test schema (presumably from Company.avsc), so change the .avsc and regenerate; SCHEMA$ namespace org.apache.samza.sql.system.avro differs from this package, confirm that is intended + +@SuppressWarnings("all") +public class Company extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Company\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null}]}"); + /** Company id. */ + public java.lang.Integer id; + /** Company name. */ + public java.lang.CharSequence name; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return name; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.Integer)value$; break; + case 1: name = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc new file mode 100644 index 0000000..5117a5e --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.avsc @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+*/ + +{ + "name": "EnrichedPageView", + "version" : 1, + "namespace": "org.apache.samza.sql.system.avro", + "type": "record", + "fields": [ + { + "name": "pageKey", + "doc": "Page key.", + "type": ["null", "string"], + "default":null + }, + { + "name": "companyName", + "doc" : "Company name.", + "type": ["null", "string"], + "default":null + }, + { + "name": "profileName", + "doc" : "Profile name.", + "type": ["null", "string"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java new file mode 100644 index 0000000..cf3f62d --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/EnrichedPageView.java @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; // NOTE(review): Avro-generated record class for the EnrichedPageView test schema (presumably from EnrichedPageView.avsc), so change the .avsc and regenerate; SCHEMA$ namespace org.apache.samza.sql.system.avro differs from this package, confirm that is intended + +@SuppressWarnings("all") +public class EnrichedPageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"EnrichedPageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"companyName\",\"type\":[\"null\",\"string\"],\"doc\":\"Company name.\",\"default\":null},{\"name\":\"profileName\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null}]}"); + /** Page key. */ + public java.lang.CharSequence pageKey; + /** Company name. */ + public java.lang.CharSequence companyName; + /** Profile name. */ + public java.lang.CharSequence profileName; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return pageKey; + case 1: return companyName; + case 2: return profileName; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: pageKey = (java.lang.CharSequence)value$; break; + case 1: companyName = (java.lang.CharSequence)value$; break; + case 2: profileName = (java.lang.CharSequence)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc new file mode 100644 index 0000000..5c107ab --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.avsc @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+*/ + +{ + "name": "PageView", + "version" : 1, + "namespace": "org.apache.samza.sql.system.avro", + "type": "record", + "fields": [ + { + "name": "pageKey", + "doc": "Page key.", + "type": ["null", "string"], + "default":null + }, + { + "name": "profileId", + "doc" : "Profile id.", + "type": ["null", "int"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java new file mode 100644 index 0000000..21e7bb7 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/PageView.java @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; // NOTE(review): Avro-generated record class for the PageView test schema (presumably from PageView.avsc), so change the .avsc and regenerate; SCHEMA$ namespace org.apache.samza.sql.system.avro differs from this package, confirm that is intended + +@SuppressWarnings("all") +public class PageView extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"PageView\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"pageKey\",\"type\":[\"null\",\"string\"],\"doc\":\"Page key.\",\"default\":null},{\"name\":\"profileId\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null}]}"); + /** Page key. */ + public java.lang.CharSequence pageKey; + /** Profile id. */ + public java.lang.Integer profileId; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return pageKey; + case 1: return profileId; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: pageKey = (java.lang.CharSequence)value$; break; + case 1: profileId = (java.lang.Integer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc new file mode 100644 index 0000000..4e5e7dc --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.avsc @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+*/ + +{ + "name": "Profile", + "version" : 1, + "namespace": "org.apache.samza.sql.system.avro", + "type": "record", + "fields": [ + { + "name": "id", + "doc": "Profile id.", + "type": ["null", "int"], + "default":null + }, + { + "name": "name", + "doc" : "Profile name.", + "type": ["null", "string"], + "default":null + }, + { + "name": "companyId", + "doc" : "Company id.", + "type": ["null", "int"], + "default":null + } + ] +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java new file mode 100644 index 0000000..b5c1828 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/Profile.java @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.samza.sql.avro.schemas; // NOTE(review): Avro-generated record class for the Profile test schema (presumably from Profile.avsc), so change the .avsc and regenerate; SCHEMA$ namespace org.apache.samza.sql.system.avro differs from this package, confirm that is intended + +@SuppressWarnings("all") +public class Profile extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"Profile\",\"namespace\":\"org.apache.samza.sql.system.avro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Profile id.\",\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Profile name.\",\"default\":null},{\"name\":\"companyId\",\"type\":[\"null\",\"int\"],\"doc\":\"Company id.\",\"default\":null}]}"); + /** Profile id. */ + public java.lang.Integer id; + /** Profile name. */ + public java.lang.CharSequence name; + /** Company id. */ + public java.lang.Integer companyId; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return id; + case 1: return name; + case 2: return companyId; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: id = (java.lang.Integer)value$; break; + case 1: name = (java.lang.CharSequence)value$; break; + case 2: companyId = (java.lang.Integer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java deleted file mode 100644 index 8baa9e7..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlEndToEnd.java +++ /dev/null @@ -1,153 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License.
-*/ - -package org.apache.samza.sql.e2e; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.avro.generic.GenericRecord; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.samza.config.MapConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; -import org.apache.samza.sql.system.TestAvroSystemFactory; -import org.apache.samza.sql.testutil.JsonUtil; -import org.apache.samza.sql.testutil.MyTestUdf; -import org.apache.samza.sql.testutil.SamzaSqlTestConfig; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class TestSamzaSqlEndToEnd { - - private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class); - - @Test - public void testEndToEnd() throws Exception { - int numMessages = 20; - - TestAvroSystemFactory.messages.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1"; - List<String> sqlStmts = Arrays.asList(sql1); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); - - List<Integer> outMessages = TestAvroSystemFactory.messages.stream() - .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) - .sorted() - .collect(Collectors.toList()); - Assert.assertEquals(numMessages, outMessages.size()); - Assert.assertTrue(IntStream.range(0, 
numMessages).boxed().collect(Collectors.toList()).equals(outMessages)); - } - - @Test - public void testEndToEndFlatten() throws Exception { - int numMessages = 20; - TestAvroSystemFactory.messages.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()); - String sql1 = - "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1"; - List<String> sqlStmts = Collections.singletonList(sql1); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); - - List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); - - int expectedMessages = 0; - // Flatten de-normalizes the data. So there is separate record for each entry in the array. 
- for (int index = 1; index < numMessages; index++) { - expectedMessages = expectedMessages + Math.max(1, index); - } - Assert.assertEquals(expectedMessages, outMessages.size()); - } - - @Test - public void testEndToEndSubQuery() throws Exception { - int numMessages = 20; - TestAvroSystemFactory.messages.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = - "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; - List<String> sqlStmts = Collections.singletonList(sql1); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); - - List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); - - int expectedMessages = 0; - // Flatten de-normalizes the data. So there is separate record for each entry in the array. 
- for (int index = 1; index < numMessages; index++) { - expectedMessages = expectedMessages + Math.max(1, index); - } - Assert.assertEquals(expectedMessages, outMessages.size()); - } - - @Test - public void testEndToEndUdf() throws Exception { - int numMessages = 20; - TestAvroSystemFactory.messages.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1"; - List<String> sqlStmts = Collections.singletonList(sql1); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); - - LOG.info("output Messages " + TestAvroSystemFactory.messages); - - List<Integer> outMessages = TestAvroSystemFactory.messages.stream() - .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("long_value").toString())) - .sorted() - .collect(Collectors.toList()); - Assert.assertEquals(outMessages.size(), numMessages); - MyTestUdf udf = new MyTestUdf(); - - Assert.assertTrue( - IntStream.range(0, numMessages).map(udf::execute).boxed().collect(Collectors.toList()).equals(outMessages)); - } - - @Test - public void testRegexMatchUdfInWhereClause() throws Exception { - int numMessages = 20; - TestAvroSystemFactory.messages.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - String sql1 = "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)"; - List<String> sqlStmts = Collections.singletonList(sql1); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); - - LOG.info("output Messages " + 
TestAvroSystemFactory.messages); - // There should be two messages that contain "4" - Assert.assertEquals(TestAvroSystemFactory.messages.size(), 2); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java index 5655a81..a8731fb 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/SimpleSystemAdmin.java @@ -26,10 +26,13 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.samza.Partition; import org.apache.samza.config.Config; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; +import static org.apache.samza.system.IncomingMessageEnvelope.*; + public class SimpleSystemAdmin implements SystemAdmin { @@ -46,7 +49,7 @@ public class SimpleSystemAdmin implements SystemAdmin { return streamNames.stream() .collect(Collectors.toMap(Function.identity(), streamName -> new SystemStreamMetadata(streamName, Collections.singletonMap(new Partition(0), - new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null))))); + new SystemStreamMetadata.SystemStreamPartitionMetadata(null, END_OF_STREAM_OFFSET, null))))); } @Override @@ -58,4 +61,10 @@ public class SimpleSystemAdmin implements SystemAdmin { } return offset1.compareTo(offset2); } + + @Override + public boolean createStream(StreamSpec streamSpec) { + // Do nothing. 
+ return true; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java index 3a9ae16..6dcad25 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java @@ -21,6 +21,7 @@ package org.apache.samza.sql.system; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,7 +32,10 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.sql.avro.schemas.Company; import org.apache.samza.sql.avro.schemas.ComplexRecord; +import org.apache.samza.sql.avro.schemas.PageView; +import org.apache.samza.sql.avro.schemas.Profile; import org.apache.samza.sql.avro.schemas.SimpleRecord; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -46,9 +50,42 @@ import org.slf4j.LoggerFactory; public class TestAvroSystemFactory implements SystemFactory { private static final Logger LOG = LoggerFactory.getLogger(TestAvroSystemFactory.class); + public static final String CFG_NUM_MESSAGES = "numMessages"; + public static final String CFG_INCLUDE_NULL_FOREIGN_KEYS = "includeNullForeignKeys"; public static List<OutgoingMessageEnvelope> messages = new ArrayList<>(); + public static final String[] profiles = {"John", "Mike", "Mary", "Joe", "Brad", "Jennifer"}; + public static final String[] companies = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"}; + public 
static final String[] pagekeys = {"inbox", "home", "search", "pymk", "group", "job"}; + + public static List<String> getPageKeyProfileNameJoin(int numMessages) { + return IntStream.range(0, numMessages) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length]) + .collect(Collectors.toList()); + } + + public static List<String> getPageKeyProfileNameJoinWithNullForeignKeys(int numMessages) { + // All even profileId foreign keys are null + return IntStream.range(0, numMessages / 2) + .mapToObj(i -> pagekeys[(i * 2 + 1) % pagekeys.length] + "," + profiles[(i * 2 + 1) % profiles.length]) + .collect(Collectors.toList()); + } + + public static List<String> getPageKeyProfileNameOuterJoinWithNullForeignKeys(int numMessages) { + // All even profileId foreign keys are null + return IntStream.range(0, numMessages) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + ((i % 2 == 0) ? "null" : profiles[i % profiles.length])) + .collect(Collectors.toList()); + } + + public static List<String> getPageKeyProfileCompanyNameJoin(int numMessages) { + return IntStream.range(0, numMessages) + .mapToObj(i -> pagekeys[i % pagekeys.length] + "," + profiles[i % profiles.length] + + "," + companies[i % companies.length]) + .collect(Collectors.toList()); + } + @Override public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { return new TestAvroSystemConsumer(systemName, config); @@ -67,10 +104,16 @@ public class TestAvroSystemFactory implements SystemFactory { private class TestAvroSystemConsumer implements SystemConsumer { public static final int DEFAULT_NUM_EVENTS = 10; private final int numMessages; - private boolean simpleRecord; + private final boolean includeNullForeignKeys; + private final Set<SystemStreamPartition> simpleRecordMap = new HashSet<>(); + private final Set<SystemStreamPartition> profileRecordMap = new HashSet<>(); + private final Set<SystemStreamPartition> companyRecordMap = new HashSet<>(); + private 
final Set<SystemStreamPartition> pageViewRecordMap = new HashSet<>(); public TestAvroSystemConsumer(String systemName, Config config) { numMessages = config.getInt(String.format("systems.%s.%s", systemName, CFG_NUM_MESSAGES), DEFAULT_NUM_EVENTS); + includeNullForeignKeys = config.getBoolean(String.format("systems.%s.%s", systemName, + CFG_INCLUDE_NULL_FOREIGN_KEYS), false); } @Override @@ -83,7 +126,18 @@ public class TestAvroSystemFactory implements SystemFactory { @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { - simpleRecord = systemStreamPartition.getStream().toLowerCase().contains("simple"); + if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) { + simpleRecordMap.add(systemStreamPartition); + } + if (systemStreamPartition.getStream().toLowerCase().contains("profile")) { + profileRecordMap.add(systemStreamPartition); + } + if (systemStreamPartition.getStream().toLowerCase().contains("company")) { + companyRecordMap.add(systemStreamPartition); + } + if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) { + pageViewRecordMap.add(systemStreamPartition); + } } @Override @@ -93,8 +147,8 @@ public class TestAvroSystemFactory implements SystemFactory { set.forEach(ssp -> { // We send num Messages and an end of stream message following that. List<IncomingMessageEnvelope> envelopes = IntStream.range(0, numMessages + 1) - .mapToObj(i -> new IncomingMessageEnvelope(ssp, - i == numMessages ? IncomingMessageEnvelope.END_OF_STREAM_OFFSET : null, "key" + i, getData(i))) + .mapToObj(i -> i < numMessages ? 
new IncomingMessageEnvelope(ssp, null, "key" + i, + getData(i, ssp)) : IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp)) .collect(Collectors.toList()); envelopeMap.put(ssp, envelopes); }); @@ -102,9 +156,15 @@ public class TestAvroSystemFactory implements SystemFactory { return envelopeMap; } - private Object getData(int index) { - if (simpleRecord) { + private Object getData(int index, SystemStreamPartition ssp) { + if (simpleRecordMap.contains(ssp)) { return createSimpleRecord(index); + } else if (profileRecordMap.contains(ssp)) { + return createProfileRecord(index); + } else if (companyRecordMap.contains(ssp)) { + return createCompanyRecord(index); + } else if (pageViewRecordMap.contains(ssp)) { + return createPageViewRecord(index); } else { return createComplexRecord(index); } @@ -117,6 +177,29 @@ public class TestAvroSystemFactory implements SystemFactory { return record; } + private Object createProfileRecord(int index) { + GenericRecord record = new GenericData.Record(Profile.SCHEMA$); + record.put("id", index); + record.put("name", profiles[index % profiles.length]); + record.put("companyId", includeNullForeignKeys && (index % 2 == 0) ? null : index % companies.length); + return record; + } + + private Object createCompanyRecord(int index) { + GenericRecord record = new GenericData.Record(Company.SCHEMA$); + record.put("id", index); + record.put("name", companies[index % companies.length]); + return record; + } + + private Object createPageViewRecord(int index) { + GenericRecord record = new GenericData.Record(PageView.SCHEMA$); + // All even profileId foreign keys are null + record.put("profileId", includeNullForeignKeys && (index % 2 == 0) ? 
null : index); + record.put("pageKey", pagekeys[index % pagekeys.length]); + return record; + } + private Object createComplexRecord(int index) { GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$); record.put("id", index); http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java index 92766f6..b8b2814 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java @@ -28,15 +28,20 @@ import org.apache.samza.config.TaskConfig; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; import org.apache.samza.sql.avro.AvroRelConverterFactory; import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory; +import org.apache.samza.sql.avro.schemas.Company; import org.apache.samza.sql.avro.schemas.ComplexRecord; +import org.apache.samza.sql.avro.schemas.EnrichedPageView; +import org.apache.samza.sql.avro.schemas.PageView; +import org.apache.samza.sql.avro.schemas.Profile; import org.apache.samza.sql.avro.schemas.SimpleRecord; import org.apache.samza.sql.fn.FlattenUdf; import org.apache.samza.sql.fn.RegexMatchUdf; import org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.system.TestAvroSystemFactory; +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; import 
org.apache.samza.standalone.PassthroughJobCoordinatorFactory; @@ -48,11 +53,21 @@ public class SamzaSqlTestConfig { public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro"; public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) { + return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false); + } + + public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages) { + return fetchStaticConfigsWithFactories(props, numberOfMessages, false); + } + + public static Map<String, String> fetchStaticConfigsWithFactories(Map<String, String> props, int numberOfMessages, + boolean includeNullForeignKeys) { HashMap<String, String> staticConfigs = new HashMap<>(); staticConfigs.put(JobConfig.JOB_NAME(), "sql-job"); staticConfigs.put(JobConfig.PROCESSOR_ID(), "1"); staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); + staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SOURCE_RESOLVER, "config"); @@ -75,8 +90,10 @@ public class SamzaSqlTestConfig { staticConfigs.put(avroSystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName()); staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES, String.valueOf(numberOfMessages)); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro"); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + staticConfigs.put(avroSystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS, + includeNullForeignKeys ? 
"true" : "false"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config"); String avroSamzaToRelMsgConverterDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro"); @@ -88,17 +105,31 @@ public class SamzaSqlTestConfig { staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY, ConfigBasedAvroRelSchemaProviderFactory.class.getName()); - staticConfigs.put( - configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, - "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString()); + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "Profile"), ComplexRecord.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "PROFILE"), Profile.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "PAGEVIEW"), PageView.SCHEMA$.toString()); + + staticConfigs.put(configAvroRelSchemaProviderDomain + 
String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "COMPANY"), Company.SCHEMA$.toString()); - staticConfigs.put( - configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, - "testavro", "outputTopic"), ComplexRecord.SCHEMA$.toString()); + staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, + "testavro", "enrichedPageViewTopic"), EnrichedPageView.SCHEMA$.toString()); - staticConfigs.put( - configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA, - "testavro", "COMPLEX1"), ComplexRecord.SCHEMA$.toString()); + staticConfigs.putAll(props); return staticConfigs; } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java index b9cf803..4f1d08e 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSourceResolverFactory.java @@ -23,7 +23,7 @@ import java.util.Arrays; import org.apache.samza.config.Config; import org.apache.samza.sql.interfaces.SourceResolver; import org.apache.samza.sql.interfaces.SourceResolverFactory; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; public class TestSourceResolverFactory implements SourceResolverFactory { @@ -33,6 +33,7 @@ public class TestSourceResolverFactory implements SourceResolverFactory { } private class TestSourceResolver implements SourceResolver { + private final String SAMZA_SQL_QUERY_TABLE_KEYWORD 
= "$table"; private final Config config; public TestSourceResolver(Config config) { @@ -40,11 +41,26 @@ public class TestSourceResolverFactory implements SourceResolverFactory { } @Override - public SqlSystemStreamConfig fetchSourceInfo(String sourceName) { + public SqlSystemSourceConfig fetchSourceInfo(String sourceName) { String[] sourceComponents = sourceName.split("\\."); - Config systemConfigs = config.subset(sourceComponents[0] + "."); - return new SqlSystemStreamConfig(sourceComponents[0], sourceComponents[sourceComponents.length - 1], - Arrays.asList(sourceComponents), systemConfigs); + boolean isTable = false; + int systemIdx = 0; + int endIdx = sourceComponents.length - 1; + int streamIdx = endIdx; + + if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) { + isTable = true; + streamIdx = endIdx - 1; + } + Config systemConfigs = config.subset(sourceComponents[systemIdx] + "."); + return new SqlSystemSourceConfig(sourceComponents[systemIdx], sourceComponents[streamIdx], + Arrays.asList(sourceComponents), systemConfigs, isTable); + } + + @Override + public boolean isTable(String sourceName) { + String[] sourceComponents = sourceName.split("\\."); + return sourceComponents[sourceComponents.length - 1].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/956cf412/samza-sql/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/resources/log4j.xml b/samza-sql/src/test/resources/log4j.xml index 6259b48..9d29506 100644 --- a/samza-sql/src/test/resources/log4j.xml +++ b/samza-sql/src/test/resources/log4j.xml @@ -28,6 +28,12 @@ @log4j.loggers.public_access@ <logger name="org.apache" additivity="false"> + <level value="INFO"/> + <appender-ref ref="console"/> + </logger> + + @log4j.loggers.public_access@ + <logger name="org.apache.calcite.sql2rel" additivity="false"> <level value="DEBUG"/> <appender-ref 
ref="console"/> </logger>
