dawidwys commented on a change in pull request #16446:
URL: https://github.com/apache/flink/pull/16446#discussion_r666915966
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
##########
@@ -74,7 +74,96 @@
default:
throw new
IllegalArgumentException(String.valueOf(timeCharacteristic));
}
- return ctx;
+ return new SwitchingOnClose<>(ctx);
+ }
+
+ /**
+ * A thin wrapper that will substitute on close, a regular {@link
SourceFunction.SourceContext}
+ * with a one that throws an exception on any interaction. We do that
instead of adding a flag
+ * in {@link WatermarkContext} for performance reasons.
+ */
+ private static class SwitchingOnClose<T> implements
SourceFunction.SourceContext<T> {
+
+ private SourceFunction.SourceContext<T> nestedContext;
+
+ private SwitchingOnClose(SourceFunction.SourceContext<T>
nestedContext) {
+ this.nestedContext = nestedContext;
+ }
+
+ @Override
+ public void collect(T element) {
+ nestedContext.collect(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ nestedContext.collectWithTimestamp(element, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ nestedContext.emitWatermark(mark);
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ nestedContext.markAsTemporarilyIdle();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return nestedContext.getCheckpointLock();
+ }
+
+ @Override
+ public void close() {
+ nestedContext.close();
+ this.nestedContext = new
ClosedContext<>(nestedContext.getCheckpointLock());
+ }
+ }
+
+ private static class ClosedContext<T> implements
SourceFunction.SourceContext<T> {
+
+ private final Object checkpointLock;
+
+ private ClosedContext(Object checkpointLock) {
+ this.checkpointLock = checkpointLock;
+ }
+
+ @Override
+ public void collect(T element) {
+ throwException();
+ throwException();
Review comment:
typo :facepalm:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]