This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c6fb0d [BEAM-13193] Allow BeamFnDataOutboundObserver to flush elements. (#16778)
3c6fb0d is described below
commit 3c6fb0d93132c89c730e444349949a1bd1eb1fad
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue Feb 8 19:58:29 2022 -0800
[BEAM-13193] Allow BeamFnDataOutboundObserver to flush elements. (#16778)
---
.../apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java | 11 +++++------
.../apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java | 4 +++-
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index b1b88c9..45c9daa 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -107,8 +107,7 @@ public class BeamFnDataOutboundAggregator {
.setDaemon(true)
.setNameFormat("DataBufferOutboundFlusher-thread")
.build())
- .scheduleAtFixedRate(
- this::periodicFlush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
+ .scheduleAtFixedRate(this::flush, timeLimit, timeLimit, TimeUnit.MILLISECONDS);
}
}
@@ -160,7 +159,7 @@ public class BeamFnDataOutboundAggregator {
return receiver;
}
- private void flush() throws IOException {
+ private void flushInternal() {
if (bytesWrittenSinceFlush == 0) {
return;
}
@@ -265,10 +264,10 @@ public class BeamFnDataOutboundAggregator {
return bufferedElements;
}
- private void periodicFlush() {
+ void flush() {
try {
synchronized (flushLock) {
- flush();
+ flushInternal();
}
} catch (Throwable t) {
throw new RuntimeException(t);
@@ -348,7 +347,7 @@ public class BeamFnDataOutboundAggregator {
perBundleByteCount += delta;
perBundleElementCount += 1;
if (bytesWrittenSinceFlush > sizeLimit) {
- flush();
+ flushInternal();
}
}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
index 5baf88a..763f4bd 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
@@ -73,7 +73,9 @@ public class BeamFnDataOutboundObserver<T> implements CloseableFnDataReceiver<T>
}
@Override
- public void flush() throws IOException {}
+ public void flush() throws IOException {
+ aggregator.flush();
+ }
@Override
public void accept(T t) throws Exception {