This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5c3a5  SAMZA-2306: Use in-memory system for SQL tests in samza-test 
(#1142)
0c5c3a5 is described below

commit 0c5c3a56104b0872a2cc7692fd87db691621a887
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Aug 21 10:44:26 2019 -0700

    SAMZA-2306: Use in-memory system for SQL tests in samza-test (#1142)
---
 .../harness/InMemoryIntegrationTestHarness.java    |  63 +++++++++++++
 .../samzasql/SamzaSqlIntegrationTestHarness.java   |   7 +-
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java  | 103 ++++++++-------------
 3 files changed, 109 insertions(+), 64 deletions(-)

diff --git 
a/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
new file mode 100644
index 0000000..f120ac3
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/harness/InMemoryIntegrationTestHarness.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.harness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+
+
+/**
+ * Provides helpers for configuring an in-memory system to be used for tests 
and executing those tests.
+ *
+ * This is somewhat based on {@link IntegrationTestHarness}, but it avoids 
using Kafka/Zookeeper.
+ */
+public class InMemoryIntegrationTestHarness {
+  protected static final String IN_MEMORY = "inmemory";
+
+  protected Config baseInMemorySystemConfigs() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, 
IN_MEMORY), InMemorySystemFactory.class.getName());
+    configMap.put(InMemorySystemConfig.INMEMORY_SCOPE, 
RandomStringUtils.random(10, true, true));
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM, IN_MEMORY);
+    return new MapConfig(configMap);
+  }
+
+  protected void executeRun(ApplicationRunner applicationRunner, Config 
config) {
+    applicationRunner.run(buildExternalContext(config).orElse(null));
+  }
+
+  private Optional<ExternalContext> buildExternalContext(Config config) {
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of 
Samza, this can be implemented to pass
+     * a non-empty ExternalContext. Only config should be used to build the 
external context. In the future, components
+     * like the application descriptor may not be available.
+     */
+    return Optional.empty();
+  }
+}
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
index d41418a..28bad3c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
@@ -29,11 +29,11 @@ import 
org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
 import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.apache.samza.system.MockSystemFactory;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.harness.InMemoryIntegrationTestHarness;
 import org.apache.samza.util.CoordinatorStreamUtil;
 
 
-public class SamzaSqlIntegrationTestHarness extends IntegrationTestHarness {
+public class SamzaSqlIntegrationTestHarness extends 
InMemoryIntegrationTestHarness {
 
   public static final String MOCK_METADATA_SYSTEM = "mockmetadatasystem";
 
@@ -45,6 +45,9 @@ public class SamzaSqlIntegrationTestHarness extends 
IntegrationTestHarness {
     HashMap<String, String> mapConfig = new HashMap<>();
     mapConfig.put(JobConfig.JOB_COORDINATOR_SYSTEM, MOCK_METADATA_SYSTEM);
     mapConfig.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, 
MOCK_METADATA_SYSTEM), MockSystemFactory.class.getName());
+
+    // configs for using in-memory system as the default system
+    mapConfig.putAll(baseInMemorySystemConfigs());
     mapConfig.putAll(config);
 
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(mapConfig));
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 0cec337..ac84fe8 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -33,7 +33,6 @@ import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.JsonSerdeV2Factory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.system.TestAvroSystemFactory;
 import org.apache.samza.sql.util.JsonUtil;
@@ -42,7 +41,6 @@ import org.apache.samza.sql.util.SampleRelConverterFactory;
 import org.apache.samza.sql.util.SamzaSqlTestConfig;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -50,31 +48,14 @@ import org.slf4j.LoggerFactory;
 
 
 public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
-
   private static final Logger LOG = 
LoggerFactory.getLogger(TestSamzaSqlEndToEnd.class);
-  private final Map<String, String> configs = new HashMap<>();
-
-  @Before
-  public void setUp() {
-    super.setUp();
-    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
-    configs.put("systems.kafka.samza.key.serde", "object");
-    configs.put("systems.kafka.samza.msg.serde", "samzaSqlRelMsg");
-    configs.put("systems.kafka.default.stream.replication.factor", "1");
-    configs.put("job.default.system", "kafka");
-
-    configs.put("serializers.registry.object.class", 
JsonSerdeV2Factory.class.getName());
-    configs.put("serializers.registry.samzaSqlRelMsg.class", 
JsonSerdeV2Factory.class.getName());
-  }
 
   @Test
   public void testEndToEnd() {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -93,7 +74,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String avroSamzaToRelMsgConverterDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
"avro");
     staticConfigs.put(avroSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -115,7 +96,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String avroSamzaToRelMsgConverterDomain =
         
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, 
"avro");
     staticConfigs.put(avroSamzaToRelMsgConverterDomain + 
SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -139,7 +120,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 
numMessages, false, true);
+        
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(), 
numMessages, false, true);
     String sql = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -164,7 +145,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql = "Insert into testavro2.SIMPLE1 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
@@ -182,7 +163,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndMultiSqlStmts() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE2";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -202,7 +183,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.SIMPLE1 select * from 
testavro.SIMPLE2";
     String sql2 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -223,7 +204,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndFanIn() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE2";
     String sql2 = "Insert into testavro.simpleOutputTopic select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -243,7 +224,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndFanOut() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.SIMPLE2 select * from 
testavro.SIMPLE1";
     String sql2 = "Insert into testavro.SIMPLE3 select * from 
testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1, sql2);
@@ -264,7 +245,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + " select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP(), 
LOCALTIMESTAMP()) + MONTH(CURRENT_DATE()) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
@@ -284,7 +265,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic"
         + " select * from testavro.COMPLEX1 where bool_value IS TRUE";
     List<String> sqlStmts = Arrays.asList(sql1);
@@ -301,7 +282,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic"
         + " select * from testavro.COMPLEX1 where id >= 0 and bool_value IS 
TRUE";
     List<String> sqlStmts = Arrays.asList(sql1);
@@ -318,7 +299,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     // BUG Compound boolean checks dont work in calcite, So workaround by 
casting it to String
     String sql1 = "Insert into testavro.outputTopic"
         + " select * from testavro.COMPLEX1 where id >= 0 and CAST(bool_value 
AS VARCHAR) =  'TRUE'";
@@ -336,7 +317,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + " select id, NOT(id = 5) as bool_value, CASE WHEN id IN (5, 6, 7) 
THEN CAST('foo' AS VARCHAR) WHEN id < 5 THEN CAST('bars' AS VARCHAR) ELSE NULL 
END as string_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
@@ -356,7 +337,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + " select id, name as string_value from testavro.SIMPLE1 where name 
like 'Name%'";
     List<String> sqlStmts = Arrays.asList(sql1);
@@ -375,7 +356,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndFlatten() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     LOG.info(" Class Path : " + 
RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
     String sql1 =
@@ -401,7 +382,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndComplexRecord() {
     int numMessages = 10;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     String sql1 =
         "Insert into testavro.outputTopic"
@@ -421,7 +402,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndNestedRecord() {
     int numMessages = 10;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
 
     String sql1 =
         "Insert into testavro.outputTopic"
@@ -440,7 +421,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndFlattenWithUdf() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
         "Insert into testavro.outputTopic(id) select Flatten(MyTestArray(id)) 
as id from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -461,7 +442,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndSubQuery() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
         "Insert into testavro.outputTopic(id) select Flatten(a) as id from 
(select MyTestArray(id) a from testavro.SIMPLE1)";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -482,7 +463,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testUdfUnTypedArgumentToTypedUdf() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + "select id, MyTest(MyTestObj(id)) as long_value from 
testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -502,7 +483,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndUdf() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + "select id, MYTest(id) as long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -526,7 +507,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndUdfWithDisabledArgCheck() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.PROFILE1(id, address) "
         + "select id, BuildOutputRecord('key', GetNestedField(address, 'zip')) 
as address from testavro.PROFILE";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -546,7 +527,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testEndToEndUdfPolymorphism() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 = "Insert into testavro.outputTopic(id, long_value) "
         + "select MyTestPoly(id) as long_value, MyTestPoly(name) as id from 
testavro.SIMPLE1";
     List<String> sqlStmts = Collections.singletonList(sql1);
@@ -570,7 +551,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
   public void testRegexMatchUdfInWhereClause() throws Exception {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql1 =
         "Insert into testavro.outputTopic(id) "
             + "select id "
@@ -590,8 +571,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    staticConfigs.putAll(configs);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
@@ -619,8 +599,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    staticConfigs.putAll(configs);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
@@ -648,8 +627,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    staticConfigs.putAll(configs);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, 
coalesce(null, 'N/A') as companyName,"
@@ -677,8 +655,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    staticConfigs.putAll(configs);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
@@ -711,8 +688,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    staticConfigs.putAll(configs);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
@@ -745,7 +721,8 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    Map<String, String> staticConfigs =
+        
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(), 
numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
@@ -774,7 +751,8 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    Map<String, String> staticConfigs =
+        
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(), 
numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
@@ -803,7 +781,8 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages, true);
+    Map<String, String> staticConfigs =
+        
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(), 
numMessages, true);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as 
companyName, p.name as profileName,"
@@ -833,7 +812,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as 
companyName, p.name as profileName,"
@@ -863,7 +842,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as 
companyName, p.name as profileName,"
@@ -893,7 +872,7 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
-    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
     String sql =
         "Insert into testavro.enrichedPageViewTopic "
             + "select pv.pageKey as __key__, pv.pageKey as pageKey, c.name as 
companyName, p.name as profileName,"
@@ -928,8 +907,8 @@ public class TestSamzaSqlEndToEnd extends 
SamzaSqlIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs =
-        SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, 
numMessages, false, false, windowDurationMs);
-    staticConfigs.putAll(configs);
+        
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(Collections.emptyMap(), 
numMessages, false, false,
+            windowDurationMs);
     String sql =
         "Insert into testavro.pageViewCountTopic"
             + " select 'SampleJob' as jobName, pv.pageKey, count(*) as `count`"

Reply via email to