This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2c260b5 [FLINK-22881] Revert toggling IDLE/ACTIVE on records in IDLE state
2c260b5 is described below
commit 2c260b53a37dfcc040c8d5cfbca36d010f9a3cef
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Jun 7 12:40:45 2021 +0200
[FLINK-22881] Revert toggling IDLE/ACTIVE on records in IDLE state
This closes #16095
---
.../streaming/runtime/io/RecordWriterOutput.java | 44 ++---
.../runtime/streamstatus/AnnouncedStatus.java | 70 -------
.../streaming/runtime/tasks/ChainingOutput.java | 15 +-
.../runtime/tasks/MultipleInputStreamTaskTest.java | 4 -
.../runtime/tasks/OneInputStreamTaskTest.java | 209 +++++++++++++++++++++
5 files changed, 240 insertions(+), 102 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..0cefeff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.util.OutputTag;
@@ -54,7 +53,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
private final WatermarkGauge watermarkGauge = new WatermarkGauge();
- private final AnnouncedStatus announcedStatus = new
AnnouncedStatus(StreamStatus.ACTIVE);
+ private StreamStatus announcedStatus = StreamStatus.ACTIVE;
@SuppressWarnings("unchecked")
public RecordWriterOutput(
@@ -98,10 +97,9 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
}
private <X> void pushToRecordWriter(StreamRecord<X> record) {
- // record could've been generated somewhere in the pipeline even
though an IDLE status was
- // emitted. It might've originated from a timer or just a wrong
behaving operator
- try (AutoCloseable ignored =
announcedStatus.ensureActive(this::writeStreamStatus)) {
- serializationDelegate.setInstance(record);
+ serializationDelegate.setInstance(record);
+
+ try {
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
@@ -110,12 +108,14 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
@Override
public void emitWatermark(Watermark mark) {
- // watermark could've been generated somewhere in the pipeline even
though an IDLE status
- // was emitted. It might've originated from a periodic watermark
generator or just a wrong
- // behaving operator
- try (AutoCloseable ignored =
announcedStatus.ensureActive(this::writeStreamStatus)) {
- watermarkGauge.setCurrentWatermark(mark.getTimestamp());
- serializationDelegate.setInstance(mark);
+ if (announcedStatus.isIdle()) {
+ return;
+ }
+
+ watermarkGauge.setCurrentWatermark(mark.getTimestamp());
+ serializationDelegate.setInstance(mark);
+
+ try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
@@ -124,18 +124,14 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
@Override
public void emitStreamStatus(StreamStatus streamStatus) {
- if (!announcedStatus.getCurrentStatus().equals(streamStatus)) {
- announcedStatus.setCurrentStatus(streamStatus);
- writeStreamStatus(streamStatus);
- }
- }
-
- private void writeStreamStatus(StreamStatus streamStatus) {
- serializationDelegate.setInstance(streamStatus);
- try {
- recordWriter.broadcastEmit(serializationDelegate);
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
+ if (!announcedStatus.equals(streamStatus)) {
+ announcedStatus = streamStatus;
+ serializationDelegate.setInstance(streamStatus);
+ try {
+ recordWriter.broadcastEmit(serializationDelegate);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java
deleted file mode 100644
index 3cd055b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/AnnouncedStatus.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamstatus;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.function.Consumer;
-
-/**
- * {@link StreamStatus#IDLE} requires that no records nor watermarks travel
through the branch. In
- * order to keep the older behaviour that records could've been generated down
the pipeline even
- * though the sources were idle we go through a short ACTIVE/IDLE loop. This
is a helper class that
- * lets you easily flip the status around a code block.
- */
-@Internal
-public final class AnnouncedStatus {
- private StreamStatus currentStatus;
-
- public AnnouncedStatus(StreamStatus currentStatus) {
- this.currentStatus = currentStatus;
- }
-
- public StreamStatus getCurrentStatus() {
- return currentStatus;
- }
-
- public void setCurrentStatus(StreamStatus currentStatus) {
- this.currentStatus = currentStatus;
- }
-
- /**
- * Makes sure that the last emitted StreamStatus was ACTIVE.
- *
- * <p>Example usage:
- *
- * <pre>{@code
- * try (AutoCloseable ignored =
announcedStatus.ensureActive(this::writeStreamStatus)) {
- * serializationDelegate.setInstance(record);
- * recordWriter.emit(serializationDelegate);
- * } catch (Exception e) {
- * throw new RuntimeException(e.getMessage(), e);
- * }
- * }</pre>
- *
- * @param statusConsumer a consumer which sends the status downstream
- */
- public AutoCloseable ensureActive(Consumer<StreamStatus> statusConsumer) {
- if (currentStatus.isIdle()) {
- statusConsumer.accept(StreamStatus.ACTIVE);
- return () -> statusConsumer.accept(StreamStatus.IDLE);
- }
- return () -> {};
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 7455cf1..64f1cce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -44,6 +44,7 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
@Nullable protected final OutputTag<T> outputTag;
@Nullable protected final AutoCloseable closeable;
+ protected StreamStatus announcedStatus = StreamStatus.ACTIVE;
public ChainingOutput(OneInputStreamOperator<T, ?> operator, @Nullable
OutputTag<T> outputTag) {
this(operator, (OperatorMetricGroup) operator.getMetricGroup(),
outputTag, operator::close);
@@ -106,6 +107,9 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
@Override
public void emitWatermark(Watermark mark) {
+ if (announcedStatus.isIdle()) {
+ return;
+ }
try {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
input.processWatermark(mark);
@@ -141,10 +145,13 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
@Override
public void emitStreamStatus(StreamStatus streamStatus) {
- try {
- input.processStreamStatus(streamStatus);
- } catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
+ if (!announcedStatus.equals(streamStatus)) {
+ announcedStatus = streamStatus;
+ try {
+ input.processStreamStatus(streamStatus);
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
}
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 4075ba6..6f5e8eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -626,12 +626,8 @@ public class MultipleInputStreamTaskTest {
// make source active once again, emit a watermark and go idle
again.
addSourceRecords(testHarness, 1, initialTime + 10);
- // FLIP-27 sources do not emit active status on new records, we
wrap a record with
- // ACTIVE/IDLE sequence
- expectedOutput.add(StreamStatus.ACTIVE);
expectedOutput.add(
new StreamRecord<>("" + (initialTime + 10),
TimestampAssigner.NO_TIMESTAMP));
- expectedOutput.add(StreamStatus.IDLE);
expectedOutput.add(StreamStatus.ACTIVE); // activate source on new
watermark
expectedOutput.add(new Watermark(initialTime + 10)); // forward W
from source
expectedOutput.add(StreamStatus.IDLE); // go idle after reading
all records
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 858f022..c845e1d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -264,6 +264,138 @@ public class OneInputStreamTaskTest extends TestLogger {
assertEquals(2, resultElements.size());
}
+ /**
+ * This test verifies that watermarks are not forwarded when the task is
idle. It also verifies
+ * that when task is idle, watermarks generated in the middle of chains
are also blocked and
+ * never forwarded.
+ *
+ * <p>The tested chain will be: (HEAD: normal operator) --> (watermark
generating operator) -->
+ * (normal operator). The operators will throw an exception and fail the
test if either of them
+ * were forwarded watermarks when the task is idle.
+ */
+ @Test
+ public void testWatermarksNotForwardedWithinChainWhenIdle() throws
Exception {
+
+ final OneInputStreamTaskTestHarness<String, String> testHarness =
+ new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ 1,
+ 1,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ TriggerableFailOnWatermarkTestOperator headOperator =
+ new TriggerableFailOnWatermarkTestOperator();
+ WatermarkGeneratingTestOperator watermarkOperator = new
WatermarkGeneratingTestOperator();
+ TriggerableFailOnWatermarkTestOperator tailOperator =
+ new TriggerableFailOnWatermarkTestOperator();
+
+ testHarness
+ .setupOperatorChain(new OperatorID(42L, 42L), headOperator)
+ .chain(new OperatorID(4711L, 42L), watermarkOperator,
StringSerializer.INSTANCE)
+ .chain(new OperatorID(123L, 123L), tailOperator,
StringSerializer.INSTANCE)
+ .finish();
+
+ // --------------------- begin test ---------------------
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.invoke();
+ testHarness.waitForTaskRunning();
+
+ // the task starts as active, so all generated watermarks should be
forwarded
+ testHarness.processElement(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+ testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+
+ // this watermark will be forwarded since the task is currently active,
+ // but should not be in the final output because it should be blocked
by the watermark
+ // generator in the chain
+ testHarness.processElement(new Watermark(15));
+
+ testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+ testHarness.waitForInputProcessing();
+
+ expectedOutput.add(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+ expectedOutput.add(new StreamRecord<>("10"));
+ expectedOutput.add(new Watermark(10));
+ expectedOutput.add(new StreamRecord<>("20"));
+ expectedOutput.add(new Watermark(20));
+ expectedOutput.add(new StreamRecord<>("30"));
+ expectedOutput.add(new Watermark(30));
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ // now, toggle the task to be idle, and let the watermark generator
produce some watermarks
+ testHarness.processElement(StreamStatus.IDLE);
+
+ // after this, the operators will throw an exception if they are
forwarded watermarks
+ // anywhere in the chain
+ testHarness.processElement(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+
+ // NOTE: normally, tasks will not have records to process while idle;
+ // we're doing this here only to mimic watermark generating in
operators
+ testHarness.processElement(new StreamRecord<>("40"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("50"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("60"), 0, 0);
+ testHarness.processElement(
+ new Watermark(
+ 65)); // the test will fail if any of the operators
were forwarded this
+ testHarness.waitForInputProcessing();
+
+ // the 40 - 60 watermarks should not be forwarded, only the stream
status toggle element and
+ // records
+ expectedOutput.add(StreamStatus.IDLE);
+ expectedOutput.add(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+ expectedOutput.add(new StreamRecord<>("40"));
+ expectedOutput.add(new StreamRecord<>("50"));
+ expectedOutput.add(new StreamRecord<>("60"));
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ // re-toggle the task to be active and see if new watermarks are
correctly forwarded again
+ testHarness.processElement(StreamStatus.ACTIVE);
+ testHarness.processElement(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+ testHarness.processElement(new StreamRecord<>("70"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("80"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("90"), 0, 0);
+ testHarness.waitForInputProcessing();
+
+ expectedOutput.add(StreamStatus.ACTIVE);
+ expectedOutput.add(
+ new StreamRecord<>(
+
TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+ expectedOutput.add(new StreamRecord<>("70"));
+ expectedOutput.add(new Watermark(70));
+ expectedOutput.add(new StreamRecord<>("80"));
+ expectedOutput.add(new Watermark(80));
+ expectedOutput.add(new StreamRecord<>("90"));
+ expectedOutput.add(new Watermark(90));
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ List<String> resultElements =
+
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+ assertEquals(12, resultElements.size());
+ }
+
/** This test verifies that checkpoint barriers are correctly forwarded. */
@Test
public void testCheckpointBarriers() throws Exception {
@@ -999,4 +1131,81 @@ public class OneInputStreamTaskTest extends TestLogger {
return value;
}
}
+
+ /** A {@link TriggerableFailOnWatermarkTestOperator} that generates
watermarks. */
+ private static class WatermarkGeneratingTestOperator
+ extends TriggerableFailOnWatermarkTestOperator {
+
+ private static final long serialVersionUID = -5064871833244157221L;
+
+ private long lastWatermark;
+
+ @Override
+ protected void handleElement(StreamRecord<String> element) {
+ long timestamp = Long.valueOf(element.getValue());
+ if (timestamp > lastWatermark) {
+ output.emitWatermark(new Watermark(timestamp));
+ lastWatermark = timestamp;
+ }
+ }
+
+ @Override
+ protected void handleWatermark(Watermark mark) {
+ if (mark.equals(Watermark.MAX_WATERMARK)) {
+ output.emitWatermark(mark);
+ lastWatermark = Long.MAX_VALUE;
+ }
+ }
+ }
+
+ /**
+ * An operator that can be triggered whether or not to expect watermarks
forwarded to it,
+ * toggled by letting it process special trigger marker records.
+ *
+ * <p>If it receives a watermark when it's not expecting one, it'll throw
an exception and fail.
+ */
+ private static class TriggerableFailOnWatermarkTestOperator
+ extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<String, String> {
+
+ private static final long serialVersionUID = 2048954179291813243L;
+
+ public static final String EXPECT_FORWARDED_WATERMARKS_MARKER =
"EXPECT_WATERMARKS";
+ public static final String NO_FORWARDED_WATERMARKS_MARKER =
"NO_WATERMARKS";
+
+ protected boolean expectForwardedWatermarks;
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws
Exception {
+ output.collect(element);
+
+ if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER))
{
+ this.expectForwardedWatermarks = true;
+ } else if
(element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
+ this.expectForwardedWatermarks = false;
+ } else {
+ handleElement(element);
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ if (!expectForwardedWatermarks) {
+ throw new Exception(
+ "Received a "
+ + mark
+ + ", but this operator should not be forwarded
watermarks.");
+ } else {
+ handleWatermark(mark);
+ }
+ }
+
+ protected void handleElement(StreamRecord<String> element) {
+ // do nothing
+ }
+
+ protected void handleWatermark(Watermark mark) {
+ output.emitWatermark(mark);
+ }
+ }
}