Hisoka-X commented on code in PR #7799:
URL: https://github.com/apache/seatunnel/pull/7799#discussion_r1798677828


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -97,6 +103,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
 
     private EventListener eventListener;
 
+    private final List<Map<TablePath, TablePath>> tablesMaps = new ArrayList<>();

Review Comment:
   Could you share more details about `tablesMaps` in a code comment? Why do we need it, and how is it used?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java:
##########
@@ -182,19 +183,26 @@ nodeEngine, new GetJobStatusOperation(jobId))
             jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
         }
 
+        JobDAGInfo jobDAGInfo =

Review Comment:
   ditto



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -246,8 +271,43 @@ public void received(Record<?> record) {
                 if (prepareClose) {
                     return;
                 }
+                AtomicReference<String> tableId = new AtomicReference<>();

Review Comment:
   Why do we need `AtomicReference` here?
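
   For context: Java only lets a lambda capture local variables that are effectively final, which is presumably why the diff wraps `tableId` in an `AtomicReference` before assigning it inside `forEach`. A plain loop can assign an ordinary local instead. The sketch below is only an illustration: `PlainLocalLookup` and `findSinkTable` are hypothetical names, and `String` keys stand in for `TablePath`.

```java
import java.util.List;
import java.util.Map;

public class PlainLocalLookup {

    /**
     * Returns the sink table name mapped to sourceTableId, or null if no map
     * contains it. As in the forEach version, the last matching map wins.
     */
    public static String findSinkTable(
            List<Map<String, String>> tablesMaps, String sourceTableId) {
        // An ordinary local is enough here because no lambda captures it.
        String tableId = null;
        for (Map<String, String> mapping : tablesMaps) {
            String sink = mapping.get(sourceTableId);
            if (sink != null) {
                tableId = sink;
            }
        }
        return tableId;
    }
}
```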



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -246,8 +271,43 @@ public void received(Record<?> record) {
                 if (prepareClose) {
                     return;
                 }
+                AtomicReference<String> tableId = new AtomicReference<>();
                 writer.write((T) record.getData());
-                taskMetricsCalcContext.updateMetrics(record.getData());
+                if (record.getData() instanceof SeaTunnelRow) {
+                    if (this.sinkAction.getSink() instanceof MultiTableSink) {
+                        if (((SeaTunnelRow) record.getData()).getTableId() == null
+                                || ((SeaTunnelRow) record.getData()).getTableId().isEmpty()) {
+                            tableId.set(((SeaTunnelRow) record.getData()).getTableId());
+                        } else {
+                            tablesMaps.forEach(
+                                    tablePathTablePathMap -> {
+                                        tablePathTablePathMap.forEach(
+                                                (k, v) -> {
+                                                    if (k.equals(
+                                                            TablePath.of(
+                                                                    ((SeaTunnelRow)
+                                                                                    record
+                                                                                            .getData())
+                                                                            .getTableId()))) {
+                                                        tableId.set(v.getFullName());
+                                                    }
+                                                });
+                                    });
+                        }

Review Comment:
   I think using `forEach` to filter here will cause performance issues, especially when there are many tables, because the whole scan runs once for every row.
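
   One possible way to avoid the per-row scan, sketched under assumptions: flatten the list of maps into a single `HashMap` once (for example in the constructor) and do a constant-time lookup for each row. The nested `TablePath` below is a simplified stand-in for SeaTunnel's class, and `TableIdResolver` and `resolve` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TableIdResolver {

    /** Simplified stand-in for SeaTunnel's TablePath; only what the sketch needs. */
    public static final class TablePath {
        private final String fullName;

        private TablePath(String fullName) {
            this.fullName = fullName;
        }

        public static TablePath of(String fullName) {
            return new TablePath(fullName);
        }

        public String getFullName() {
            return fullName;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof TablePath && fullName.equals(((TablePath) o).fullName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(fullName);
        }
    }

    // Built once: source table path -> sink table full name.
    private final Map<TablePath, String> sinkNameBySource = new HashMap<>();

    public TableIdResolver(List<Map<TablePath, TablePath>> tablesMaps) {
        for (Map<TablePath, TablePath> mapping : tablesMaps) {
            for (Map.Entry<TablePath, TablePath> e : mapping.entrySet()) {
                // Later maps overwrite earlier ones, as in the forEach version.
                sinkNameBySource.put(e.getKey(), e.getValue().getFullName());
            }
        }
    }

    /** Per-row lookup: one hash probe instead of a scan over every mapping. */
    public String resolve(String tableId) {
        if (tableId == null || tableId.isEmpty()) {
            return tableId;
        }
        return sinkNameBySource.get(TablePath.of(tableId));
    }
}
```

   The cost of building the map is paid once, so the work per row no longer grows with the number of tables.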



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java:
##########
@@ -692,6 +693,13 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
             jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
         }
 
+        JobDAGInfo jobDAGInfo =

Review Comment:
   Can we use
https://github.com/apache/seatunnel/blob/511c8afad4e7b4a4e29db532136878305e44b1e1/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L436
   here instead?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -118,6 +126,23 @@ public SinkFlowLifeCycle(
         boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
         if (isMulti) {
             sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables();
+            String[] keys =

Review Comment:
   `keys`?


