This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 74eb6841b23 Subscription: avoid null pointer exception when get
current response due to concurrent operations (#14926) (#14932)
74eb6841b23 is described below
commit 74eb6841b23c58c5f238cfdd3f5573c2fc9ad600
Author: VGalaxies <[email protected]>
AuthorDate: Mon Feb 24 18:55:28 2025 +0800
Subscription: avoid null pointer exception when get current response due to
concurrent operations (#14926) (#14932)
---
.../broker/SubscriptionPrefetchingTabletQueue.java | 10 ++++++++++
.../broker/SubscriptionPrefetchingTsFileQueue.java | 10 ++++++++++
.../event/cache/SubscriptionPollResponseCache.java | 11 +++++++++++
3 files changed, 31 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 41e4b060ae5..79374755f97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -110,6 +110,16 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
}
final SubscriptionPollResponse response = ev.getCurrentResponse();
+ if (Objects.isNull(response)) {
+ final String errorMessage =
+ String.format(
+ "current response is null when fetching next response,
consumer id: %s commit context: %s, offset: %s, prefetching queue: %s",
+ consumerId, commitContext, offset, this);
+ LOGGER.warn(errorMessage);
+ eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
+ return ev;
+ }
+
final SubscriptionPollPayload payload = response.getPayload();
// 2. Check previous response type and offset
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 978ed9fac46..f2961afd7f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -116,6 +116,16 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
}
final SubscriptionPollResponse response = ev.getCurrentResponse();
+ if (Objects.isNull(response)) {
+ final String errorMessage =
+ String.format(
+ "current response is null when fetching next response,
consumer id: %s commit context: %s, writing offset: %s, prefetching queue: %s",
+ consumerId, commitContext, writingOffset, this);
+ LOGGER.warn(errorMessage);
+ eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
+ return ev;
+ }
+
final SubscriptionPollPayload payload = response.getPayload();
// 2. Check previous response type, file name and offset
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
index 8b003621687..4a49042b9ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -49,6 +50,9 @@ public class SubscriptionPollResponseCache {
public ByteBuffer serialize(final CachedSubscriptionPollResponse response)
throws IOException {
try {
+ if (Objects.isNull(response)) {
+ throw new IOException("null response when serializing");
+ }
return this.cache.get(response);
} catch (final Exception e) {
LOGGER.warn(
@@ -61,6 +65,9 @@ public class SubscriptionPollResponseCache {
public Optional<ByteBuffer> trySerialize(final
CachedSubscriptionPollResponse response) {
try {
+ if (Objects.isNull(response)) {
+ throw new IOException("null response when serializing");
+ }
return Optional.of(serialize(response));
} catch (final IOException e) {
LOGGER.warn(
@@ -72,6 +79,10 @@ public class SubscriptionPollResponseCache {
}
public void invalidate(final CachedSubscriptionPollResponse response) {
+ if (Objects.isNull(response)) {
+ LOGGER.warn("null response when invalidating, skip it");
+ return;
+ }
this.cache.invalidate(response);
response.invalidateByteBuffer();
}