corgy-w commented on code in PR #8173:
URL: https://github.com/apache/seatunnel/pull/8173#discussion_r1918060512


##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java:
##########
@@ -155,20 +169,59 @@ protected DataStream<SeaTunnelRow> flinkTransform(
                                 flinkRuntimeEnvironment
                                         .getStreamExecutionEnvironment()
                                         .clean(
-                                                row ->
-                                                        
((SeaTunnelMapTransform<SeaTunnelRow>)
-                                                                        
transform)
-                                                                .map(row))))
+                                                new FlinkRichMapFunction(
+                                                        transform, metricName, 
pluginOutput))))
                 // null value shouldn't be passed to downstream
                 .filter(Objects::nonNull);
     }
 
-    public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+    public static class FlinkRichMapFunction extends 
RichMapFunction<SeaTunnelRow, SeaTunnelRow> {
+        private MetricsContext metricsContext; //
+        private SeaTunnelTransform transform;
+        private final String metricName;
+        private final String pluginOutput;
+
+        public FlinkRichMapFunction(
+                SeaTunnelTransform transform, String metricName, String 
pluginOutput) {
+            this.transform = transform;
+            this.metricName = metricName;
+            this.pluginOutput = pluginOutput;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // 获取 RuntimeContext
+            metricsContext = new FlinkMetricContext((StreamingRuntimeContext) 
getRuntimeContext());
+        }
 
+        @Override
+        public SeaTunnelRow map(SeaTunnelRow row) throws Exception {
+            if (Objects.isNull(row)) {
+                return null;
+            }
+            SeaTunnelRow rowResult = ((SeaTunnelMapTransform<SeaTunnelRow>) 
transform).map(row);
+            String tableId = pluginOutput == null ? rowResult.getTableId() : 
pluginOutput;
+            updateMetric(metricName, tableId, metricsContext);
+            return rowResult;
+        }
+    }
+
+    public static class ArrayFlatMap extends RichFlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+        private MetricsContext metricsContext; //

Review Comment:
   ditto



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java:
##########
@@ -155,20 +169,59 @@ protected DataStream<SeaTunnelRow> flinkTransform(
                                 flinkRuntimeEnvironment
                                         .getStreamExecutionEnvironment()
                                         .clean(
-                                                row ->
-                                                        
((SeaTunnelMapTransform<SeaTunnelRow>)
-                                                                        
transform)
-                                                                .map(row))))
+                                                new FlinkRichMapFunction(
+                                                        transform, metricName, 
pluginOutput))))
                 // null value shouldn't be passed to downstream
                 .filter(Objects::nonNull);
     }
 
-    public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+    public static class FlinkRichMapFunction extends 
RichMapFunction<SeaTunnelRow, SeaTunnelRow> {
+        private MetricsContext metricsContext; //
+        private SeaTunnelTransform transform;
+        private final String metricName;
+        private final String pluginOutput;
+
+        public FlinkRichMapFunction(
+                SeaTunnelTransform transform, String metricName, String 
pluginOutput) {
+            this.transform = transform;
+            this.metricName = metricName;
+            this.pluginOutput = pluginOutput;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // 获取 RuntimeContext
+            metricsContext = new FlinkMetricContext((StreamingRuntimeContext) 
getRuntimeContext());
+        }
 
+        @Override
+        public SeaTunnelRow map(SeaTunnelRow row) throws Exception {
+            if (Objects.isNull(row)) {
+                return null;
+            }
+            SeaTunnelRow rowResult = ((SeaTunnelMapTransform<SeaTunnelRow>) 
transform).map(row);
+            String tableId = pluginOutput == null ? rowResult.getTableId() : 
pluginOutput;
+            updateMetric(metricName, tableId, metricsContext);
+            return rowResult;
+        }
+    }
+
+    public static class ArrayFlatMap extends RichFlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+        private MetricsContext metricsContext; //
         private SeaTunnelTransform transform;
+        private final String metricName;
+        private final String pluginOutput;
 
-        public ArrayFlatMap(SeaTunnelTransform transform) {
+        public ArrayFlatMap(SeaTunnelTransform transform, String metricName, 
String pluginOutput) {
             this.transform = transform;
+            this.metricName = metricName;
+            this.pluginOutput = pluginOutput;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // 获取 RuntimeContext

Review Comment:
   en



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java:
##########
@@ -155,20 +169,59 @@ protected DataStream<SeaTunnelRow> flinkTransform(
                                 flinkRuntimeEnvironment
                                         .getStreamExecutionEnvironment()
                                         .clean(
-                                                row ->
-                                                        
((SeaTunnelMapTransform<SeaTunnelRow>)
-                                                                        
transform)
-                                                                .map(row))))
+                                                new FlinkRichMapFunction(
+                                                        transform, metricName, 
pluginOutput))))
                 // null value shouldn't be passed to downstream
                 .filter(Objects::nonNull);
     }
 
-    public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, 
SeaTunnelRow> {
+    public static class FlinkRichMapFunction extends 
RichMapFunction<SeaTunnelRow, SeaTunnelRow> {
+        private MetricsContext metricsContext; //

Review Comment:
   redundant comments `//`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to