Repository: samza Updated Branches: refs/heads/master 2d7b0f52c -> 956cf412a
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.test.samzasql;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.samza.config.MapConfig;
import org.apache.samza.serializers.JsonSerdeV2Factory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.sql.testutil.JsonUtil;
import org.apache.samza.sql.testutil.MyTestUdf;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * End-to-end tests that run a single Samza SQL statement through
 * {@link SamzaSqlApplicationRunner} and then inspect the envelopes collected in
 * {@link TestAvroSystemFactory#messages}.
 *
 * <p>Every test follows the same shape: clear the collected messages, build the static
 * configs on top of the Kafka configs prepared in {@link #setUp()}, run the statement to
 * completion, and assert on what was written.
 */
public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {

  private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);

  /** Number of input messages requested from the test config for every test. */
  private static final int NUM_MESSAGES = 20;

  private final Map<String, String> configs = new HashMap<>();

  /**
   * Starts the harness and records the Kafka system and serde configs that every test
   * passes on to {@link SamzaSqlTestConfig}.
   */
  @Before
  @Override
  public void setUp() {
    super.setUp();
    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
    configs.put("systems.kafka.samza.key.serde", "object");
    configs.put("systems.kafka.samza.msg.serde", "samzaSqlRelMsg");
    configs.put("systems.kafka.default.stream.replication.factor", "1");
    configs.put("job.default.system", "kafka");

    configs.put("serializers.registry.object.class", JsonSerdeV2Factory.class.getName());
    configs.put("serializers.registry.samzaSqlRelMsg.class", JsonSerdeV2Factory.class.getName());
  }

  /** A plain projection must emit one record per input id, covering 0..NUM_MESSAGES-1. */
  @Test
  public void testEndToEnd() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    runSql(staticConfigs,
        "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1");

    List<Integer> outMessages = sortedIntField("id");
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());
    List<Integer> expectedIds = IntStream.range(0, NUM_MESSAGES).boxed().collect(Collectors.toList());
    Assert.assertEquals(expectedIds, outMessages);
  }

  /** Flatten over an array column must emit one record per array entry. */
  @Test
  public void testEndToEndFlatten() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    LOG.info(" Class Path : {}",
        RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
    runSql(staticConfigs,
        "Insert into testavro.outputTopic select Flatten(array_values) as string_value, id from testavro.COMPLEX1");

    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
    Assert.assertEquals(expectedFlattenedMessageCount(NUM_MESSAGES), outMessages.size());
  }

  /** Flatten applied to the array produced by a UDF in a sub-query. */
  @Test
  public void testEndToEndSubQuery() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    runSql(staticConfigs,
        "Insert into testavro.outputTopic select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)");

    List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);
    Assert.assertEquals(expectedFlattenedMessageCount(NUM_MESSAGES), outMessages.size());
  }

  /** The {@code MyTest} UDF in the select list must produce {@link MyTestUdf#execute} of each id. */
  @Test
  public void testEndToEndUdf() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    runSql(staticConfigs,
        "Insert into testavro.outputTopic select id, MyTest(id) as long_value from testavro.SIMPLE1");

    LOG.info("output Messages {}", TestAvroSystemFactory.messages);

    List<Integer> outMessages = sortedIntField("long_value");
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());

    MyTestUdf udf = new MyTestUdf();
    // The actual values are sorted while the expected ones are generated in id order,
    // exactly as before; NOTE(review): this relies on the UDF being non-decreasing in id.
    List<Integer> expectedValues =
        IntStream.range(0, NUM_MESSAGES).map(udf::execute).boxed().collect(Collectors.toList());
    Assert.assertEquals(expectedValues, outMessages);
  }

  /** The {@code RegexMatch} UDF used as a filter in the where clause. */
  @Test
  public void testRegexMatchUdfInWhereClause() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    runSql(staticConfigs,
        "Insert into testavro.outputTopic select id from testavro.SIMPLE1 where RegexMatch('.*4', Name)");

    LOG.info("output Messages {}", TestAvroSystemFactory.messages);
    // There should be two messages that contain "4"
    Assert.assertEquals(2, TestAvroSystemFactory.messages.size());
  }

  /** Inner join with the table on the left side and the stream on the right. */
  @Test
  public void testEndToEndStreamTableInnerJoin() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    staticConfigs.putAll(configs);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName "
            + "from testavro.PROFILE.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.id = pv.profileId";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileNameMessages();
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());
    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(NUM_MESSAGES);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Inner join followed by a filter on a table column. */
  @Test
  public void testEndToEndStreamTableInnerJoinWithFilter() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    staticConfigs.putAll(configs);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName "
            + "from testavro.PROFILE.`$table` as p "
            + "join testavro.PAGEVIEW as pv "
            + " on p.id = pv.profileId "
            + "where p.name = 'Mike'";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileNameMessages();
    Assert.assertEquals(4, outMessages.size());
    List<String> expectedOutMessages =
        TestAvroSystemFactory.getPageKeyProfileNameJoin(NUM_MESSAGES)
            .stream()
            .filter(msg -> msg.endsWith("Mike"))
            .collect(Collectors.toList());
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Inner join where half of the stream's foreign keys are null, so half the rows drop out. */
  @Test
  public void testEndToEndStreamTableInnerJoinWithNullForeignKeys() throws Exception {
    TestAvroSystemFactory.messages.clear();
    // NOTE(review): the trailing 'true' presumably turns on null foreign keys - confirm
    // against SamzaSqlTestConfig.
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES, true);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName "
            + "from testavro.PAGEVIEW as pv "
            + "join testavro.PROFILE.`$table` as p "
            + " on pv.profileId = p.id";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileNameMessages();
    // Half the foreign keys are null.
    Assert.assertEquals(NUM_MESSAGES / 2, outMessages.size());
    List<String> expectedOutMessages =
        TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(NUM_MESSAGES);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Left join from the stream to the table; every stream record must be emitted. */
  @Test
  public void testEndToEndStreamTableLeftJoin() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES, true);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName "
            + "from testavro.PAGEVIEW as pv "
            + "left join testavro.PROFILE.`$table` as p "
            + " on pv.profileId = p.id";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileNameMessages();
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());
    List<String> expectedOutMessages =
        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(NUM_MESSAGES);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Right join from the table to the stream; every stream record must be emitted. */
  @Test
  public void testEndToEndStreamTableRightJoin() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs =
        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES, true);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName "
            + "from testavro.PROFILE.`$table` as p "
            + "right join testavro.PAGEVIEW as pv "
            + " on p.id = pv.profileId";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileNameMessages();
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());
    List<String> expectedOutMessages =
        TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(NUM_MESSAGES);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Stream joined with a table whose result is joined with a second table. */
  @Test
  public void testEndToEndStreamTableTableJoin() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName, c.name as companyName "
            + "from testavro.PAGEVIEW as pv "
            + "join testavro.PROFILE.`$table` as p "
            + " on p.id = pv.profileId "
            + " join testavro.COMPANY.`$table` as c "
            + " on p.companyId = c.id";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileCompanyNameMessages();
    Assert.assertEquals(NUM_MESSAGES, outMessages.size());
    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(NUM_MESSAGES);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /** Same as the stream-table-table join, but the second join uses a two-part condition. */
  @Test
  public void testEndToEndStreamTableTableJoinWithCompositeKey() throws Exception {
    TestAvroSystemFactory.messages.clear();
    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, NUM_MESSAGES);
    String sql =
        "Insert into testavro.enrichedPageViewTopic "
            + "select pv.pageKey, p.name as profileName, c.name as companyName "
            + "from testavro.PAGEVIEW as pv "
            + "join testavro.PROFILE.`$table` as p "
            + " on p.id = pv.profileId "
            + " join testavro.COMPANY.`$table` as c "
            + " on p.companyId = c.id AND c.id = pv.profileId";
    runSql(staticConfigs, sql);

    List<String> outMessages = pageKeyProfileCompanyNameMessages();
    Assert.assertEquals(TestAvroSystemFactory.companies.length, outMessages.size());
    List<String> expectedOutMessages =
        TestAvroSystemFactory.getPageKeyProfileCompanyNameJoin(TestAvroSystemFactory.companies.length);
    Assert.assertEquals(expectedOutMessages, outMessages);
  }

  /**
   * Registers {@code sql} as the only statement in {@code staticConfigs} and runs the SQL
   * application until it finishes.
   *
   * @param staticConfigs mutable job configs; the statement list is written into it
   * @param sql the single SQL statement to execute
   * @throws Exception if serializing the statement or running the job fails
   */
  private static void runSql(Map<String, String> staticConfigs, String sql) throws Exception {
    List<String> sqlStmts = Collections.singletonList(sql);
    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
    runner.runAndWaitForFinish();
  }

  /**
   * Number of output records the Flatten tests expect for {@code numMessages} inputs.
   *
   * <p>Flatten de-normalizes the data, so there is a separate record for each entry in the
   * array. The summation deliberately keeps the original bounds (it starts at index 1).
   */
  private static int expectedFlattenedMessageCount(int numMessages) {
    int expectedMessages = 0;
    for (int index = 1; index < numMessages; index++) {
      expectedMessages += Math.max(1, index);
    }
    return expectedMessages;
  }

  /** Reads {@code fieldName} from every collected record as an integer and returns the values sorted. */
  private static List<Integer> sortedIntField(String fieldName) {
    return TestAvroSystemFactory.messages.stream()
        .map(envelope -> Integer.valueOf(((GenericRecord) envelope.getMessage()).get(fieldName).toString()))
        .sorted()
        .collect(Collectors.toList());
  }

  /** Renders every collected record as {@code "pageKey,profileName"}, in arrival order. */
  private static List<String> pageKeyProfileNameMessages() {
    return TestAvroSystemFactory.messages.stream()
        .map(TestSamzaSqlEndToEnd::pageKeyAndProfileName)
        .collect(Collectors.toList());
  }

  /** Renders every collected record as {@code "pageKey,profileName,companyName"}, in arrival order. */
  private static List<String> pageKeyProfileCompanyNameMessages() {
    return TestAvroSystemFactory.messages.stream()
        .map(TestSamzaSqlEndToEnd::pageKeyProfileAndCompanyName)
        .collect(Collectors.toList());
  }

  /** Formats one record as {@code "pageKey,profileName"}; a missing profile name becomes {@code "null"}. */
  private static String pageKeyAndProfileName(OutgoingMessageEnvelope envelope) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    Object profileName = record.get("profileName");
    return record.get("pageKey").toString() + "," + (profileName == null ? "null" : profileName.toString());
  }

  /** Formats one record as {@code "pageKey,profileName,companyName"}; all three fields must be present. */
  private static String pageKeyProfileAndCompanyName(OutgoingMessageEnvelope envelope) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    return record.get("pageKey").toString() + ","
        + record.get("profileName").toString() + ","
        + record.get("companyName").toString();
  }
}
org.apache.samza.sql.impl.ConfigBasedSourceResolverFactory; import org.apache.samza.sql.impl.ConfigBasedUdfResolver; -import org.apache.samza.sql.interfaces.SqlSystemStreamConfig; +import org.apache.samza.sql.interfaces.SqlSystemSourceConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.testutil.JsonUtil; @@ -148,15 +148,15 @@ public class SamzaSqlConsole { staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true"); staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest"); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "avro"); - staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "avro"); + staticConfigs.put(avroSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config"); String logSystemConfigPrefix = String.format(ConfigBasedSourceResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG); String logSamzaSqlConfigPrefix = configSourceResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG); staticConfigs.put(logSystemConfigPrefix + "samza.factory", ConsoleLoggingSystemFactory.class.getName()); - staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_SAMZA_REL_CONVERTER, "json"); - staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemStreamConfig.CFG_REL_SCHEMA_PROVIDER, "config"); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_SAMZA_REL_CONVERTER, "json"); + staticConfigs.put(logSamzaSqlConfigPrefix + SqlSystemSourceConfig.CFG_REL_SCHEMA_PROVIDER, "config"); String avroSamzaToRelMsgConverterDomain = String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
