Repository: samza Updated Branches: refs/heads/1.0.0 4b0ebb6ec -> cfd29c369
Changes to Standalone integration tests * Removing task.inputs since it is not required in the new Samza 1.0 API * Fixing input-output system bug * Fixing jobModel zk path bug -- camel case. Author: [email protected] <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #756 from rmatharu/standalonetests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cfd29c36 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cfd29c36 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cfd29c36 Branch: refs/heads/1.0.0 Commit: cfd29c36989258c38af3a672015b30465c31cac7 Parents: 4b0ebb6 Author: [email protected] <[email protected]> Authored: Mon Oct 22 19:04:58 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Mon Oct 22 19:05:23 2018 -0700 ---------------------------------------------------------------------- .../src/main/config/standalone.failure.test.properties | 1 - .../integration/TestStandaloneIntegrationApplication.java | 6 +++--- samza-test/src/main/python/tests/zk_client.py | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/config/standalone.failure.test.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties index d855d5f..d200251 100644 --- a/samza-test/src/main/config/standalone.failure.test.properties +++ b/samza-test/src/main/config/standalone.failure.test.properties @@ -26,7 +26,6 @@ job.name=test-app-name job.id=test-app-id ## Kafka I/O system properties. 
-task.inputs=standalone_integration_test_kafka_input_topic input.stream.name=standalone_integration_test_kafka_input_topic job.default.system=testSystemName systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java index 2002ce6..fba3b52 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java @@ -40,15 +40,15 @@ public class TestStandaloneIntegrationApplication implements StreamApplication { public void describe(StreamApplicationDescriptor appDescriptor) { String systemName = "testSystemName"; String inputStreamName = appDescriptor.getConfig().get("input.stream.name"); - String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic"; - LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName); + String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic"; + LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName); KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName); KVSerde<Object, Object> noOpSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); KafkaInputDescriptor<KV<Object, Object>> isd = kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde); KafkaOutputDescriptor<KV<Object, Object>> osd = - kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde); + 
kafkaSystemDescriptor.getOutputDescriptor(outputStreamName, noOpSerde); appDescriptor.getInputStream(isd).sendTo(appDescriptor.getOutputStream(osd)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/python/tests/zk_client.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py index 2a11a80..006d84f 100644 --- a/samza-test/src/main/python/tests/zk_client.py +++ b/samza-test/src/main/python/tests/zk_client.py @@ -47,8 +47,8 @@ class ZkClient: self.kazoo_client.stop() def watch_job_model(self, watch_function): - self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node)) - self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function) + self.kazoo_client.ensure_path('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node)) + self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function) def get_latest_job_model(self): """ @@ -56,12 +56,12 @@ class ZkClient: """ job_model_dict = {} try: - childZkNodes = self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node)) + childZkNodes = self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node)) if len(childZkNodes) > 0: childZkNodes.sort() childZkNodes.reverse() - job_model_generation_path = '{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0]) + job_model_generation_path = '{0}/jobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0]) job_model, _ = self.kazoo_client.get(job_model_generation_path) """
