Hisoka-X commented on code in PR #7799:
URL: https://github.com/apache/seatunnel/pull/7799#discussion_r1798677828
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -97,6 +103,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
private EventListener eventListener;
+ private final List<Map<TablePath, TablePath>> tablesMaps = new ArrayList<>();
Review Comment:
Could you add a comment with more details about `tablesMaps`? Why do we need
it, and how is it used?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java:
##########
@@ -182,19 +183,26 @@ nodeEngine, new GetJobStatusOperation(jobId))
jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
}
+ JobDAGInfo jobDAGInfo =
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -246,8 +271,43 @@ public void received(Record<?> record) {
if (prepareClose) {
return;
}
+ AtomicReference<String> tableId = new AtomicReference<>();
Review Comment:
Why do we need `AtomicReference` here?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -246,8 +271,43 @@ public void received(Record<?> record) {
if (prepareClose) {
return;
}
+ AtomicReference<String> tableId = new AtomicReference<>();
writer.write((T) record.getData());
- taskMetricsCalcContext.updateMetrics(record.getData());
+ if (record.getData() instanceof SeaTunnelRow) {
+ if (this.sinkAction.getSink() instanceof MultiTableSink) {
+ if (((SeaTunnelRow) record.getData()).getTableId() == null
+ || ((SeaTunnelRow) record.getData()).getTableId().isEmpty()) {
+ tableId.set(((SeaTunnelRow) record.getData()).getTableId());
+ } else {
+ tablesMaps.forEach(
+ tablePathTablePathMap -> {
+ tablePathTablePathMap.forEach(
+ (k, v) -> {
+ if (k.equals(
+ TablePath.of(
+ ((SeaTunnelRow)
+ record
+ .getData())
+ .getTableId()))) {
+ tableId.set(v.getFullName());
+ }
+ });
+ });
+ }
Review Comment:
I think filtering with nested `forEach` calls here will cause performance
issues, especially when there are many tables, because the whole scan runs
once for every row.
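
One possible direction, as a rough sketch only: flatten the maps once (for example in the constructor) and do a single hash lookup per row. `TableIdLookup` is a hypothetical helper, not existing SeaTunnel code, and plain `String` keys stand in for `TablePath` so the example is self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of SeaTunnel.
// Assumption: String keys/values stand in for TablePath and its full name.
final class TableIdLookup {
    private final Map<String, String> sourceToSink = new HashMap<>();

    // Flatten all per-sink maps once, up front.
    // Later maps overwrite earlier ones, matching the "last match wins"
    // behaviour of the nested forEach in the diff.
    TableIdLookup(List<Map<String, String>> tablesMaps) {
        for (Map<String, String> tablesMap : tablesMaps) {
            sourceToSink.putAll(tablesMap);
        }
    }

    // One hash lookup per row instead of scanning every entry.
    // Null or empty ids are passed through unchanged, as in the diff;
    // unknown ids resolve to null.
    String resolve(String rowTableId) {
        if (rowTableId == null || rowTableId.isEmpty()) {
            return rowTableId;
        }
        return sourceToSink.get(rowTableId);
    }
}
```

With something like this, `received` would only need `lookup.resolve(row.getTableId())`, so the per-row cost no longer grows with the number of tables.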
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java:
##########
@@ -692,6 +693,13 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
}
+ JobDAGInfo jobDAGInfo =
Review Comment:
Can we use
https://github.com/apache/seatunnel/blob/511c8afad4e7b4a4e29db532136878305e44b1e1/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L436
instead?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:
##########
@@ -118,6 +126,23 @@ public SinkFlowLifeCycle(
boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
if (isMulti) {
sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables();
+ String[] keys =
Review Comment:
`keys`? What does this array hold? A more descriptive name would help.