This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new baa77dd  [FLINK-23308] Optimize checks if SourceContext is closed
baa77dd is described below

commit baa77dd639b2404745e95a029ec0c4bd461bdb4c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jul 8 11:21:52 2021 +0200

    [FLINK-23308] Optimize checks if SourceContext is closed
    
    This commit tries to optimize checks on the hot path for closed
    SourceContext. It removes unnecessary duplicated calls of
    checkNotClosed.
    
    There is no certainty it fixes the problem, however we want to run
    benchmarks for a couple of days and see how it affects the performance.
---
 .../streaming/api/operators/StreamSourceContexts.java  | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 5dcf566..ac2d1b0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -173,7 +173,6 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollect(T element) {
-            checkNotClosed();
             lastRecordTime = this.timeService.getCurrentProcessingTime();
             output.collect(reuse.replace(element, lastRecordTime));
 
@@ -193,7 +192,6 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollectWithTimestamp(T element, long 
timestamp) {
-            checkNotClosed();
             processAndCollect(element);
         }
 
@@ -207,7 +205,6 @@ public class StreamSourceContexts {
         /** This will only be called if allowWatermark returned {@code true}. 
*/
         @Override
         protected void processAndEmitWatermark(Watermark mark) {
-            checkNotClosed();
             nextWatermarkTime = Long.MAX_VALUE;
             output.emitWatermark(mark);
 
@@ -222,7 +219,6 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
-            checkNotClosed();
             if (idle != streamStatus.isIdle()) {
                 output.emitStreamStatus(streamStatus);
             }
@@ -320,25 +316,21 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollect(T element) {
-            checkNotClosed();
             output.collect(reuse.replace(element));
         }
 
         @Override
         protected void processAndCollectWithTimestamp(T element, long 
timestamp) {
-            checkNotClosed();
             output.collect(reuse.replace(element, timestamp));
         }
 
         @Override
         protected void processAndEmitWatermark(Watermark mark) {
-            checkNotClosed();
             output.emitWatermark(mark);
         }
 
         @Override
         protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
-            checkNotClosed();
             if (idle != streamStatus.isIdle()) {
                 output.emitStreamStatus(streamStatus);
             }
@@ -410,7 +402,7 @@ public class StreamSourceContexts {
         }
 
         @Override
-        public void collect(T element) {
+        public final void collect(T element) {
             checkNotClosed();
 
             synchronized (checkpointLock) {
@@ -426,14 +418,14 @@ public class StreamSourceContexts {
             }
         }
 
-        protected void checkNotClosed() {
+        private void checkNotClosed() {
             if (closed) {
                 throw new FlinkRuntimeException("The Source Context has been 
closed already.");
             }
         }
 
         @Override
-        public void collectWithTimestamp(T element, long timestamp) {
+        public final void collectWithTimestamp(T element, long timestamp) {
             checkNotClosed();
 
             synchronized (checkpointLock) {
@@ -450,7 +442,7 @@ public class StreamSourceContexts {
         }
 
         @Override
-        public void emitWatermark(Watermark mark) {
+        public final void emitWatermark(Watermark mark) {
             checkNotClosed();
 
             if (allowWatermark(mark)) {
@@ -469,7 +461,7 @@ public class StreamSourceContexts {
         }
 
         @Override
-        public void markAsTemporarilyIdle() {
+        public final void markAsTemporarilyIdle() {
             checkNotClosed();
 
             synchronized (checkpointLock) {

Reply via email to