This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/develop by this push:
     new 036f92ec fix(state) fix bug: npl when rebalance
     new 0a735fb8 Merge pull request #288 from ni-ze/develop
036f92ec is described below

commit 036f92ec909632d566215eb2918e07a5e60e5fdb
Author: 维章 <[email protected]>
AuthorDate: Thu May 4 10:15:58 2023 +0800

    fix(state) fix bug: npl when rebalance
---
 .../org/apache/rocketmq/streams/core/state/AbstractStore.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
index e7aab918..7b593348 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
@@ -145,12 +145,12 @@ public abstract class AbstractStore {
         }
 
         public Set<byte[]> getInCalculating(String stateTopicQueue) {
-            return calculating.get(stateTopicQueue);
+            return calculating.getOrDefault(stateTopicQueue, new HashSet<>());
         }
 
         public Set<byte[]> getAll(String stateTopicQueue) {
-            Set<byte[]> calculating = this.calculating.get(stateTopicQueue);
-            Set<byte[]> recover = this.recover.get(stateTopicQueue);
+            Set<byte[]> calculating = this.calculating.getOrDefault(stateTopicQueue, new HashSet<>());
+            Set<byte[]> recover = this.recover.getOrDefault(stateTopicQueue, new HashSet<>());
 
             Set<byte[]> result = new HashSet<>();
             result.addAll(calculating);
@@ -162,7 +162,7 @@ public abstract class AbstractStore {
 
         public String whichStateTopicQueueBelongTo(byte[] key) {
             for (String uniqueQueue : recover.keySet()) {
-                for (byte[] tempKeyByte : recover.get(uniqueQueue)) {
+                for (byte[] tempKeyByte : recover.getOrDefault(uniqueQueue, new HashSet<>())) {
                     if (Arrays.equals(tempKeyByte, key)) {
                         return uniqueQueue;
                     }
@@ -170,7 +170,7 @@ public abstract class AbstractStore {
             }
 
             for (String uniqueQueue : calculating.keySet()) {
-                for (byte[] tempKeyByte : calculating.get(uniqueQueue)) {
+                for (byte[] tempKeyByte : calculating.getOrDefault(uniqueQueue, new HashSet<>())) {
                     if (Arrays.equals(tempKeyByte, key)) {
                         return uniqueQueue;
                     }

Reply via email to