This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 322bfdd  SAMZA-2547: Add missing "tableId" for SendToTableOperatorSpec 
in Samza job execution plan (#1381)
322bfdd is described below

commit 322bfdd7cedcbaa6112d61623111ac2e7726f88f
Author: Alan Zhang <[email protected]>
AuthorDate: Wed Jun 10 22:50:22 2020 -0700

    SAMZA-2547: Add missing "tableId" for SendToTableOperatorSpec in Samza job 
execution plan (#1381)
---
 .../samza/execution/JobGraphJsonGenerator.java     |  9 +++++---
 .../samza/execution/TestJobGraphJsonGenerator.java | 26 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 4b11174..a6717fd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.execution;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,6 +36,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
@@ -164,7 +166,8 @@ import org.codehaus.jackson.map.ObjectMapper;
    * @param spec a {@link OperatorSpec} instance
    * @return map of the operator properties
    */
-  private Map<String, Object> operatorToMap(OperatorSpec spec) {
+  @VisibleForTesting
+  Map<String, Object> operatorToMap(OperatorSpec spec) {
     Map<String, Object> map = new HashMap<>();
     map.put("opCode", spec.getOpCode().name());
     map.put("opId", spec.getOpId());
@@ -186,8 +189,8 @@ import org.codehaus.jackson.map.ObjectMapper;
       map.put("tableId", tableId);
     }
 
-    if (spec instanceof StreamTableJoinOperatorSpec) {
-      String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+    if (spec instanceof SendToTableOperatorSpec) {
+      String tableId = ((SendToTableOperatorSpec) spec).getTableId();
       map.put("tableId", tableId);
     }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index eee8fd0..a4d59d3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -31,6 +31,11 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericOutputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
@@ -368,4 +373,25 @@ public class TestJobGraphJsonGenerator {
       return "";
     }
   }
+
+  @Test
+  public void testOperatorToMapForTable() {
+    JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+    Map<String, Object> map;
+    SendToTableOperatorSpec<Object, Object> sendToTableOperatorSpec =
+        OperatorSpecs.createSendToTableOperatorSpec("test-sent-to-table", 
"test-sent-to");
+    map = jsonGenerator.operatorToMap(sendToTableOperatorSpec);
+    assertTrue(map.containsKey("tableId"));
+    assertEquals(map.get("tableId"), "test-sent-to-table");
+    assertEquals(map.get("opCode"), OperatorSpec.OpCode.SEND_TO.name());
+    assertEquals(map.get("opId"), "test-sent-to");
+    StreamTableJoinOperatorSpec<String, String, String, String> 
streamTableJoinOperatorSpec =
+        OperatorSpecs.createStreamTableJoinOperatorSpec("test-join-table", 
mock(StreamTableJoinFunction.class), "test-join");
+    map = jsonGenerator.operatorToMap(streamTableJoinOperatorSpec);
+    assertTrue(map.containsKey("tableId"));
+    assertEquals(map.get("tableId"), "test-join-table");
+    assertEquals(map.get("opCode"), OperatorSpec.OpCode.JOIN.name());
+    assertEquals(map.get("opId"), "test-join");
+  }
+
 }

Reply via email to