This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c260b5  [FLINK-22881] Revert toggling IDLE/ACTIVE on records in IDLE state
2c260b5 is described below

commit 2c260b53a37dfcc040c8d5cfbca36d010f9a3cef
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Jun 7 12:40:45 2021 +0200

    [FLINK-22881] Revert toggling IDLE/ACTIVE on records in IDLE state
    
    This closes #16095
---
 .../streaming/runtime/io/RecordWriterOutput.java   |  44 ++---
 .../runtime/streamstatus/AnnouncedStatus.java      |  70 -------
 .../streaming/runtime/tasks/ChainingOutput.java    |  15 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |   4 -
 .../runtime/tasks/OneInputStreamTaskTest.java      | 209 +++++++++++++++++++++
 5 files changed, 240 insertions(+), 102 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..0cefeff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.util.OutputTag;
@@ -54,7 +53,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
 
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
-    private final AnnouncedStatus announcedStatus = new AnnouncedStatus(StreamStatus.ACTIVE);
+    private StreamStatus announcedStatus = StreamStatus.ACTIVE;
 
     @SuppressWarnings("unchecked")
     public RecordWriterOutput(
@@ -98,10 +97,9 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
     }
 
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
-        // record could've been generated somewhere in the pipeline even though an IDLE status was
-        // emitted. It might've originated from a timer or just a wrong behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
-            serializationDelegate.setInstance(record);
+        serializationDelegate.setInstance(record);
+
+        try {
             recordWriter.emit(serializationDelegate);
         } catch (Exception e) {
             throw new RuntimeException(e.getMessage(), e);
@@ -110,12 +108,14 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
 
     @Override
     public void emitWatermark(Watermark mark) {
-        // watermark could've been generated somewhere in the pipeline even though an IDLE status
-        // was emitted. It might've originated from a periodic watermark generator or just a wrong
-        // behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
-            watermarkGauge.setCurrentWatermark(mark.getTimestamp());
-            serializationDelegate.setInstance(mark);
+        if (announcedStatus.isIdle()) {
+            return;
+        }
+
+        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
+        serializationDelegate.setInstance(mark);
+
+        try {
             recordWriter.broadcastEmit(serializationDelegate);
         } catch (Exception e) {
             throw new RuntimeException(e.getMessage(), e);
@@ -124,18 +124,14 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
 
     @Override
     public void emitStreamStatus(StreamStatus streamStatus) {
-        if (!announcedStatus.getCurrentStatus().equals(streamStatus)) {
-            announcedStatus.setCurrentStatus(streamStatus);
-            writeStreamStatus(streamStatus);
-        }
-    }
-
-    private void writeStreamStatus(StreamStatus streamStatus) {
-        serializationDelegate.setInstance(streamStatus);
-        try {
-            recordWriter.broadcastEmit(serializationDelegate);
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
+        if (!announcedStatus.equals(streamStatus)) {
+            announcedStatus = streamStatus;
+            serializationDelegate.setInstance(streamStatus);
+            try {
+                recordWriter.broadcastEmit(serializationDelegate);
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java
deleted file mode 100644
index 3cd055b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamstatus;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.function.Consumer;
-
-/**
- * {@link StreamStatus#IDLE} requires that no records nor watermarks travel through the branch. In
- * order to keep the older behaviour that records could've been generated down the pipeline even
- * though the sources were idle we go through a short ACTIVE/IDLE loop. This is a helper class that
- * lets you easily flip the status around a code block.
- */
-@Internal
-public final class AnnouncedStatus {
-    private StreamStatus currentStatus;
-
-    public AnnouncedStatus(StreamStatus currentStatus) {
-        this.currentStatus = currentStatus;
-    }
-
-    public StreamStatus getCurrentStatus() {
-        return currentStatus;
-    }
-
-    public void setCurrentStatus(StreamStatus currentStatus) {
-        this.currentStatus = currentStatus;
-    }
-
-    /**
-     * Makes sure that the last emitted StreamStatus was ACTIVE.
-     *
-     * <p>Example usage:
-     *
-     * <pre>{@code
-     * try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
-     *     serializationDelegate.setInstance(record);
-     *     recordWriter.emit(serializationDelegate);
-     * } catch (Exception e) {
-     *     throw new RuntimeException(e.getMessage(), e);
-     * }
-     * }</pre>
-     *
-     * @param statusConsumer a consumer which sends the status downstream
-     */
-    public AutoCloseable ensureActive(Consumer<StreamStatus> statusConsumer) {
-        if (currentStatus.isIdle()) {
-            statusConsumer.accept(StreamStatus.ACTIVE);
-            return () -> statusConsumer.accept(StreamStatus.IDLE);
-        }
-        return () -> {};
-    }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 7455cf1..64f1cce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -44,6 +44,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
     protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
     @Nullable protected final OutputTag<T> outputTag;
     @Nullable protected final AutoCloseable closeable;
+    protected StreamStatus announcedStatus = StreamStatus.ACTIVE;
 
     public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable OutputTag<T> outputTag) {
         this(operator, (OperatorMetricGroup) operator.getMetricGroup(), outputTag, operator::close);
@@ -106,6 +107,9 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
 
     @Override
     public void emitWatermark(Watermark mark) {
+        if (announcedStatus.isIdle()) {
+            return;
+        }
         try {
             watermarkGauge.setCurrentWatermark(mark.getTimestamp());
             input.processWatermark(mark);
@@ -141,10 +145,13 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
 
     @Override
     public void emitStreamStatus(StreamStatus streamStatus) {
-        try {
-            input.processStreamStatus(streamStatus);
-        } catch (Exception e) {
-            throw new ExceptionInChainedOperatorException(e);
+        if (!announcedStatus.equals(streamStatus)) {
+            announcedStatus = streamStatus;
+            try {
+                input.processStreamStatus(streamStatus);
+            } catch (Exception e) {
+                throw new ExceptionInChainedOperatorException(e);
+            }
         }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 4075ba6..6f5e8eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -626,12 +626,8 @@ public class MultipleInputStreamTaskTest {
             // make source active once again, emit a watermark and go idle again.
             addSourceRecords(testHarness, 1, initialTime + 10);
 
-            // FLIP-27 sources do not emit active status on new records, we wrap a record with
-            // ACTIVE/IDLE sequence
-            expectedOutput.add(StreamStatus.ACTIVE);
             expectedOutput.add(
                     new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
-            expectedOutput.add(StreamStatus.IDLE);
             expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
             expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
             expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 858f022..c845e1d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -264,6 +264,138 @@ public class OneInputStreamTaskTest extends TestLogger {
         assertEquals(2, resultElements.size());
     }
 
+    /**
+     * This test verifies that watermarks are not forwarded when the task is idle. It also verifies
+     * that when task is idle, watermarks generated in the middle of chains are also blocked and
+     * never forwarded.
+     *
+     * <p>The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) -->
+     * (normal operator). The operators will throw an exception and fail the test if either of them
+     * were forwarded watermarks when the task is idle.
+     */
+    @Test
+    public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
+
+        final OneInputStreamTaskTestHarness<String, String> testHarness =
+                new OneInputStreamTaskTestHarness<>(
+                        OneInputStreamTask::new,
+                        1,
+                        1,
+                        BasicTypeInfo.STRING_TYPE_INFO,
+                        BasicTypeInfo.STRING_TYPE_INFO);
+
+        TriggerableFailOnWatermarkTestOperator headOperator =
+                new TriggerableFailOnWatermarkTestOperator();
+        WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator();
+        TriggerableFailOnWatermarkTestOperator tailOperator =
+                new TriggerableFailOnWatermarkTestOperator();
+
+        testHarness
+                .setupOperatorChain(new OperatorID(42L, 42L), headOperator)
+                .chain(new OperatorID(4711L, 42L), watermarkOperator, StringSerializer.INSTANCE)
+                .chain(new OperatorID(123L, 123L), tailOperator, StringSerializer.INSTANCE)
+                .finish();
+
+        // --------------------- begin test ---------------------
+
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.invoke();
+        testHarness.waitForTaskRunning();
+
+        // the task starts as active, so all generated watermarks should be forwarded
+        testHarness.processElement(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+        testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+
+        // this watermark will be forwarded since the task is currently active,
+        // but should not be in the final output because it should be blocked by the watermark
+        // generator in the chain
+        testHarness.processElement(new Watermark(15));
+
+        testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+        testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+        testHarness.waitForInputProcessing();
+
+        expectedOutput.add(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+        expectedOutput.add(new StreamRecord<>("10"));
+        expectedOutput.add(new Watermark(10));
+        expectedOutput.add(new StreamRecord<>("20"));
+        expectedOutput.add(new Watermark(20));
+        expectedOutput.add(new StreamRecord<>("30"));
+        expectedOutput.add(new Watermark(30));
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        // now, toggle the task to be idle, and let the watermark generator produce some watermarks
+        testHarness.processElement(StreamStatus.IDLE);
+
+        // after this, the operators will throw an exception if they are forwarded watermarks
+        // anywhere in the chain
+        testHarness.processElement(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+
+        // NOTE: normally, tasks will not have records to process while idle;
+        // we're doing this here only to mimic watermark generating in operators
+        testHarness.processElement(new StreamRecord<>("40"), 0, 0);
+        testHarness.processElement(new StreamRecord<>("50"), 0, 0);
+        testHarness.processElement(new StreamRecord<>("60"), 0, 0);
+        testHarness.processElement(
+                new Watermark(
+                        65)); // the test will fail if any of the operators were forwarded this
+        testHarness.waitForInputProcessing();
+
+        // the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and
+        // records
+        expectedOutput.add(StreamStatus.IDLE);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+        expectedOutput.add(new StreamRecord<>("40"));
+        expectedOutput.add(new StreamRecord<>("50"));
+        expectedOutput.add(new StreamRecord<>("60"));
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        // re-toggle the task to be active and see if new watermarks are correctly forwarded again
+        testHarness.processElement(StreamStatus.ACTIVE);
+        testHarness.processElement(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+        testHarness.processElement(new StreamRecord<>("70"), 0, 0);
+        testHarness.processElement(new StreamRecord<>("80"), 0, 0);
+        testHarness.processElement(new StreamRecord<>("90"), 0, 0);
+        testHarness.waitForInputProcessing();
+
+        expectedOutput.add(StreamStatus.ACTIVE);
+        expectedOutput.add(
+                new StreamRecord<>(
+                        TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+        expectedOutput.add(new StreamRecord<>("70"));
+        expectedOutput.add(new Watermark(70));
+        expectedOutput.add(new StreamRecord<>("80"));
+        expectedOutput.add(new Watermark(80));
+        expectedOutput.add(new StreamRecord<>("90"));
+        expectedOutput.add(new Watermark(90));
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.endInput();
+
+        testHarness.waitForTaskCompletion();
+
+        List<String> resultElements =
+                TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+        assertEquals(12, resultElements.size());
+    }
+
     /** This test verifies that checkpoint barriers are correctly forwarded. */
     @Test
     public void testCheckpointBarriers() throws Exception {
@@ -999,4 +1131,81 @@ public class OneInputStreamTaskTest extends TestLogger {
             return value;
         }
     }
+
+    /** A {@link TriggerableFailOnWatermarkTestOperator} that generates watermarks. */
+    private static class WatermarkGeneratingTestOperator
+            extends TriggerableFailOnWatermarkTestOperator {
+
+        private static final long serialVersionUID = -5064871833244157221L;
+
+        private long lastWatermark;
+
+        @Override
+        protected void handleElement(StreamRecord<String> element) {
+            long timestamp = Long.valueOf(element.getValue());
+            if (timestamp > lastWatermark) {
+                output.emitWatermark(new Watermark(timestamp));
+                lastWatermark = timestamp;
+            }
+        }
+
+        @Override
+        protected void handleWatermark(Watermark mark) {
+            if (mark.equals(Watermark.MAX_WATERMARK)) {
+                output.emitWatermark(mark);
+                lastWatermark = Long.MAX_VALUE;
+            }
+        }
+    }
+
+    /**
+     * An operator that can be triggered whether or not to expect watermarks forwarded to it,
+     * toggled by letting it process special trigger marker records.
+     *
+     * <p>If it receives a watermark when it's not expecting one, it'll throw an exception and fail.
+     */
+    private static class TriggerableFailOnWatermarkTestOperator
+            extends AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String> {
+
+        private static final long serialVersionUID = 2048954179291813243L;
+
+        public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
+        public static final String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
+
+        protected boolean expectForwardedWatermarks;
+
+        @Override
+        public void processElement(StreamRecord<String> element) throws Exception {
+            output.collect(element);
+
+            if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
+                this.expectForwardedWatermarks = true;
+            } else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
+                this.expectForwardedWatermarks = false;
+            } else {
+                handleElement(element);
+            }
+        }
+
+        @Override
+        public void processWatermark(Watermark mark) throws Exception {
+            if (!expectForwardedWatermarks) {
+                throw new Exception(
+                        "Received a "
+                                + mark
+                                + ", but this operator should not be forwarded watermarks.");
+            } else {
+                handleWatermark(mark);
+            }
+        }
+
+        protected void handleElement(StreamRecord<String> element) {
+            // do nothing
+        }
+
+        protected void handleWatermark(Watermark mark) {
+            output.emitWatermark(mark);
+        }
+    }
 }

Reply via email to