This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 322bfdd SAMZA-2547: Add missing "tableId" for SendToTableOperatorSpec
in Samza job execution plan (#1381)
322bfdd is described below
commit 322bfdd7cedcbaa6112d61623111ac2e7726f88f
Author: Alan Zhang <[email protected]>
AuthorDate: Wed Jun 10 22:50:22 2020 -0700
SAMZA-2547: Add missing "tableId" for SendToTableOperatorSpec in Samza job
execution plan (#1381)
---
.../samza/execution/JobGraphJsonGenerator.java | 9 +++++---
.../samza/execution/TestJobGraphJsonGenerator.java | 26 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 4b11174..a6717fd 100644
---
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,6 +19,7 @@
package org.apache.samza.execution;
+import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,6 +36,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
@@ -164,7 +166,8 @@ import org.codehaus.jackson.map.ObjectMapper;
* @param spec a {@link OperatorSpec} instance
* @return map of the operator properties
*/
- private Map<String, Object> operatorToMap(OperatorSpec spec) {
+ @VisibleForTesting
+ Map<String, Object> operatorToMap(OperatorSpec spec) {
Map<String, Object> map = new HashMap<>();
map.put("opCode", spec.getOpCode().name());
map.put("opId", spec.getOpId());
@@ -186,8 +189,8 @@ import org.codehaus.jackson.map.ObjectMapper;
map.put("tableId", tableId);
}
- if (spec instanceof StreamTableJoinOperatorSpec) {
- String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
+ if (spec instanceof SendToTableOperatorSpec) {
+ String tableId = ((SendToTableOperatorSpec) spec).getTableId();
map.put("tableId", tableId);
}
diff --git
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index eee8fd0..a4d59d3 100644
---
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -31,6 +31,11 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
@@ -368,4 +373,25 @@ public class TestJobGraphJsonGenerator {
return "";
}
}
+
+ @Test
+ public void testOperatorToMapForTable() {
+ JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
+ Map<String, Object> map;
+ SendToTableOperatorSpec<Object, Object> sendToTableOperatorSpec =
+ OperatorSpecs.createSendToTableOperatorSpec("test-sent-to-table",
"test-sent-to");
+ map = jsonGenerator.operatorToMap(sendToTableOperatorSpec);
+ assertTrue(map.containsKey("tableId"));
+ assertEquals(map.get("tableId"), "test-sent-to-table");
+ assertEquals(map.get("opCode"), OperatorSpec.OpCode.SEND_TO.name());
+ assertEquals(map.get("opId"), "test-sent-to");
+ StreamTableJoinOperatorSpec<String, String, String, String>
streamTableJoinOperatorSpec =
+ OperatorSpecs.createStreamTableJoinOperatorSpec("test-join-table",
mock(StreamTableJoinFunction.class), "test-join");
+ map = jsonGenerator.operatorToMap(streamTableJoinOperatorSpec);
+ assertTrue(map.containsKey("tableId"));
+ assertEquals(map.get("tableId"), "test-join-table");
+ assertEquals(map.get("opCode"), OperatorSpec.OpCode.JOIN.name());
+ assertEquals(map.get("opId"), "test-join");
+ }
+
}