This is an automated email from the ASF dual-hosted git repository.

ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e1816f3e7 SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692)
e1816f3e7 is described below

commit e1816f3e7f09c27b3642a3e62a356198feb020f7
Author: ajo thomas <[email protected]>
AuthorDate: Tue Nov 21 17:21:29 2023 -0800

    SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692)
---
 .../samza/coordinator/stream/CoordinatorStreamSystemProducer.java | 8 ++++++++
 .../apache/samza/coordinator/stream/CoordinatorStreamWriter.java  | 1 +
 2 files changed, 9 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 61c0ed93e..43f9dffcf 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -110,6 +110,14 @@ public class CoordinatorStreamSystemProducer {
     isStarted = false;
   }
 
+  /**
+   * Flushes underlying system producer.
+   * */
+  public void flush(String source) {
+    log.info("Flushing coordinator stream producer.");
+    systemProducer.flush(source);
+  }
+
   /**
    * Serialize and send a coordinator stream message.
    *
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index da659818a..5bc10f1e9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -65,6 +65,7 @@ public class CoordinatorStreamWriter {
    */
   public void stop() {
     log.info("Stopping the coordinator stream producer.");
+    coordinatorStreamSystemProducer.flush(SOURCE);
     coordinatorStreamSystemProducer.stop();
   }
 

Reply via email to