Repository: samza Updated Branches: refs/heads/master 89989fdab -> 119e2fa01
SAMZA-1396 TestZkLocalApplicationRunner tests fails after SAMZA-1385 * Fixes ZkPath issues * Fixes appname / jobname mismatch Author: Navina Ramesh <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Bharath Kumarasubramanian <[email protected]> Closes #274 from navina/SAMZA-1396 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/119e2fa0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/119e2fa0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/119e2fa0 Branch: refs/heads/master Commit: 119e2fa0126a66f949cba8d2e2d8cbb2a36cc1ec Parents: 89989fd Author: Navina Ramesh <[email protected]> Authored: Wed Aug 16 14:42:18 2017 -0700 Committer: navina <[email protected]> Committed: Wed Aug 16 14:42:18 2017 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkJobCoordinatorFactory.java | 2 +- .../processor/TestZkLocalApplicationRunner.java | 62 ++++++++++++-------- 2 files changed, 39 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 08d826e..563bf4c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -60,7 +60,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); } - private String getJobCoordinationZkPath(Config config) { + public static String getJobCoordinationZkPath(Config config) { JobConfig jobConfig = new JobConfig(config); String appId = new ApplicationConfig(config).getGlobalAppId(); String jobName = jobConfig.getName().isDefined() http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 9ca48ad..76fd046 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -23,15 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import kafka.admin.AdminUtils; import kafka.server.KafkaServer; import kafka.utils.TestUtils; @@ -58,6 +49,7 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.util.NoOpMetricsRegistry; +import org.apache.samza.zk.ZkJobCoordinatorFactory; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkUtils; import org.junit.Rule; @@ -68,7 +60,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; -import static org.junit.Assert.*; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Integration tests for {@link LocalApplicationRunner}. @@ -87,7 +90,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"; private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory"; private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory"; - private static final String TEST_JOB_NAME = "test-job"; private static final String TASK_SHUTDOWN_MS = "3000"; private static final String JOB_DEBOUNCE_TIME_MS = "1000"; private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"}; @@ -118,16 +120,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne testStreamAppId = String.format("test-app-id-%s", uniqueTestId); inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId); outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId); + + // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system + // TODO: processorId should typically come up from a processorID generator as processor.id will be deprecated in 0.14.0+ + Map<String, String> configMap = + buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + applicationConfig1 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + applicationConfig2 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + applicationConfig3 = new ApplicationConfig(new MapConfig(configMap)); + ZkClient zkClient = new ZkClient(zkConnect()); - ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId)); + ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1)); zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); - // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId. - applicationConfig1 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId); - applicationConfig2 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId); - applicationConfig3 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[2], testStreamAppName, testStreamAppId); - // Create local application runners. applicationRunner1 = new LocalApplicationRunner(applicationConfig1); applicationRunner2 = new LocalApplicationRunner(applicationConfig2); @@ -164,8 +173,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne } } - private ApplicationConfig buildStreamApplicationConfig(String systemName, String inputTopic, - String processorId, String appName, String appId) { + private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic, + String appName, String appId) { Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder() .put(TaskConfig.INPUT_STREAMS(), inputTopic) .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName) @@ -174,17 +183,17 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY) .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY) .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY) - .put(JobConfig.PROCESSOR_ID(), processorId) .put(ApplicationConfig.APP_NAME, appName) .put(ApplicationConfig.APP_ID, appId) .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY) - .put(JobConfig.JOB_NAME(), TEST_JOB_NAME) + .put(JobConfig.JOB_NAME(), appName) + .put(JobConfig.JOB_ID(), appId) .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS) .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS) .build(); Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig); applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true)); - return new ApplicationConfig(new MapConfig(applicationConfig)); + return applicationConfig; } @Test @@ -354,9 +363,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry. */ Map<String, String> debounceTimeConfig = ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000"); + Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId); + configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000"); + + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + Config applicationConfig1 = new MapConfig(configMap); - Config applicationConfig1 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), debounceTimeConfig)); - Config applicationConfig2 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), debounceTimeConfig)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + Config applicationConfig2 = new MapConfig(configMap); LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1); LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
