This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c6fb0d  [BEAM-13193] Allow BeamFnDataOutboundObserver to flush 
elements. (#16778)
3c6fb0d is described below

commit 3c6fb0d93132c89c730e444349949a1bd1eb1fad
Author: Yichi Zhang <[email protected]>
AuthorDate: Tue Feb 8 19:58:29 2022 -0800

    [BEAM-13193] Allow BeamFnDataOutboundObserver to flush elements. (#16778)
---
 .../apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java | 11 +++++------
 .../apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java   |  4 +++-
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index b1b88c9..45c9daa 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -107,8 +107,7 @@ public class BeamFnDataOutboundAggregator {
                       .setDaemon(true)
                       .setNameFormat("DataBufferOutboundFlusher-thread")
                       .build())
-              .scheduleAtFixedRate(
-                  this::periodicFlush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
+              .scheduleAtFixedRate(this::flush, timeLimit, timeLimit, 
TimeUnit.MILLISECONDS);
     }
   }
 
@@ -160,7 +159,7 @@ public class BeamFnDataOutboundAggregator {
     return receiver;
   }
 
-  private void flush() throws IOException {
+  private void flushInternal() {
     if (bytesWrittenSinceFlush == 0) {
       return;
     }
@@ -265,10 +264,10 @@ public class BeamFnDataOutboundAggregator {
     return bufferedElements;
   }
 
-  private void periodicFlush() {
+  void flush() {
     try {
       synchronized (flushLock) {
-        flush();
+        flushInternal();
       }
     } catch (Throwable t) {
       throw new RuntimeException(t);
@@ -348,7 +347,7 @@ public class BeamFnDataOutboundAggregator {
       perBundleByteCount += delta;
       perBundleElementCount += 1;
       if (bytesWrittenSinceFlush > sizeLimit) {
-        flush();
+        flushInternal();
       }
     }
 
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
index 5baf88a..763f4bd 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
@@ -73,7 +73,9 @@ public class BeamFnDataOutboundObserver<T> implements 
CloseableFnDataReceiver<T>
   }
 
   @Override
-  public void flush() throws IOException {}
+  public void flush() throws IOException {
+    aggregator.flush();
+  }
 
   @Override
   public void accept(T t) throws Exception {

Reply via email to