This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 72becd8 [FLINK-23308] Replace regular SourceContext with a throwing
one on close
72becd8 is described below
commit 72becd835603288cee160bd7cc7a31313cb17775
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 9 14:07:15 2021 +0200
[FLINK-23308] Replace regular SourceContext with a throwing one on close
This commit adds a thin wrapper that, on close, replaces the regular
SourceFunction.SourceContext
with one that throws an exception on any interaction. We do this instead
of adding a "closed" flag to WatermarkContext
for performance reasons.
This closes #16446
---
.../api/operators/StreamSourceContexts.java | 106 +++++++++++++++++----
1 file changed, 89 insertions(+), 17 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index ac2d1b0..d72e9d5 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -74,7 +74,95 @@ public class StreamSourceContexts {
default:
throw new
IllegalArgumentException(String.valueOf(timeCharacteristic));
}
- return ctx;
+ return new SwitchingOnClose<>(ctx);
+ }
+
+ /**
+ * A thin wrapper that will substitute on close, a regular {@link
SourceFunction.SourceContext}
+ * with a one that throws an exception on any interaction. We do that
instead of adding a flag
+ * in {@link WatermarkContext} for performance reasons.
+ */
+ private static class SwitchingOnClose<T> implements
SourceFunction.SourceContext<T> {
+
+ private SourceFunction.SourceContext<T> nestedContext;
+
+ private SwitchingOnClose(SourceFunction.SourceContext<T>
nestedContext) {
+ this.nestedContext = nestedContext;
+ }
+
+ @Override
+ public void collect(T element) {
+ nestedContext.collect(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ nestedContext.collectWithTimestamp(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ nestedContext.emitWatermark(mark);
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ nestedContext.markAsTemporarilyIdle();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return nestedContext.getCheckpointLock();
+ }
+
+ @Override
+ public void close() {
+ nestedContext.close();
+ this.nestedContext = new
ClosedContext<>(nestedContext.getCheckpointLock());
+ }
+ }
+
+ private static class ClosedContext<T> implements
SourceFunction.SourceContext<T> {
+
+ private final Object checkpointLock;
+
+ private ClosedContext(Object checkpointLock) {
+ this.checkpointLock = checkpointLock;
+ }
+
+ @Override
+ public void collect(T element) {
+ throwException();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ throwException();
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ throwException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throwException();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {
+ // nothing to be done
+ }
+
+ private void throwException() {
+ throw new FlinkRuntimeException("The Source Context has been
closed already.");
+ }
}
/**
@@ -364,7 +452,6 @@ public class StreamSourceContexts {
protected final long idleTimeout;
private ScheduledFuture<?> nextCheck;
- private boolean closed = false;
/**
* This flag will be reset to {@code true} every time the next check
is scheduled. Whenever
@@ -403,8 +490,6 @@ public class StreamSourceContexts {
@Override
public final void collect(T element) {
- checkNotClosed();
-
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
@@ -418,16 +503,8 @@ public class StreamSourceContexts {
}
}
- private void checkNotClosed() {
- if (closed) {
- throw new FlinkRuntimeException("The Source Context has been
closed already.");
- }
- }
-
@Override
public final void collectWithTimestamp(T element, long timestamp) {
- checkNotClosed();
-
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
@@ -443,8 +520,6 @@ public class StreamSourceContexts {
@Override
public final void emitWatermark(Watermark mark) {
- checkNotClosed();
-
if (allowWatermark(mark)) {
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
@@ -462,8 +537,6 @@ public class StreamSourceContexts {
@Override
public final void markAsTemporarilyIdle() {
- checkNotClosed();
-
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.IDLE);
}
@@ -476,7 +549,6 @@ public class StreamSourceContexts {
@Override
public void close() {
- this.closed = true;
cancelNextIdleDetectionTask();
}