Repository: samza
Updated Branches:
  refs/heads/1.0.0 4b0ebb6ec -> cfd29c369


Changes to Standalone integration tests

* Removing task.inputs since it's not required in the new Samza 1.0 API
* Fixing input-output system bug (the output descriptor was created with the input stream name)
* Fixing jobModel ZK path bug -- camel case (JobModelGeneration -> jobModelGeneration).

Author: [email protected] <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes #756 from rmatharu/standalonetests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cfd29c36
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cfd29c36
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cfd29c36

Branch: refs/heads/1.0.0
Commit: cfd29c36989258c38af3a672015b30465c31cac7
Parents: 4b0ebb6
Author: [email protected] <[email protected]>
Authored: Mon Oct 22 19:04:58 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Mon Oct 22 19:05:23 2018 -0700

----------------------------------------------------------------------
 .../src/main/config/standalone.failure.test.properties       | 1 -
 .../integration/TestStandaloneIntegrationApplication.java    | 6 +++---
 samza-test/src/main/python/tests/zk_client.py                | 8 ++++----
 3 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/config/standalone.failure.test.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties
index d855d5f..d200251 100644
--- a/samza-test/src/main/config/standalone.failure.test.properties
+++ b/samza-test/src/main/config/standalone.failure.test.properties
@@ -26,7 +26,6 @@ job.name=test-app-name
 job.id=test-app-id
 
 ## Kafka I/O system properties.
-task.inputs=standalone_integration_test_kafka_input_topic
 input.stream.name=standalone_integration_test_kafka_input_topic
 job.default.system=testSystemName
 
systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index 2002ce6..fba3b52 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -40,15 +40,15 @@ public class TestStandaloneIntegrationApplication 
implements StreamApplication {
   public void describe(StreamApplicationDescriptor appDescriptor) {
     String systemName = "testSystemName";
     String inputStreamName = 
appDescriptor.getConfig().get("input.stream.name");
-    String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
-    LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, 
outputStreamName);
+     String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
+     LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, 
outputStreamName);
     KafkaSystemDescriptor kafkaSystemDescriptor = new 
KafkaSystemDescriptor(systemName);
 
     KVSerde<Object, Object> noOpSerde = KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>());
     KafkaInputDescriptor<KV<Object, Object>> isd =
         kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
     KafkaOutputDescriptor<KV<Object, Object>> osd =
-        kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde);
+        kafkaSystemDescriptor.getOutputDescriptor(outputStreamName, noOpSerde);
     
appDescriptor.getInputStream(isd).sendTo(appDescriptor.getOutputStream(osd));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/cfd29c36/samza-test/src/main/python/tests/zk_client.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py
index 2a11a80..006d84f 100644
--- a/samza-test/src/main/python/tests/zk_client.py
+++ b/samza-test/src/main/python/tests/zk_client.py
@@ -47,8 +47,8 @@ class ZkClient:
         self.kazoo_client.stop()
 
     def watch_job_model(self, watch_function):
-        
self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
-        
self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node),
 watch=watch_function)
+        
self.kazoo_client.ensure_path('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node))
+        
self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node),
 watch=watch_function)
 
     def get_latest_job_model(self):
         """
@@ -56,12 +56,12 @@ class ZkClient:
         """
         job_model_dict = {}
         try:
-            childZkNodes = 
self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+            childZkNodes = 
self.kazoo_client.get_children('{0}/jobModelGeneration/jobModels/'.format(self.zk_base_node))
             if len(childZkNodes) > 0:
                 childZkNodes.sort()
                 childZkNodes.reverse()
 
-                job_model_generation_path = 
'{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, 
childZkNodes[0])
+                job_model_generation_path = 
'{0}/jobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, 
childZkNodes[0])
                 job_model, _ = self.kazoo_client.get(job_model_generation_path)
 
                 """

Reply via email to