xintongsong commented on code in PR #21579:
URL: https://github.com/apache/flink/pull/21579#discussion_r1062140088


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java:
##########
@@ -40,33 +38,24 @@
     private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class);
 
     protected final Input<T> input;
+    protected final Counter numRecordsOut;
     protected final Counter numRecordsIn;
     protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
     @Nullable protected final OutputTag<T> outputTag;
     protected WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
 
-    public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) {
-        this(operator, operator.getMetricGroup(), outputTag);
-    }
-
     public ChainingOutput(
             Input<T> input,
-            OperatorMetricGroup operatorMetricGroup,
+            @Nullable Counter prevNumRecordsOut,
+            OperatorMetricGroup curOperatorMetricGroup,
             @Nullable OutputTag<T> outputTag) {
         this.input = input;
-
-        {
-            Counter tmpNumRecordsIn;
-            try {
-                OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
-                tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
-            } catch (Exception e) {
-                LOG.warn("An exception occurred during the metrics setup.", e);
-                tmpNumRecordsIn = new SimpleCounter();
-            }
-            numRecordsIn = tmpNumRecordsIn;
+        if (prevNumRecordsOut != null) {
+            this.numRecordsOut = prevNumRecordsOut;
+        } else {
+            this.numRecordsOut = new SimpleCounter();

Review Comment:
   IIUC, the fallback `SimpleCounter` is there so that we don't have to null-check
   `numRecordsOut` on the per-record path? Let's add a comment explaining that.
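
   A minimal sketch of the pattern in question, for illustration only. `RecordCounter`,
   `InMemoryCounter`, and `ChainingOutputSketch` are hypothetical stand-ins (not Flink
   classes) so that the snippet compiles on its own; the real code would use Flink's
   `Counter` and `SimpleCounter`.

   ```java
   // Hypothetical stand-in for a metrics counter; not the Flink interface.
   interface RecordCounter {
       void inc();

       long getCount();
   }

   // Hypothetical stand-in for a simple, unregistered counter.
   final class InMemoryCounter implements RecordCounter {
       private long count;

       @Override
       public void inc() {
           count++;
       }

       @Override
       public long getCount() {
           return count;
       }
   }

   final class ChainingOutputSketch {
       private final RecordCounter numRecordsOut;

       ChainingOutputSketch(RecordCounter prevNumRecordsOut) {
           // Fall back to a throwaway counter when none is given, so that collect()
           // does not need a null check on the per-record path.
           this.numRecordsOut =
                   prevNumRecordsOut != null ? prevNumRecordsOut : new InMemoryCounter();
       }

       void collect(Object record) {
           numRecordsOut.inc(); // unconditional, no per-record branch
       }
   }
   ```

   The throwaway counter is never registered anywhere, so its value is simply discarded.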



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -700,11 +752,20 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
             // If the chaining output does not copy we need to copy in the broadcast output,
             // otherwise multi-chaining would not work correctly.
             if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-                return closer.register(new CopyingBroadcastingOutputCollector<>(asArray));
+                result = closer.register(new CopyingBroadcastingOutputCollector<>(asArray));
             } else {
-                return closer.register(new BroadcastingOutputCollector<>(asArray));
+                result = closer.register(new BroadcastingOutputCollector<>(asArray));
+            }
+        }
+
+        if (shouldAddMetric) {

Review Comment:
   I'd suggest adding an inline comment explaining that this only happens when
   there are no downstream chained operators recording the metrics.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -622,6 +627,43 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
         return chainedSourceInputs;
     }
 
+    // Get the numRecordsOut counter for the operator represented by the given config. And re-use
+    // this operator-level counter for the task-level numRecordsOut counter if this operator
+    // is at the end of the operator chain.
+    //
+    // Return null if we should not use the numRecordsOut counter to track the records emitted
+    // by this operator.

Review Comment:
   Should be JavaDoc (`/**`) rather than inline comments (`//`).



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -622,6 +627,43 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
         return chainedSourceInputs;
     }
 
+    // Get the numRecordsOut counter for the operator represented by the given config. And re-use
+    // this operator-level counter for the task-level numRecordsOut counter if this operator
+    // is at the end of the operator chain.
+    //
+    // Return null if we should not use the numRecordsOut counter to track the records emitted
+    // by this operator.
+    private Counter getOperatorRecordsOutCounter(
+            StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
+        InternalOperatorMetricGroup operatorMetricGroup =
+                containingTask
+                        .getEnvironment()
+                        .getMetricGroup()
+                        .getOrAddOperator(
+                                operatorConfig.getOperatorID(), operatorConfig.getOperatorName());
+        if (operatorConfig.isChainEnd()) {
+            operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
+        }
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+
+        /**
+         * Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
+         *
+         * <p>Metric "numRecordsOut" is defined as the total number of records written to the
+         * external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
+         * number of records sent to downstream operators, which is number of Committable batches
+         * sent to SinkCommitter. So we skip registering this metric on output and leave this metric
+         * to sink writer implementations to report.
+         */

Review Comment:
   Should be inline comments (`//`) rather than JavaDoc (`/**`).



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -675,18 +722,23 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
             WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                     createOperatorChain(
                             containingTask,
+                            operatorConfig,
                             chainedOpConfig,
                             chainedConfigs,
                             userCodeClassloader,
                             recordWriterOutputs,
                             allOperatorWrappers,
                             outputEdge.getOutputTag(),
-                            mailboxExecutorFactory);
+                            mailboxExecutorFactory,
+                            shouldAddMetric);
             allOutputs.add(output);
+            shouldAddMetric = false;

Review Comment:
   I'd suggest adding an inline comment explaining that when there are multiple
   downstream operators, only one of them should add the metrics.
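
   A minimal sketch of what such a comment could look like next to the flag reset.
   `SingleMetricOwnerSketch` and `assignMetricOwner` are hypothetical names used only for
   this illustration, and the stated reason (avoiding double counting) is an assumption
   about the intent, not something confirmed by the PR.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class SingleMetricOwnerSketch {

       /** Returns, per downstream chained output, whether it is asked to add the metric. */
       static List<Boolean> assignMetricOwner(int numChainedOutputs) {
           List<Boolean> decisions = new ArrayList<>();
           boolean shouldAddMetric = true;
           for (int i = 0; i < numChainedOutputs; i++) {
               decisions.add(shouldAddMetric);
               // When there are multiple downstream chained operators, only one of them
               // should add the metric (assumed reason: the same record would otherwise
               // be counted once per downstream operator).
               shouldAddMetric = false;
           }
           return decisions;
       }
   }
   ```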



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -622,6 +627,43 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
         return chainedSourceInputs;
     }
 
+    // Get the numRecordsOut counter for the operator represented by the given config. And re-use
+    // this operator-level counter for the task-level numRecordsOut counter if this operator
+    // is at the end of the operator chain.
+    //
+    // Return null if we should not use the numRecordsOut counter to track the records emitted
+    // by this operator.
+    private Counter getOperatorRecordsOutCounter(

Review Comment:
   Better to annotate the method with `@Nullable`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -713,13 +774,15 @@ private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
      */
     private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
             StreamTask<OUT, ?> containingTask,
+            StreamConfig prevOperatorConfig,
             StreamConfig operatorConfig,
             Map<Integer, StreamConfig> chainedConfigs,
             ClassLoader userCodeClassloader,
             Map<IntermediateDataSetID, RecordWriterOutput<?>> recordWriterOutputs,
             List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
             OutputTag<IN> outputTag,
-            MailboxExecutorFactory mailboxExecutorFactory) {
+            MailboxExecutorFactory mailboxExecutorFactory,
+            boolean shouldAddMetric) {

Review Comment:
   ```suggestion
               boolean shouldAddMetricForPrevOperator) {
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -786,17 +856,33 @@ private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
     private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(
             OneInputStreamOperator<IN, OUT> operator,
             StreamTask<OUT, ?> containingTask,
+            StreamConfig prevOperatorConfig,
             StreamConfig operatorConfig,
             ClassLoader userCodeClassloader,
-            OutputTag<IN> outputTag) {
+            OutputTag<IN> outputTag,
+            boolean shouldAddMetric) {

Review Comment:
   ```suggestion
               boolean shouldAddMetricForPrevOperator) {
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -622,6 +627,43 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
         return chainedSourceInputs;
     }
 
+    // Get the numRecordsOut counter for the operator represented by the given config. And re-use
+    // this operator-level counter for the task-level numRecordsOut counter if this operator
+    // is at the end of the operator chain.
+    //
+    // Return null if we should not use the numRecordsOut counter to track the records emitted
+    // by this operator.
+    private Counter getOperatorRecordsOutCounter(
+            StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
+        InternalOperatorMetricGroup operatorMetricGroup =
+                containingTask
+                        .getEnvironment()
+                        .getMetricGroup()
+                        .getOrAddOperator(
+                                operatorConfig.getOperatorID(), operatorConfig.getOperatorName());
+        if (operatorConfig.isChainEnd()) {
+            operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
+        }
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+
+        /**
+         * Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
+         *
+         * <p>Metric "numRecordsOut" is defined as the total number of records written to the
+         * external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
+         * number of records sent to downstream operators, which is number of Committable batches
+         * sent to SinkCommitter. So we skip registering this metric on output and leave this metric
+         * to sink writer implementations to report.
+         */
+        StreamOperatorFactory<?> operatorFactory =
+                operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+        if (operatorFactory instanceof SinkWriterOperatorFactory) {
+            return null;
+        }

Review Comment:
   I think we should do the `SinkWriterOperatorFactory` check first and return early,
   before adding a new metric group for the operator.
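
   A rough sketch of the suggested ordering, for illustration only.
   `BypassBeforeRegisterSketch` and its `registeredOperators` set are hypothetical
   stand-ins for the task metric group behind `getOrAddOperator(...)`, and the boolean
   parameter stands in for the `instanceof SinkWriterOperatorFactory` check.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   final class BypassBeforeRegisterSketch {
       // Hypothetical stand-in for the operator metric groups added to the task.
       final Set<String> registeredOperators = new HashSet<>();

       /** Returns the operator name whose counter would be used, or null for sink writers. */
       String getOperatorRecordsOutCounter(String operatorName, boolean isSinkWriter) {
           if (isSinkWriter) {
               // Return early, so that no metric group is added as a side effect.
               return null;
           }
           registeredOperators.add(operatorName);
           return operatorName;
       }
   }
   ```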


