alnzng opened a new pull request #1381:
URL: https://github.com/apache/samza/pull/1381


   #### Symptom
   There is `tableId` information for `StreamTableJoinOperatorSpec` in Samza 
job execution plan:
   ```json
   "samza-hello-fluent-api-remote-table-venice-i001-alazhang-join-1": {
       "opId": 
"samza-hello-fluent-api-remote-table-venice-i001-alazhang-join-1",
       "tableId": "table-id",
       "opCode": "JOIN",
       "sourceLocation": "VeniceRemoteTableJoinApp.java:69",
       "nextOperatorIds": [
           "samza-hello-fluent-api-remote-table-venice-i001-alazhang-send_to-2"
       ]
   }
   ```
   However, there is no `tableId ` information for `SendToTableOperatorSpec`.
   ```json
    "samza-hello-fluent-api-remote-table-venice-i001-alazhang-send_to-2": {
       "opId": 
"samza-hello-fluent-api-remote-table-venice-i001-alazhang-send_to-2",
       "opCode": "SEND_TO",
       "sourceLocation": "VeniceRemoteTableJoinApp.java:70",
       "outputStreamId": "PageViewUserAgentTypeEvent",
       "nextOperatorIds": []
   }           
   ```
   
   #### Cause
   Currently, JobGraphJsonGenerator.operatorToMap() function populates 
"tableId" field when the given operator spec is table related. 
   However, as shown in the below codes, we forgot to handle 
`SendToTableOperatorSpec`, but handle `StreamTableJoinOperatorSpec` twice 
somehow. 
   
   ```java
   private Map<String, Object> operatorToMap(OperatorSpec spec) {
       Map<String, Object> map = new HashMap<>();
       map.put("opCode", spec.getOpCode().name());
       map.put("opId", spec.getOpId());
       map.put("sourceLocation", spec.getSourceLocation());
       ...
   
       if (spec instanceof StreamTableJoinOperatorSpec) {
         String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
         map.put("tableId", tableId);
       }
   
       if (spec instanceof StreamTableJoinOperatorSpec) {
         String tableId = ((StreamTableJoinOperatorSpec) spec).getTableId();
         map.put("tableId", tableId);
       }
   
       if (spec instanceof JoinOperatorSpec) {
         map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
       }
   
       return map;
     }
   ```
   #### Changes
   Replace `StreamTableJoinOperatorSpec` with `SendToTableOperatorSpec` in one 
of above if statements.
   
   #### Tests
   Add test case to test all Table related operators.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to