Repository: samza Updated Branches: refs/heads/master 1a7e27097 -> e2adf8f99
http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java deleted file mode 100644 index dc9fd27..0000000 --- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlRemoteTable.java +++ /dev/null @@ -1,231 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.samza.sql.e2e; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.avro.generic.GenericRecord; -import org.apache.samza.config.MapConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; -import org.apache.samza.sql.system.TestAvroSystemFactory; -import org.apache.samza.sql.testutil.JsonUtil; -import org.apache.samza.sql.testutil.SamzaSqlTestConfig; -import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory; -import org.junit.Assert; -import org.junit.Test; - - -public class TestSamzaSqlRemoteTable { - @Test - public void testSinkEndToEndWithKey() { - int numMessages = 20; - - RemoteStoreIOResolverTestFactory.records.clear(); - - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - - String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1"; - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); - } - - @Test - public void testSinkEndToEndWithKeyWithNullRecords() { - int numMessages = 20; - - RemoteStoreIOResolverTestFactory.records.clear(); - - Map<String, String> props = new HashMap<>(); - Map<String, String> staticConfigs = - SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, false, true); - - String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1"; - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - Assert.assertEquals(numMessages - ((numMessages - 1) / TestAvroSystemFactory.NULL_RECORD_FREQUENCY + 1), - RemoteStoreIOResolverTestFactory.records.size()); - } - - @Test (expected = AssertionError.class) - public void testSinkEndToEndWithoutKey() { - int numMessages = 20; - - RemoteStoreIOResolverTestFactory.records.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - - String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1"; - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); - } - - @Test - public void testSourceEndToEndWithKey() { - int numMessages = 20; - - TestAvroSystemFactory.messages.clear(); - RemoteStoreIOResolverTestFactory.records.clear(); - Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - populateProfileTable(staticConfigs, numMessages); - - String sql = - "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," - + " p.name as profileName, p.address as profileAddress " - + "from testRemoteStore.Profile.`$table` as p " - + "join testavro.PAGEVIEW as pv " - + " on p.__key__ = pv.profileId"; - - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - List<String> outMessages = TestAvroSystemFactory.messages.stream() - .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," - + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : - ((GenericRecord) x.getMessage()).get("profileName").toString())) - .collect(Collectors.toList()); - Assert.assertEquals(numMessages, outMessages.size()); - List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages); - Assert.assertEquals(expectedOutMessages, outMessages); - } - - @Test - public void testSourceEndToEndWithKeyWithNullForeignKeys() { - int numMessages = 20; - - TestAvroSystemFactory.messages.clear(); - RemoteStoreIOResolverTestFactory.records.clear(); - Map<String, String> staticConfigs = - SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); - populateProfileTable(staticConfigs, numMessages); - - String sql = - "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," - + " p.name as profileName, p.address as profileAddress " - + "from testRemoteStore.Profile.`$table` as p " - + "join testavro.PAGEVIEW as pv " - + " on p.__key__ = pv.profileId"; - - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - List<String> outMessages = TestAvroSystemFactory.messages.stream() - .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," - + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : - ((GenericRecord) x.getMessage()).get("profileName").toString())) - .collect(Collectors.toList()); - Assert.assertEquals(numMessages / 2, outMessages.size()); - List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages); - Assert.assertEquals(expectedOutMessages, outMessages); - } - - @Test - public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() { - int numMessages = 20; - - TestAvroSystemFactory.messages.clear(); - RemoteStoreIOResolverTestFactory.records.clear(); - Map<String, String> staticConfigs = - SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); - populateProfileTable(staticConfigs, numMessages); - - String sql = - "Insert into testavro.enrichedPageViewTopic " - + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," - + " p.name as profileName, p.address as profileAddress " - + "from testRemoteStore.Profile.`$table` as p " - + "right join testavro.PAGEVIEW as pv " - + " on p.__key__ = pv.profileId"; - - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - List<String> outMessages = TestAvroSystemFactory.messages.stream() - .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," - + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : - ((GenericRecord) x.getMessage()).get("profileName").toString())) - .collect(Collectors.toList()); - Assert.assertEquals(numMessages, outMessages.size()); - List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages); - Assert.assertEquals(expectedOutMessages, outMessages); - } - - @Test - public void testSameJoinTargetSinkEndToEndRightOuterJoin() { - int numMessages = 21; - - TestAvroSystemFactory.messages.clear(); - RemoteStoreIOResolverTestFactory.records.clear(); - Map<String, String> staticConfigs = - SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); - populateProfileTable(staticConfigs, numMessages); - - // The below query reads messages from a stream and deletes the corresponding records from the table. - // Since the stream has alternate messages with null foreign key, only half of the messages will have - // successful joins and hence only half of the records in the table will be deleted. Although join is - // redundant here, keeping it just for testing purpose. - String sql = - "Insert into testRemoteStore.Profile.`$table` " - + "select p.__key__ as __key__ " - + "from testRemoteStore.Profile.`$table` as p " - + "join testavro.PAGEVIEW as pv " - + " on p.__key__ = pv.profileId "; - - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - Assert.assertEquals((numMessages + 1) / 2, RemoteStoreIOResolverTestFactory.records.size()); - } - - private void populateProfileTable(Map<String, String> staticConfigs, int numMessages) { - RemoteStoreIOResolverTestFactory.records.clear(); - - String sql = "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE"; - List<String> sqlStmts = Arrays.asList(sql); - staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner appRunnable = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - appRunnable.runAndWaitForFinish(); - - Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index cada93d..e1ca2e6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; @@ -42,6 +43,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.context.ExternalContext; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.operators.KV; import org.apache.samza.runtime.LocalApplicationRunner; @@ -264,8 +266,9 @@ public class TestRunner { Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive"); // Cleaning store directories to ensure current run does not pick up state from previous run deleteStoreDirectories(); - final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); - runner.run(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + runner.run(buildExternalContext(config).orElse(null)); if (!runner.waitForFinish(timeout)) { throw new SamzaException("Timed out waiting for application to finish"); } @@ -410,4 +413,13 @@ public class TestRunner { this.configs.put(keySerdeConfigKey, null); this.configs.put(msgSerdeConfigKey, null); } + + private static Optional<ExternalContext> buildExternalContext(Config config) { + /* + * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass + * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components + * like the application descriptor may not be available. + */ + return Optional.empty(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java index 34b264f..ed73725 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java @@ -19,10 +19,12 @@ package org.apache.samza.test.integration; +import java.util.Optional; import joptsimple.OptionSet; import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; +import org.apache.samza.context.ExternalContext; import org.apache.samza.runtime.ApplicationRunnerMain; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; @@ -51,10 +53,19 @@ public class LocalApplicationRunnerMain { try { LOGGER.info("Launching stream application: {} to run.", app); - runner.run(); + runner.run(buildExternalContext(config).orElse(null)); runner.waitForFinish(); } catch (Exception e) { LOGGER.error("Exception occurred when running application: {}.", app, e); } } + + private static Optional<ExternalContext> buildExternalContext(Config config) { + /* + * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass + * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components + * like the application descriptor may not be available. + */ + return Optional.empty(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java index 6f381e2..45b5668 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Random; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -109,9 +110,11 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness { } } - final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new PipelineApplication(), new MapConfig(configs)); + Config config = new MapConfig(configs); + final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new PipelineApplication(), + config); - runner.run(); + executeRun(runner, config); runner.waitForFinish(); assertEquals(received.size(), count * partitionCount); http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java index 74c32b4..12b6f6d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java @@ -164,8 +164,9 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness { } } - final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TestStreamApp(), new MapConfig(configs)); - runner.run(); + Config config = new MapConfig(configs); + final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new TestStreamApp(), config); + executeRun(runner, config); // processors are only available when the app is running Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs((MockLocalApplicationRunner) runner); http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java index 0c3d755..16953f0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java @@ -255,7 +255,7 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration Config config = new MapConfig(configMap); ApplicationRunner runner = ApplicationRunners.getApplicationRunner(streamApplication, config); - runner.run(); + executeRun(runner, config); MessageStreamAssert.waitForComplete(); return new RunApplicationContext(runner, config); http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 6faf80b..e2b458e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -263,7 +263,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne if (hasSecondProcessorJoined.compareAndSet(false, true)) { previousJobModelVersion[0] = zkUtils.getJobModelVersion(); previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); - appRunner2.run(); + executeRun(appRunner2, localTestConfig2); try { // Wait for appRunner2 to register with zookeeper. secondProcessorRegistered.await(); @@ -279,7 +279,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( TEST_SYSTEM, inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic, null, callback, kafkaEventsConsumedLatch, localTestConfig1), localTestConfig1); - appRunner1.run(); + executeRun(appRunner1, localTestConfig1); kafkaEventsConsumedLatch.await(); @@ -345,7 +345,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne if (hasSecondProcessorJoined.compareAndSet(false, true)) { previousJobModelVersion[0] = zkUtils.getJobModelVersion(); previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]); - appRunner2.run(); + executeRun(appRunner2, testAppConfig2); try { // Wait for appRunner2 to register with zookeeper. secondProcessorRegistered.await(); @@ -363,7 +363,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1), testAppConfig1); - appRunner1.run(); + executeRun(appRunner1, testAppConfig1); kafkaEventsConsumedLatch.await(); @@ -424,8 +424,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3), applicationConfig3); - appRunner1.run(); - appRunner2.run(); + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); // Wait until all processors have processed a message. processedMessagesLatch1.await(); @@ -451,7 +451,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne kafkaEventsConsumedLatch.await(); publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - appRunner3.run(); + executeRun(appRunner3, applicationConfig3); processedMessagesLatch3.await(); // Verifications after killing the leader. @@ -489,8 +489,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne applicationConfig2), applicationConfig2); // Run stream applications. - appRunner1.run(); - appRunner2.run(); + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); // Wait for message processing to run in both the processors. processedMessagesLatch1.await(); @@ -505,7 +505,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Fail when the duplicate processor joins. expectedException.expect(SamzaException.class); try { - appRunner3.run(); + executeRun(appRunner3, applicationConfig2); } finally { appRunner1.kill(); appRunner2.kill(); @@ -545,8 +545,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne applicationConfig2), applicationConfig2); // Run stream application. - appRunner1.run(); - appRunner2.run(); + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); processedMessagesLatch1.await(); processedMessagesLatch2.await(); @@ -571,7 +571,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1), applicationConfig1); - appRunner3.run(); + executeRun(appRunner3, applicationConfig1); processedMessagesLatch1.await(); @@ -615,8 +615,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2), applicationConfig2); - appRunner1.run(); - appRunner2.run(); + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); processedMessagesLatch1.await(); processedMessagesLatch2.await(); @@ -636,7 +636,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3), applicationConfig3); - appRunner3.run(); + executeRun(appRunner3, applicationConfig3); publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); @@ -674,7 +674,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch1, applicationConfig1), applicationConfig1); - appRunner1.run(); + executeRun(appRunner1, applicationConfig1); processedMessagesLatch1.await(); String jobModelVersion = zkUtils.getJobModelVersion(); http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java new file mode 100644 index 0000000..475617c --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.samzasql; + +import org.apache.samza.config.Config; +import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; + + +public class SamzaSqlIntegrationTestHarness extends AbstractIntegrationTestHarness { + protected void runApplication(Config config) { + SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, config); + executeRun(runner, config); + runner.waitForFinish(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index d593870..a009e41 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -35,14 +35,12 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.samza.config.MapConfig; import org.apache.samza.serializers.JsonSerdeV2Factory; import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; -import org.apache.samza.sql.runner.SamzaSqlApplicationRunner; import org.apache.samza.sql.system.TestAvroSystemFactory; import org.apache.samza.sql.testutil.JsonUtil; import org.apache.samza.sql.testutil.MyTestUdf; import org.apache.samza.sql.testutil.SampleRelConverterFactory; import org.apache.samza.sql.testutil.SamzaSqlTestConfig; import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -51,7 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { +public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness { private static final Logger LOG = LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class); private final Map<String, String> configs = new HashMap<>(); @@ -80,8 +78,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -104,8 +101,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -124,8 +120,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> x.getMessage() == null ? null : Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -149,8 +144,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql = "Insert into testavro2.SIMPLE1 select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -169,8 +163,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE2"; List<String> sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -194,8 +187,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -215,8 +207,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -236,8 +227,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1, sql2); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) .sorted() @@ -258,8 +248,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<Integer> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString())) @@ -282,8 +271,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { + " from testavro.COMPLEX1"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -304,8 +292,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { "Insert into testavro.outputTopic(id) select Flatten(a) as id from (select MyTestArray(id) a from testavro.SIMPLE1)"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages); @@ -326,8 +313,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { + "select id, MyTest(id) as long_value from testavro.SIMPLE1"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); LOG.info("output Messages " + TestAvroSystemFactory.messages); @@ -354,8 +340,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { + "where RegexMatch('.*4', name)"; List<String> sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); LOG.info("output Messages " + TestAvroSystemFactory.messages); // There should be two messages that contain "4" @@ -379,8 +364,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -409,8 +393,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -439,8 +422,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -469,8 +451,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> { @@ -505,8 +486,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -538,8 +518,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -568,8 +547,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -598,8 +576,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -630,8 +607,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -661,8 +637,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -692,8 +667,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); List<String> outMessages = TestAvroSystemFactory.messages.stream() .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," @@ -726,8 +700,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { List<String> sqlStmts = Arrays.asList(sql); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); - SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs)); - runner.runAndWaitForFinish(); + runApplication(new MapConfig(staticConfigs)); // Let's capture the list of windows/counts per key. HashMap<String, List<String>> pageKeyCountListMap = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java new file mode 100644 index 0000000..439b8f7 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java @@ -0,0 +1,222 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.test.samzasql; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.samza.config.MapConfig; +import org.apache.samza.sql.runner.SamzaSqlApplicationConfig; +import org.apache.samza.sql.system.TestAvroSystemFactory; +import org.apache.samza.sql.testutil.JsonUtil; +import org.apache.samza.sql.testutil.SamzaSqlTestConfig; +import org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory; +import org.junit.Assert; +import org.junit.Test; + + +public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness { + @Test + public void testSinkEndToEndWithKey() { + int numMessages = 20; + + RemoteStoreIOResolverTestFactory.records.clear(); + + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); + + String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); + } + + @Test + public void testSinkEndToEndWithKeyWithNullRecords() { + int numMessages = 20; + + RemoteStoreIOResolverTestFactory.records.clear(); + + Map<String, String> props = new HashMap<>(); + Map<String, String> staticConfigs = + SamzaSqlTestConfig.fetchStaticConfigsWithFactories(props, numMessages, false, true); + + String sql = "Insert into testRemoteStore.testTable.`$table` select __key__, id, name from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + Assert.assertEquals(numMessages - ((numMessages - 1) / TestAvroSystemFactory.NULL_RECORD_FREQUENCY + 1), + RemoteStoreIOResolverTestFactory.records.size()); + } + + @Test (expected = AssertionError.class) + public void testSinkEndToEndWithoutKey() { + int numMessages = 20; + + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); + + String sql = "Insert into testRemoteStore.testTable.`$table`(id,name) select id, name from testavro.SIMPLE1"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); + } + + @Test + public void testSourceEndToEndWithKey() { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); + populateProfileTable(staticConfigs, numMessages); + + String sql = + "Insert into testavro.enrichedPageViewTopic " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," + + " p.name as profileName, p.address as profileAddress " + + "from testRemoteStore.Profile.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on p.__key__ = pv.profileId"; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + List<String> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : + ((GenericRecord) x.getMessage()).get("profileName").toString())) + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages); + Assert.assertEquals(expectedOutMessages, outMessages); + } + + @Test + public void testSourceEndToEndWithKeyWithNullForeignKeys() { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = + SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); + populateProfileTable(staticConfigs, numMessages); + + String sql = + "Insert into testavro.enrichedPageViewTopic " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," + + " p.name as profileName, p.address as profileAddress " + + "from testRemoteStore.Profile.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on p.__key__ = pv.profileId"; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + List<String> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : + ((GenericRecord) x.getMessage()).get("profileName").toString())) + .collect(Collectors.toList()); + Assert.assertEquals(numMessages / 2, outMessages.size()); + List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoinWithNullForeignKeys(numMessages); + Assert.assertEquals(expectedOutMessages, outMessages); + } + + @Test + public void testSourceEndToEndWithKeyWithNullForeignKeysRightOuterJoin() { + int numMessages = 20; + + TestAvroSystemFactory.messages.clear(); + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = + SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); + populateProfileTable(staticConfigs, numMessages); + + String sql = + "Insert into testavro.enrichedPageViewTopic " + + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName," + + " p.name as profileName, p.address as profileAddress " + + "from testRemoteStore.Profile.`$table` as p " + + "right join testavro.PAGEVIEW as pv " + + " on p.__key__ = pv.profileId"; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + List<String> outMessages = TestAvroSystemFactory.messages.stream() + .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + "," + + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" : + ((GenericRecord) x.getMessage()).get("profileName").toString())) + .collect(Collectors.toList()); + Assert.assertEquals(numMessages, outMessages.size()); + List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameOuterJoinWithNullForeignKeys(numMessages); + Assert.assertEquals(expectedOutMessages, outMessages); + } + + @Test + public void testSameJoinTargetSinkEndToEndRightOuterJoin() { + int numMessages = 21; + + TestAvroSystemFactory.messages.clear(); + RemoteStoreIOResolverTestFactory.records.clear(); + Map<String, String> staticConfigs = + SamzaSqlTestConfig.fetchStaticConfigsWithFactories(new HashMap<>(), numMessages, true); + populateProfileTable(staticConfigs, numMessages); + + // The below query reads messages from a stream and deletes the corresponding records from the table. + // Since the stream has alternate messages with null foreign key, only half of the messages will have + // successful joins and hence only half of the records in the table will be deleted. Although join is + // redundant here, keeping it just for testing purpose. + String sql = + "Insert into testRemoteStore.Profile.`$table` " + + "select p.__key__ as __key__ " + + "from testRemoteStore.Profile.`$table` as p " + + "join testavro.PAGEVIEW as pv " + + " on p.__key__ = pv.profileId "; + + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + Assert.assertEquals((numMessages + 1) / 2, RemoteStoreIOResolverTestFactory.records.size()); + } + + private void populateProfileTable(Map<String, String> staticConfigs, int numMessages) { + RemoteStoreIOResolverTestFactory.records.clear(); + + String sql = "Insert into testRemoteStore.Profile.`$table` select * from testavro.PROFILE"; + List<String> sqlStmts = Arrays.asList(sql); + staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); + runApplication(new MapConfig(staticConfigs)); + + Assert.assertEquals(numMessages, RemoteStoreIOResolverTestFactory.records.size()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index e112804..b447493 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -27,7 +27,9 @@ import java.util.Map; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.TaskApplication; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; import org.apache.samza.application.descriptors.TaskApplicationDescriptor; +import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -78,7 +80,7 @@ import static org.junit.Assert.assertTrue; public class TestLocalTable extends AbstractIntegrationTestHarness { @Test - public void testSendTo() throws Exception { + public void testSendTo() throws Exception { int count = 10; Profile[] profiles = TestTableData.generateProfiles(count); @@ -104,8 +106,9 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { .sendTo(table); }; - final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); - runner.run(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + executeRun(runner, config); runner.waitForFinish(); for (int i = 0; i < partitionCount; i++) { @@ -115,48 +118,29 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { } } - static class TestStreamTableJoin { + static class StreamTableJoinApp implements StreamApplication { static List<PageView> received = new LinkedList<>(); static List<EnrichedPageView> joined = new LinkedList<>(); - final int count; - final int partitionCount; - final Map<String, String> configs; - - TestStreamTableJoin(int count, int partitionCount, Map<String, String> configs) { - this.count = count; - this.partitionCount = partitionCount; - this.configs = configs; - } - void runTest() { - final StreamApplication app = appDesc -> { - - Table<KV<Integer, Profile>> table = appDesc.getTable( - new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); - appDesc.getInputStream(profileISD) - .map(m -> new KV(m.getMemberId(), m)) - .sendTo(table); - - GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); - appDesc.getInputStream(pageViewISD) - .map(pv -> { - received.add(pv); - return pv; - }) - .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1") - .join(table, new PageViewToProfileJoinFunction()) - .sink((m, collector, coordinator) -> joined.add(m)); - }; - - final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); - runner.run(); - runner.waitForFinish(); - - assertEquals(count * partitionCount, received.size()); - assertEquals(count * partitionCount, joined.size()); - assertTrue(joined.get(0) instanceof EnrichedPageView); + @Override + public void describe(StreamApplicationDescriptor appDesc) { + Table<KV<Integer, Profile>> table = appDesc.getTable( + new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>()); + appDesc.getInputStream(profileISD) + .map(m -> new KV(m.getMemberId(), m)) + .sendTo(table); + + GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>()); + appDesc.getInputStream(pageViewISD) + .map(pv -> { + received.add(pv); + return pv; + }) + .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1") + .join(table, new PageViewToProfileJoinFunction()) + .sink((m, collector, coordinator) -> joined.add(m)); } } @@ -179,85 +163,66 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put("streams.Profile.source", Base64Serializer.serialize(profiles)); configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount)); - TestStreamTableJoin joinTest = new TestStreamTableJoin(count, partitionCount, configs); - joinTest.runTest(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config); + executeRun(runner, config); + runner.waitForFinish(); + + assertEquals(count * partitionCount, StreamTableJoinApp.received.size()); + assertEquals(count * partitionCount, StreamTableJoinApp.joined.size()); + assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView); } - static class TestDualStreamTableJoin { + static class DualStreamTableJoinApp implements StreamApplication { static List<Profile> sentToProfileTable1 = new LinkedList<>(); static List<Profile> sentToProfileTable2 = new LinkedList<>(); static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>(); static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>(); - final int count; - final int partitionCount; - final Map<String, String> configs; - - TestDualStreamTableJoin(int count, int partitionCount, Map<String, String> configs) { - this.count = count; - this.partitionCount = partitionCount; - this.configs = configs; - } - void runTest() { + @Override + public void describe(StreamApplicationDescriptor appDesc) { KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()); KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()); PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction(); PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction(); - final StreamApplication app = appDesc -> { - - Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde)); - - DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); - GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); - GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); - MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1); - MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2); - - profileStream1 - .map(m -> { - sentToProfileTable1.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - profileStream2 - .map(m -> { - sentToProfileTable2.add(m); - return new KV(m.getMemberId(), m); - }) - .sendTo(profileTable); - - GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); - GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); - MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1); - MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2); - - pageViewStream1 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") - .join(profileTable, joinFn1) - .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); - - pageViewStream2 - .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") - .join(profileTable, joinFn2) - .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); - }; - - final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); - runner.run(); - runner.waitForFinish(); - - assertEquals(count * partitionCount, sentToProfileTable1.size()); - assertEquals(count * partitionCount, sentToProfileTable2.size()); - - assertEquals(count * partitionCount, joinedPageViews1.size()); - assertEquals(count * partitionCount, joinedPageViews2.size()); - assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView); - assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView); + Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde)); + DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test"); + GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>()); + GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>()); + MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1); + MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2); + + profileStream1 + .map(m -> { + sentToProfileTable1.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + profileStream2 + .map(m -> { + sentToProfileTable2.add(m); + return new KV(m.getMemberId(), m); + }) + .sendTo(profileTable); + + GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>()); + GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>()); + MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1); + MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2); + + pageViewStream1 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1") + .join(profileTable, joinFn1) + .sink((m, collector, coordinator) -> joinedPageViews1.add(m)); + + pageViewStream2 + .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2") + .join(profileTable, joinFn2) + .sink((m, collector, coordinator) -> joinedPageViews2.add(m)); } - } @Test @@ -288,8 +253,18 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews)); configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount)); - TestDualStreamTableJoin dualJoinTest = new TestDualStreamTableJoin(count, partitionCount, configs); - dualJoinTest.runTest(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config); + executeRun(runner, config); + runner.waitForFinish(); + + assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size()); + assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size()); + + assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size()); + assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size()); + assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView); + assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView); } static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) { @@ -351,8 +326,9 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { configs.put("streams.PageView.partitionCount", String.valueOf(4)); configs.put("task.inputs", "test.PageView"); - final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), new MapConfig(configs)); - runner.run(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config); + executeRun(runner, config); runner.waitForFinish(); } http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index c9228af..3de8300 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.descriptors.StreamApplicationDescriptor; +import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.context.Context; import org.apache.samza.context.MockContext; @@ -223,8 +224,9 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { .sendTo(outputTable); }; - final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); - runner.run(); + Config config = new MapConfig(configs); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, config); + executeRun(runner, config); runner.waitForFinish(); int numExpected = count * partitionCount; http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala index 2f18875..1340ca1 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala @@ -22,7 +22,9 @@ import java.util.Properties import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.samza.config.{JobConfig, KafkaConsumerConfig, MapConfig} +import org.apache.samza.config.{Config, JobConfig, KafkaConsumerConfig, MapConfig} +import org.apache.samza.context.ExternalContext +import org.apache.samza.runtime.ApplicationRunner import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaSystemConsumer} /** @@ -83,4 +85,16 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar new KafkaSystemAdmin(system, new MapConfig(map), KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig)); } + protected def executeRun(applicationRunner: ApplicationRunner, config: Config): Unit = { + applicationRunner.run(buildExternalContext(config).orNull) + } + + private def buildExternalContext(config: Config): Option[ExternalContext] = { + /* + * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass + * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components + * like the application descriptor may not be available. + */ + None + } } \ No newline at end of file
