This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new baa77dd [FLINK-23308] Optimize checks if SourceContext is closed
baa77dd is described below
commit baa77dd639b2404745e95a029ec0c4bd461bdb4c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jul 8 11:21:52 2021 +0200
[FLINK-23308] Optimize checks if SourceContext is closed
This commit tries to optimize checks on the hot path for closed
SourceContext. It removes unnecessary duplicated calls of
checkNotClosed.
There is no certainty it fixes the problem, however we want to run
benchmarks for a couple of days and see how it affects the performance.
---
.../streaming/api/operators/StreamSourceContexts.java | 18 +++++-------------
1 file changed, 5 insertions(+), 13 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 5dcf566..ac2d1b0 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -173,7 +173,6 @@ public class StreamSourceContexts {
@Override
protected void processAndCollect(T element) {
- checkNotClosed();
lastRecordTime = this.timeService.getCurrentProcessingTime();
output.collect(reuse.replace(element, lastRecordTime));
@@ -193,7 +192,6 @@ public class StreamSourceContexts {
@Override
protected void processAndCollectWithTimestamp(T element, long
timestamp) {
- checkNotClosed();
processAndCollect(element);
}
@@ -207,7 +205,6 @@ public class StreamSourceContexts {
/** This will only be called if allowWatermark returned {@code true}.
*/
@Override
protected void processAndEmitWatermark(Watermark mark) {
- checkNotClosed();
nextWatermarkTime = Long.MAX_VALUE;
output.emitWatermark(mark);
@@ -222,7 +219,6 @@ public class StreamSourceContexts {
@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
- checkNotClosed();
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
}
@@ -320,25 +316,21 @@ public class StreamSourceContexts {
@Override
protected void processAndCollect(T element) {
- checkNotClosed();
output.collect(reuse.replace(element));
}
@Override
protected void processAndCollectWithTimestamp(T element, long
timestamp) {
- checkNotClosed();
output.collect(reuse.replace(element, timestamp));
}
@Override
protected void processAndEmitWatermark(Watermark mark) {
- checkNotClosed();
output.emitWatermark(mark);
}
@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
- checkNotClosed();
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
}
@@ -410,7 +402,7 @@ public class StreamSourceContexts {
}
@Override
- public void collect(T element) {
+ public final void collect(T element) {
checkNotClosed();
synchronized (checkpointLock) {
@@ -426,14 +418,14 @@ public class StreamSourceContexts {
}
}
- protected void checkNotClosed() {
+ private void checkNotClosed() {
if (closed) {
throw new FlinkRuntimeException("The Source Context has been
closed already.");
}
}
@Override
- public void collectWithTimestamp(T element, long timestamp) {
+ public final void collectWithTimestamp(T element, long timestamp) {
checkNotClosed();
synchronized (checkpointLock) {
@@ -450,7 +442,7 @@ public class StreamSourceContexts {
}
@Override
- public void emitWatermark(Watermark mark) {
+ public final void emitWatermark(Watermark mark) {
checkNotClosed();
if (allowWatermark(mark)) {
@@ -469,7 +461,7 @@ public class StreamSourceContexts {
}
@Override
- public void markAsTemporarilyIdle() {
+ public final void markAsTemporarilyIdle() {
checkNotClosed();
synchronized (checkpointLock) {