This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 72becd8  [FLINK-23308] Replace regular SourceContext with a throwing 
one on close
72becd8 is described below

commit 72becd835603288cee160bd7cc7a31313cb17775
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 9 14:07:15 2021 +0200

    [FLINK-23308] Replace regular SourceContext with a throwing one on close
    
    This commit adds a thin wrapper that, on close, replaces the regular
    SourceFunction.SourceContext with one that throws an exception on any
    interaction. We do that instead of adding a flag in WatermarkContext
    for performance reasons.
    
    This closes #16446
---
 .../api/operators/StreamSourceContexts.java        | 106 +++++++++++++++++----
 1 file changed, 89 insertions(+), 17 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index ac2d1b0..d72e9d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -74,7 +74,95 @@ public class StreamSourceContexts {
             default:
                 throw new 
IllegalArgumentException(String.valueOf(timeCharacteristic));
         }
-        return ctx;
+        return new SwitchingOnClose<>(ctx);
+    }
+
+    /**
+     * A thin wrapper that, on close, replaces the regular {@link 
SourceFunction.SourceContext}
+     * with one that throws an exception on any interaction. We do that 
instead of adding a flag
+     * in {@link WatermarkContext} for performance reasons.
+     */
+    private static class SwitchingOnClose<T> implements 
SourceFunction.SourceContext<T> {
+
+        private SourceFunction.SourceContext<T> nestedContext;
+
+        private SwitchingOnClose(SourceFunction.SourceContext<T> 
nestedContext) {
+            this.nestedContext = nestedContext;
+        }
+
+        @Override
+        public void collect(T element) {
+            nestedContext.collect(element);
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            nestedContext.collectWithTimestamp(element, timestamp);
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            nestedContext.emitWatermark(mark);
+        }
+
+        @Override
+        public void markAsTemporarilyIdle() {
+            nestedContext.markAsTemporarilyIdle();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return nestedContext.getCheckpointLock();
+        }
+
+        @Override
+        public void close() {
+            nestedContext.close();
+            this.nestedContext = new 
ClosedContext<>(nestedContext.getCheckpointLock());
+        }
+    }
+
+    private static class ClosedContext<T> implements 
SourceFunction.SourceContext<T> {
+
+        private final Object checkpointLock;
+
+        private ClosedContext(Object checkpointLock) {
+            this.checkpointLock = checkpointLock;
+        }
+
+        @Override
+        public void collect(T element) {
+            throwException();
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            throwException();
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            throwException();
+        }
+
+        @Override
+        public void markAsTemporarilyIdle() {
+            throwException();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return checkpointLock;
+        }
+
+        @Override
+        public void close() {
+            // nothing to be done
+        }
+
+        private void throwException() {
+            throw new FlinkRuntimeException("The Source Context has been 
closed already.");
+        }
     }
 
     /**
@@ -364,7 +452,6 @@ public class StreamSourceContexts {
         protected final long idleTimeout;
 
         private ScheduledFuture<?> nextCheck;
-        private boolean closed = false;
 
         /**
          * This flag will be reset to {@code true} every time the next check 
is scheduled. Whenever
@@ -403,8 +490,6 @@ public class StreamSourceContexts {
 
         @Override
         public final void collect(T element) {
-            checkNotClosed();
-
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.ACTIVE);
 
@@ -418,16 +503,8 @@ public class StreamSourceContexts {
             }
         }
 
-        private void checkNotClosed() {
-            if (closed) {
-                throw new FlinkRuntimeException("The Source Context has been 
closed already.");
-            }
-        }
-
         @Override
         public final void collectWithTimestamp(T element, long timestamp) {
-            checkNotClosed();
-
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.ACTIVE);
 
@@ -443,8 +520,6 @@ public class StreamSourceContexts {
 
         @Override
         public final void emitWatermark(Watermark mark) {
-            checkNotClosed();
-
             if (allowWatermark(mark)) {
                 synchronized (checkpointLock) {
                     processAndEmitStreamStatus(StreamStatus.ACTIVE);
@@ -462,8 +537,6 @@ public class StreamSourceContexts {
 
         @Override
         public final void markAsTemporarilyIdle() {
-            checkNotClosed();
-
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.IDLE);
             }
@@ -476,7 +549,6 @@ public class StreamSourceContexts {
 
         @Override
         public void close() {
-            this.closed = true;
             cancelNextIdleDetectionTask();
         }
 

Reply via email to