This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 74eb6841b23 Subscription: avoid null pointer exception when get 
current response due to concurrent operations (#14926) (#14932)
74eb6841b23 is described below

commit 74eb6841b23c58c5f238cfdd3f5573c2fc9ad600
Author: VGalaxies <[email protected]>
AuthorDate: Mon Feb 24 18:55:28 2025 +0800

    Subscription: avoid null pointer exception when get current response due to 
concurrent operations (#14926) (#14932)
---
 .../broker/SubscriptionPrefetchingTabletQueue.java            | 10 ++++++++++
 .../broker/SubscriptionPrefetchingTsFileQueue.java            | 10 ++++++++++
 .../event/cache/SubscriptionPollResponseCache.java            | 11 +++++++++++
 3 files changed, 31 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 41e4b060ae5..79374755f97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -110,6 +110,16 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
           }
 
           final SubscriptionPollResponse response = ev.getCurrentResponse();
+          if (Objects.isNull(response)) {
+            final String errorMessage =
+                String.format(
+                    "current response is null when fetching next response, 
consumer id: %s commit context: %s, offset: %s, prefetching queue: %s",
+                    consumerId, commitContext, offset, this);
+            LOGGER.warn(errorMessage);
+            eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
+            return ev;
+          }
+
           final SubscriptionPollPayload payload = response.getPayload();
 
           // 2. Check previous response type and offset
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 978ed9fac46..f2961afd7f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -116,6 +116,16 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
           }
 
           final SubscriptionPollResponse response = ev.getCurrentResponse();
+          if (Objects.isNull(response)) {
+            final String errorMessage =
+                String.format(
+                    "current response is null when fetching next response, 
consumer id: %s commit context: %s, writing offset: %s, prefetching queue: %s",
+                    consumerId, commitContext, writingOffset, this);
+            LOGGER.warn(errorMessage);
+            eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
+            return ev;
+          }
+
           final SubscriptionPollPayload payload = response.getPayload();
 
           // 2. Check previous response type, file name and offset
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 8b003621687..4a49042b9ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -49,6 +50,9 @@ public class SubscriptionPollResponseCache {
 
   public ByteBuffer serialize(final CachedSubscriptionPollResponse response) 
throws IOException {
     try {
+      if (Objects.isNull(response)) {
+        throw new IOException("null response when serializing");
+      }
       return this.cache.get(response);
     } catch (final Exception e) {
       LOGGER.warn(
@@ -61,6 +65,9 @@ public class SubscriptionPollResponseCache {
 
   public Optional<ByteBuffer> trySerialize(final 
CachedSubscriptionPollResponse response) {
     try {
+      if (Objects.isNull(response)) {
+        throw new IOException("null response when serializing");
+      }
       return Optional.of(serialize(response));
     } catch (final IOException e) {
       LOGGER.warn(
@@ -72,6 +79,10 @@ public class SubscriptionPollResponseCache {
   }
 
   public void invalidate(final CachedSubscriptionPollResponse response) {
+    if (Objects.isNull(response)) {
+      LOGGER.warn("null response when invalidating, skip it");
+      return;
+    }
     this.cache.invalidate(response);
     response.invalidateByteBuffer();
   }

Reply via email to