This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3e6578edc4aeeefbd675b4735863e5f80f3144d0
Author: Weijie Guo <[email protected]>
AuthorDate: Wed May 10 19:17:16 2023 +0800

    fixup! [FLINK-18808][streaming] Include side outputs in numRecordsOut metric
---
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 51 +++++++++++++++++++---
 .../runtime/tasks/OneInputStreamTaskTest.java      | 19 +++++---
 .../runtime/tasks/TwoInputStreamTaskTest.java      | 24 +++++++---
 3 files changed, 79 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 59c79fa1ff4..799040b3648 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -1419,7 +1419,7 @@ public class MultipleInputStreamTaskTest {
                         .addInput(BasicTypeInfo.INT_TYPE_INFO)
                         .addInput(BasicTypeInfo.INT_TYPE_INFO)
                         .addAdditionalOutput(partitionWriters)
-                        .setupOperatorChain(new OddEvenOperatorFactory())
+                        .setupOperatorChain(new PassThroughOperatorFactory<>())
                         .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
                         .setOperatorFactory(
                                 SimpleOperatorFactory.of(
@@ -1458,13 +1458,13 @@ public class MultipleInputStreamTaskTest {
             int totalOddRecords = numOddRecords + numNaturalRecords / 2;
             int totalEvenRecords = numEvenRecords + (int) Math.ceil(numNaturalRecords / 2.0);
 
-            final int secondOddEvenOperatorOutputsWithOddTag = totalOddRecords;
-            final int secondOddEvenOperatorOutputsWithoutTag = totalOddRecords + totalEvenRecords;
+            final int oddEvenOperatorOutputsWithOddTag = totalOddRecords;
+            final int oddEvenOperatorOutputsWithoutTag = totalOddRecords + totalEvenRecords;
             final int duplicatingOperatorOutput = (totalOddRecords + totalEvenRecords) * 2;
             assertEquals(totalOddRecords + totalEvenRecords, numRecordsInCounter.getCount());
             assertEquals(
-                    secondOddEvenOperatorOutputsWithOddTag
-                            + secondOddEvenOperatorOutputsWithoutTag
+                    oddEvenOperatorOutputsWithOddTag
+                            + oddEvenOperatorOutputsWithoutTag
                             + duplicatingOperatorOutput,
                     numRecordsOutCounter.getCount());
             testHarness.waitForTaskCompletion();
@@ -1475,6 +1475,47 @@ public class MultipleInputStreamTaskTest {
         }
     }
 
+    static class PassThroughOperator<T> extends AbstractStreamOperatorV2<T>
+            implements MultipleInputStreamOperator<T> {
+
+        public PassThroughOperator(StreamOperatorParameters<T> parameters) {
+            super(parameters, 3);
+        }
+
+        @Override
+        public List<Input> getInputs() {
+            return Arrays.asList(
+                    new PassThroughInput<>(this, 1),
+                    new PassThroughInput<>(this, 2),
+                    new PassThroughInput<>(this, 3));
+        }
+
+        static class PassThroughInput<I> extends AbstractInput<I, I> {
+
+            public PassThroughInput(AbstractStreamOperatorV2<I> owner, int inputId) {
+                super(owner, inputId);
+            }
+
+            @Override
+            public void processElement(StreamRecord<I> element) throws Exception {
+                output.collect(element);
+            }
+        }
+    }
+
+    private static class PassThroughOperatorFactory<T> extends AbstractStreamOperatorFactory<T> {
+        @Override
+        public <O extends StreamOperator<T>> O createStreamOperator(
+                StreamOperatorParameters<T> parameters) {
+            return (O) new PassThroughOperator<>(parameters);
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return PassThroughOperator.class;
+        }
+    }
+
     static class OddEvenOperator extends AbstractStreamOperatorV2<Integer>
             implements MultipleInputStreamOperator<Integer> {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index cb959899f0f..936fa21ca9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -1023,7 +1023,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                                 OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                         .addInput(BasicTypeInfo.INT_TYPE_INFO)
                         .addAdditionalOutput(partitionWriters)
-                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .setupOperatorChain(new OperatorID(), new PassThroughOperator<>())
                         .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
                         .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
                         .addNonChainedOutputsCount(
@@ -1053,13 +1053,13 @@ public class OneInputStreamTaskTest extends TestLogger {
                 testHarness.processElement(new StreamRecord<>(2 * x + 1));
             }
 
-            final int secondOddEvenOperatorOutputsWithOddTag = numOddRecords;
-            final int secondOddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
+            final int oddEvenOperatorOutputsWithOddTag = numOddRecords;
+            final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
             final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2;
             assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
             assertEquals(
-                    secondOddEvenOperatorOutputsWithOddTag
-                            + secondOddEvenOperatorOutputsWithoutTag
+                    oddEvenOperatorOutputsWithOddTag
+                            + oddEvenOperatorOutputsWithoutTag
                             + duplicatingOperatorOutput,
                     numRecordsOutCounter.getCount());
             testHarness.waitForTaskCompletion();
@@ -1070,6 +1070,15 @@ public class OneInputStreamTaskTest extends TestLogger {
         }
     }
 
+    static class PassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+        }
+    }
+
     static class OddEvenOperator extends AbstractStreamOperator<Integer>
             implements OneInputStreamOperator<Integer, Integer> {
         private final OutputTag<Integer> oddOutputTag =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 906076c678c..5272abc7f6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -854,7 +854,7 @@ public class TwoInputStreamTaskTest {
                         .addInput(BasicTypeInfo.INT_TYPE_INFO)
                         .addInput(BasicTypeInfo.INT_TYPE_INFO)
                         .addAdditionalOutput(partitionWriters)
-                        .setupOperatorChain(new OperatorID(), new OddEvenOperator())
+                        .setupOperatorChain(new OperatorID(), new PassThroughOperator<>())
                         .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()))
                         .setOperatorFactory(
                                 SimpleOperatorFactory.of(
@@ -888,13 +888,13 @@ public class TwoInputStreamTaskTest {
                 testHarness.processElement(new StreamRecord<>(2 * x + 1));
             }
 
-            final int secondOddEvenOperatorOutputsWithOddTag = numOddRecords;
-            final int secondOddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
+            final int oddEvenOperatorOutputsWithOddTag = numOddRecords;
+            final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords;
             final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2;
             assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount());
             assertEquals(
-                    secondOddEvenOperatorOutputsWithOddTag
-                            + secondOddEvenOperatorOutputsWithoutTag
+                    oddEvenOperatorOutputsWithOddTag
+                            + oddEvenOperatorOutputsWithoutTag
                             + duplicatingOperatorOutput,
                     numRecordsOutCounter.getCount());
             testHarness.waitForTaskCompletion();
@@ -905,6 +905,20 @@ public class TwoInputStreamTaskTest {
         }
     }
 
+    static class PassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements TwoInputStreamOperator<T, T, T> {
+
+        @Override
+        public void processElement1(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+        }
+
+        @Override
+        public void processElement2(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+        }
+    }
+
     static class OddEvenOperator extends AbstractStreamOperator<Integer>
             implements TwoInputStreamOperator<Integer, Integer, Integer> {
         private final OutputTag<Integer> oddOutputTag =

Reply via email to