This is an automated email from the ASF dual-hosted git repository.
ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e1816f3e7 SAMZA-2797: Call flush during stop from
CoordinatorStreamWriter (#1692)
e1816f3e7 is described below
commit e1816f3e7f09c27b3642a3e62a356198feb020f7
Author: ajo thomas <[email protected]>
AuthorDate: Tue Nov 21 17:21:29 2023 -0800
SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692)
---
.../samza/coordinator/stream/CoordinatorStreamSystemProducer.java | 8 ++++++++
.../apache/samza/coordinator/stream/CoordinatorStreamWriter.java | 1 +
2 files changed, 9 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 61c0ed93e..43f9dffcf 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -110,6 +110,14 @@ public class CoordinatorStreamSystemProducer {
isStarted = false;
}
+ /**
+ * Flushes underlying system producer.
+ * */
+ public void flush(String source) {
+ log.info("Flushing coordinator stream producer.");
+ systemProducer.flush(source);
+ }
+
/**
* Serialize and send a coordinator stream message.
*
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index da659818a..5bc10f1e9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -65,6 +65,7 @@ public class CoordinatorStreamWriter {
*/
public void stop() {
log.info("Stopping the coordinator stream producer.");
+ coordinatorStreamSystemProducer.flush(SOURCE);
coordinatorStreamSystemProducer.stop();
}