This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0c5c3a5 SAMZA-2306: Use in-memory system for SQL tests in samza-test
(#1142)
0c5c3a5 is described below
commit 0c5c3a56104b0872a2cc7692fd87db691621a887
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Aug 21 10:44:26 2019 -0700
SAMZA-2306: Use in-memory system for SQL tests in samza-test (#1142)
---
.../harness/InMemoryIntegrationTestHarness.java | 63 +++++++++++++
.../samzasql/SamzaSqlIntegrationTestHarness.java | 7 +-
.../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 103 ++++++++-------------
3 files changed, 109 insertions(+), 64 deletions(-)
diff --git
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
new file mode 100644
index 0000000..f120ac3
--- /dev/null
+++
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.harness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+
+
+/**
+ * Provides helpers for configuring an in-memory system to be used for tests
and executing those tests.
+ *
+ * This is somewhat based on {@link IntegrationTestHarness}, but it avoids
using Kafka/Zookeeper.
+ */
+public class InMemoryIntegrationTestHarness {
+ protected static final String IN_MEMORY = "inmemory";
+
+ protected Config baseInMemorySystemConfigs() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT,
IN_MEMORY), InMemorySystemFactory.class.getName());
+ configMap.put(InMemorySystemConfig.INMEMORY_SCOPE,
RandomStringUtils.random(10, true, true));
+ configMap.put(JobConfig.JOB_DEFAULT_SYSTEM, IN_MEMORY);
+ return new MapConfig(configMap);
+ }
+
+ protected void executeRun(ApplicationRunner applicationRunner, Config
config) {
+ applicationRunner.run(buildExternalContext(config).orElse(null));
+ }
+
+ private Optional<ExternalContext> buildExternalContext(Config config) {
+ /*
+ * By default, use an empty ExternalContext here. In a custom fork of
Samza, this can be implemented to pass
+ * a non-empty ExternalContext. Only config should be used to build the
external context. In the future, components
+ * like the application descriptor may not be available.
+ */
+ return Optional.empty();
+ }
+}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
index d41418a..28bad3c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
@@ -29,11 +29,11 @@ import
org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.system.MockSystemFactory;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.harness.InMemoryIntegrationTestHarness;
import org.apache.samza.util.CoordinatorStreamUtil;
-public class SamzaSqlIntegrationTestHarness extends IntegrationTestHarness {
+public class SamzaSqlIntegrationTestHarness extends
InMemoryIntegrationTestHarness {
public static final String MOCK_METADATA_SYSTEM = "mockmetadatasystem";
@@ -45,6 +45,9 @@ public class SamzaSqlIntegrationTestHarness extends
IntegrationTestHarness {
HashMap<String, String> mapConfig = new HashMap<>();
mapConfig.put(JobConfig.JOB_COORDINATOR_SYSTEM, MOCK_METADATA_SYSTEM);
mapConfig.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT,
MOCK_METADATA_SYSTEM), MockSystemFactory.class.getName());
+
+ // configs for using in-memory system as the default system
+ mapConfig.putAll(baseInMemorySystemConfigs());
mapConfig.putAll(config);
SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new
MapConfig(mapConfig));
diff --git
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 0cec337..ac84fe8 100644
---
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -33,7 +33,6 @@ import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.JsonSerdeV2Factory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.system.TestAvroSystemFactory;
import org.apache.samza.sql.util.JsonUtil;
@@ -42,7 +41,6 @@ import org.apache.samza.sql.util.SampleRelConverterFactory;
import org.apache.samza.sql.util.SamzaSqlTestConfig;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -50,31 +48,14 @@ import org.slf4j.LoggerFactory;
public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
-
private static final Logger LOG =
LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
- private final Map<String, String> configs = new HashMap<>();
-
- @Before
- public void setUp() {
- super.setUp();
- configs.put("systems.kafka.samza.factory",
"org.apache.samza.system.kafka.KafkaSystemFactory");
- configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
- configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
- configs.put("systems.kafka.samza.key.serde", "object");
- configs.put("systems.kafka.samza.msg.serde", "samzaSqlRelMsg");
- configs.put("systems.kafka.default.stream.replication.factor", "1");
- configs.put("job.default.system", "kafka");
-
- configs.put("serializers.registry.object.class",
JsonSerdeV2Factory.class.getName());
- configs.put("serializers.registry.samzaSqlRelMsg.class",
JsonSerdeV2Factory.class.getName());
- }
@Test
public void testEndToEnd() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -93,7 +74,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String avroSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN,
"avro");
staticConfigs.put(avroSamzaToRelMsgConverterDomain +
SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -115,7 +96,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String avroSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN,
"avro");
staticConfigs.put(avroSamzaToRelMsgConverterDomain +
SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -139,7 +120,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
- SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs,
numMessages, false, true);
+
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(),
numMessages, false, true);
String sql = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -164,7 +145,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql = "Insert into testavro2.SIMPLE1 select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON,
JsonUtil.toJson(sqlStmts));
@@ -182,7 +163,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndMultiSqlStmts() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
String sql2 = "Insert into testavro.SIMPLE3 select * from
testavro.SIMPLE2";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -202,7 +183,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.SIMPLE1 select * from
testavro.SIMPLE2";
String sql2 = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -223,7 +204,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndFanIn() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE2";
String sql2 = "Insert into testavro.simpleOutputTopic select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -243,7 +224,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndFanOut() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.SIMPLE2 select * from
testavro.SIMPLE1";
String sql2 = "Insert into testavro.SIMPLE3 select * from
testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -264,7 +245,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(),
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
@@ -284,7 +265,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic"
+ " select * from testavro.COMPLEX1 where bool_value IS TRUE";
List<String> sqlStmts = Arrays.asList(sql1);
@@ -301,7 +282,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic"
+ " select * from testavro.COMPLEX1 where id >= 0 and bool_value IS
TRUE";
List<String> sqlStmts = Arrays.asList(sql1);
@@ -318,7 +299,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
// BUG Compound boolean checks dont work in calcite, So workaround by
casting it to String
String sql1 = "Insert into testavro.outputTopic"
+ " select * from testavro.COMPLEX1 where id >= 0 and CAST(bool_value
AS VARCHAR) = 'TRUE'";
@@ -336,7 +317,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7)
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL
END as string_value from testavro.SIMPLE1";
List<String> sqlStmts = Arrays.asList(sql1);
@@ -356,7 +337,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ " select id, name as string_value from testavro.SIMPLE1 where name
like 'Name%'";
List<String> sqlStmts = Arrays.asList(sql1);
@@ -375,7 +356,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndFlatten() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
LOG.info(" Class Path : " +
RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
@@ -401,7 +382,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndComplexRecord() {
int numMessages = 10;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic"
@@ -421,7 +402,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndNestedRecord() {
int numMessages = 10;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic"
@@ -440,7 +421,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndFlattenWithUdf() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id))
as id from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -461,7 +442,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndSubQuery() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic(id) select Flatten(a) as id from
(select MyTestArray(id) a from testavro.SIMPLE1)";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -482,7 +463,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testUdfUnTypedArgumentToTypedUdf() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ "select id, MyTest(MyTestObj(id)) as long_value from
testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -502,7 +483,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndUdf() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ "select id, MYTest(id) as long_value from testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -526,7 +507,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndUdfWithDisabledArgCheck() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.PROFILE1(id, address) "
+ "select id, BuildOutputRecord('key', GetNestedField(address, 'zip'))
as address from testavro.PROFILE";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -546,7 +527,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testEndToEndUdfPolymorphism() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 = "Insert into testavro.outputTopic(id, long_value) "
+ "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from
testavro.SIMPLE1";
List<String> sqlStmts = Collections.singletonList(sql1);
@@ -570,7 +551,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
public void testRegexMatchUdfInWhereClause() throws Exception {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql1 =
"Insert into testavro.outputTopic(id) "
+ "select id "
@@ -590,8 +571,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
- staticConfigs.putAll(configs);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey,
coalesce(null, 'N/A') as companyName,"
@@ -619,8 +599,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
- staticConfigs.putAll(configs);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey,
coalesce(null, 'N/A') as companyName,"
@@ -648,8 +627,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
- staticConfigs.putAll(configs);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey,
coalesce(null, 'N/A') as companyName,"
@@ -677,8 +655,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
- staticConfigs.putAll(configs);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as
companyName, p.name as profileName,"
@@ -711,8 +688,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
- staticConfigs.putAll(configs);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as
companyName, p.name as profileName,"
@@ -745,7 +721,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+ Map<String, String> staticConfigs =
+
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(),
numMessages, true);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as
companyName, p.name as profileName,"
@@ -774,7 +751,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+ Map<String, String> staticConfigs =
+
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(),
numMessages, true);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as
companyName, p.name as profileName,"
@@ -803,7 +781,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+ Map<String, String> staticConfigs =
+
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(),
numMessages, true);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as
companyName, p.name as profileName,"
@@ -833,7 +812,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as
companyName, p.name as profileName,"
@@ -863,7 +842,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as
companyName, p.name as profileName,"
@@ -893,7 +872,7 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
- Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ Map<String, String> staticConfigs =
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
String sql =
"Insert into testavro.enrichedPageViewTopic "
+ "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as
companyName, p.name as profileName,"
@@ -928,8 +907,8 @@ public class TestSamzaSqlEndToEnd extends
SamzaSqlIntegrationTestHarness {
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs =
- SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs,
numMessages, false, false, windowDurationMs);
- staticConfigs.putAll(configs);
+
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(),
numMessages, false, false,
+ windowDurationMs);
String sql =
"Insert into testavro.pageViewCountTopic"
+ " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"