This is an automated email from the ASF dual-hosted git repository.
cbqiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 851b39ee fix data loss after failover (#633)
851b39ee is described below
commit 851b39ee28f8dafc26427108421dc5b4d4c269f5
Author: chzhoo <[email protected]>
AuthorDate: Wed Nov 12 19:48:59 2025 +0800
fix data loss after failover (#633)
---
.../apache/geaflow/dsl/connector/api/function/OffsetStore.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
index 413d2b11..31c20c14 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
@@ -85,9 +85,14 @@ public class OffsetStore {
storeContext.withKeySerializer(new DefaultKVSerializer(String.class,
String.class));
jsonOffsetStore.init(storeContext);
- this.bucketNum = 2 *
runtimeContext.getConfiguration().getLong(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT);
+ long bucketNum = 2 *
runtimeContext.getConfiguration().getLong(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT);
+ long streamFlyingNum =
runtimeContext.getConfiguration().getInteger(FrameworkConfigKeys.STREAMING_FLYING_BATCH_NUM)
+ 1;
+ if (bucketNum < streamFlyingNum) {
+ bucketNum = streamFlyingNum;
+ }
+ this.bucketNum = bucketNum;
this.kvStoreCache = new HashMap<>();
- LOGGER.info("init offset store, store type is: {}", backendType);
+ LOGGER.info("init offset store, store type is: {}, bucket num is: {}",
backendType, this.bucketNum);
}
public Offset readOffset(String partitionName, long batchId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]