This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3e6578edc4aeeefbd675b4735863e5f80f3144d0 Author: Weijie Guo <[email protected]> AuthorDate: Wed May 10 19:17:16 2023 +0800 fixup! [FLINK-18808][streaming] Include side outputs in numRecordsOut metric --- .../runtime/tasks/MultipleInputStreamTaskTest.java | 51 +++++++++++++++++++--- .../runtime/tasks/OneInputStreamTaskTest.java | 19 +++++--- .../runtime/tasks/TwoInputStreamTaskTest.java | 24 +++++++--- 3 files changed, 79 insertions(+), 15 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 59c79fa1ff4..799040b3648 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -1419,7 +1419,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO) .addAdditionalOutput(partitionWriters) - .setupOperatorChain(new OddEvenOperatorFactory()) + .setupOperatorChain(new PassThroughOperatorFactory<>()) .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) .setOperatorFactory( SimpleOperatorFactory.of( @@ -1458,13 +1458,13 @@ public class MultipleInputStreamTaskTest { int totalOddRecords = numOddRecords + numNaturalRecords / 2; int totalEvenRecords = numEvenRecords + (int) Math.ceil(numNaturalRecords / 2.0); - final int secondOddEvenOperatorOutputsWithOddTag = totalOddRecords; - final int secondOddEvenOperatorOutputsWithoutTag = totalOddRecords + totalEvenRecords; + final int oddEvenOperatorOutputsWithOddTag = totalOddRecords; + final int oddEvenOperatorOutputsWithoutTag = totalOddRecords + totalEvenRecords; final int duplicatingOperatorOutput = (totalOddRecords + totalEvenRecords) * 2; assertEquals(totalOddRecords + totalEvenRecords, numRecordsInCounter.getCount()); assertEquals( - secondOddEvenOperatorOutputsWithOddTag - + secondOddEvenOperatorOutputsWithoutTag + oddEvenOperatorOutputsWithOddTag + + oddEvenOperatorOutputsWithoutTag + duplicatingOperatorOutput, numRecordsOutCounter.getCount()); testHarness.waitForTaskCompletion(); @@ -1475,6 +1475,47 @@ public class MultipleInputStreamTaskTest { } } + static class PassThroughOperator<T> extends AbstractStreamOperatorV2<T> + implements MultipleInputStreamOperator<T> { + + public PassThroughOperator(StreamOperatorParameters<T> parameters) { + super(parameters, 3); + } + + @Override + public List<Input> getInputs() { + return Arrays.asList( + new PassThroughInput<>(this, 1), + new PassThroughInput<>(this, 2), + new PassThroughInput<>(this, 3)); + } + + static class PassThroughInput<I> extends AbstractInput<I, I> { + + public PassThroughInput(AbstractStreamOperatorV2<I> owner, int inputId) { + super(owner, inputId); + } + + @Override + public void processElement(StreamRecord<I> element) throws Exception { + output.collect(element); + } + } + } + + private static class PassThroughOperatorFactory<T> extends AbstractStreamOperatorFactory<T> { + @Override + public <O extends StreamOperator<T>> O createStreamOperator( + StreamOperatorParameters<T> parameters) { + return (O) new PassThroughOperator<>(parameters); + } + + @Override + public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + return PassThroughOperator.class; + } + } + static class OddEvenOperator extends AbstractStreamOperatorV2<Integer> implements MultipleInputStreamOperator<Integer> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index cb959899f0f..936fa21ca9d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -1023,7 +1023,7 @@ public class OneInputStreamTaskTest extends TestLogger { OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO) .addAdditionalOutput(partitionWriters) - .setupOperatorChain(new OperatorID(), new OddEvenOperator()) + .setupOperatorChain(new OperatorID(), new PassThroughOperator<>()) .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator())) .addNonChainedOutputsCount( @@ -1053,13 +1053,13 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.processElement(new StreamRecord<>(2 * x + 1)); } - final int secondOddEvenOperatorOutputsWithOddTag = numOddRecords; - final int secondOddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords; + final int oddEvenOperatorOutputsWithOddTag = numOddRecords; + final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords; final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2; assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount()); assertEquals( - secondOddEvenOperatorOutputsWithOddTag - + secondOddEvenOperatorOutputsWithoutTag + oddEvenOperatorOutputsWithOddTag + + oddEvenOperatorOutputsWithoutTag + duplicatingOperatorOutput, numRecordsOutCounter.getCount()); testHarness.waitForTaskCompletion(); @@ -1070,6 +1070,15 @@ public class OneInputStreamTaskTest extends TestLogger { } } + static class PassThroughOperator<T> extends AbstractStreamOperator<T> + implements OneInputStreamOperator<T, T> { + + @Override + public void processElement(StreamRecord<T> element) throws Exception { + output.collect(element); + } + } + static class OddEvenOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { private final OutputTag<Integer> oddOutputTag = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 906076c678c..5272abc7f6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -854,7 +854,7 @@ public class TwoInputStreamTaskTest { .addInput(BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO) .addAdditionalOutput(partitionWriters) - .setupOperatorChain(new OperatorID(), new OddEvenOperator()) + .setupOperatorChain(new OperatorID(), new PassThroughOperator<>()) .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())) .setOperatorFactory( SimpleOperatorFactory.of( @@ -888,13 +888,13 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new StreamRecord<>(2 * x + 1)); } - final int secondOddEvenOperatorOutputsWithOddTag = numOddRecords; - final int secondOddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords; + final int oddEvenOperatorOutputsWithOddTag = numOddRecords; + final int oddEvenOperatorOutputsWithoutTag = numOddRecords + numEvenRecords; final int duplicatingOperatorOutput = (numOddRecords + numEvenRecords) * 2; assertEquals(numOddRecords + numEvenRecords, numRecordsInCounter.getCount()); assertEquals( - secondOddEvenOperatorOutputsWithOddTag - + secondOddEvenOperatorOutputsWithoutTag + oddEvenOperatorOutputsWithOddTag + + oddEvenOperatorOutputsWithoutTag + duplicatingOperatorOutput, numRecordsOutCounter.getCount()); testHarness.waitForTaskCompletion(); @@ -905,6 +905,20 @@ public class TwoInputStreamTaskTest { } } + static class PassThroughOperator<T> extends AbstractStreamOperator<T> + implements TwoInputStreamOperator<T, T, T> { + + @Override + public void processElement1(StreamRecord<T> element) throws Exception { + output.collect(element); + } + + @Override + public void processElement2(StreamRecord<T> element) throws Exception { + output.collect(element); + } + } + static class OddEvenOperator extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer> { private final OutputTag<Integer> oddOutputTag =
