This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 376ed3c4957 Subscription: support payload size control fallback
strategy & fix issue where subscription events cannot be auto recycled & fix
issue where the reference count of tablet events for tsfile topic cannot
decrease to zero (#13053)
376ed3c4957 is described below
commit 376ed3c495797e118c3ebfac9b6abef14c65941d
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 29 15:19:05 2024 +0800
Subscription: support payload size control fallback strategy & fix issue
where subscription events cannot be auto recycled & fix issue where the
reference count of tablet events for tsfile topic cannot decrease to zero
(#13053)
---
.../consumer/SubscriptionPushConsumer.java | 2 +-
.../db/subscription/event/SubscriptionEvent.java | 6 ++--
.../batch/SubscriptionPipeTsFileEventBatch.java | 7 ++++
.../receiver/SubscriptionReceiverV1.java | 41 +++++++++++++++++-----
.../apache/iotdb/commons/conf/CommonConfig.java | 11 ++++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +++
.../subscription/config/SubscriptionConfig.java | 5 +++
7 files changed, 64 insertions(+), 13 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 69cf13f9bef..e2dfe797457 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -184,7 +184,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
final ConsumeResult consumeResult;
try {
consumeResult = consumeListener.onReceive(message);
- if (consumeResult.equals(ConsumeResult.SUCCESS)) {
+ if (Objects.equals(consumeResult, ConsumeResult.SUCCESS)) {
messagesToAck.add(message);
} else {
LOGGER.warn("Consumer listener result failure when consuming
message: {}", message);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index aa68a522b22..bf0e409fdd4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -58,6 +58,7 @@ public class SubscriptionEvent {
private final SubscriptionCommitContext
commitContext; // all responses have the same commit context
+ // lastPolledConsumerId is not used as a criterion for determining
pollability
private String lastPolledConsumerId;
private long lastPolledTimestamp;
private long committedTimestamp;
@@ -163,9 +164,6 @@ public class SubscriptionEvent {
if (lastPolledTimestamp == INVALID_TIMESTAMP) {
return true;
}
- if (Objects.nonNull(lastPolledConsumerId)) {
- return false;
- }
// Recycle events that may not be able to be committed, i.e., those that
have been polled but
// not committed within a certain period of time.
return System.currentTimeMillis() - lastPolledTimestamp
@@ -176,7 +174,7 @@ public class SubscriptionEvent {
// reset current response index
currentResponseIndex = 0;
- lastPolledConsumerId = null;
+ // reset lastPolledTimestamp makes this event pollable
lastPolledTimestamp = INVALID_TIMESTAMP;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index c36f9ca3964..a307cfc9568 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.subscription.event.batch;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
import
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -61,6 +62,12 @@ public class SubscriptionPipeTsFileEventBatch {
}
if (Objects.nonNull(event)) {
batch.onEvent(event);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(
+ SubscriptionPipeTsFileEventBatch.class.getName(),
+ false); // missing releaseLastEvent decreases reference count
+ }
}
if (batch.shouldEmit()) {
final List<SubscriptionEvent> events = generateSubscriptionEvents();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 69f60f593bd..77cdba455d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
@@ -77,12 +78,18 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SubscriptionReceiverV1 implements SubscriptionReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionReceiverV1.class);
+ private static final long POLL_PAYLOAD_MAX_SIZE =
+ SubscriptionConfig.getInstance().getSubscriptionPollPayloadMaxSize();
+
+ private static final double POLL_PAYLOAD_SIZE_THRESHOLD =
POLL_PAYLOAD_MAX_SIZE * 0.75;
+
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
@@ -358,23 +365,44 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
// generate response
+ final AtomicLong totalSize = new AtomicLong();
return PipeSubscribePollResp.toTPipeSubscribeResp(
RpcUtils.SUCCESS_STATUS,
- events.parallelStream()
+ events.stream()
.map(
(event) -> {
+ final SubscriptionCommitContext commitContext =
event.getCommitContext();
final SubscriptionPollResponse response =
event.getCurrentResponse();
if (Objects.isNull(response)) {
- throw new SubscriptionException("null response");
+ LOGGER.warn(
+ "Subscription: consumer {} poll null response for
event {} with request: {}",
+ consumerConfig,
+ event,
+ req.getRequest());
+ // nack
+ SubscriptionAgent.broker()
+ .commit(consumerConfig,
Collections.singletonList(commitContext), true);
+ return null;
}
- final SubscriptionCommitContext commitContext =
response.getCommitContext();
+
try {
final ByteBuffer byteBuffer =
event.getCurrentResponseByteBuffer();
+
+ // payload size control
+ final long size = byteBuffer.limit();
+ if (totalSize.get() + size >
POLL_PAYLOAD_SIZE_THRESHOLD) {
+ throw new SubscriptionException(
+ String.format(
+ "payload size will exceed the threshold %s",
+ POLL_PAYLOAD_SIZE_THRESHOLD));
+ }
+ totalSize.getAndAdd(size);
+
SubscriptionPrefetchingQueueMetrics.getInstance()
.mark(
SubscriptionPrefetchingQueue.generatePrefetchingQueueId(
commitContext.getConsumerGroupId(),
commitContext.getTopicName()),
- byteBuffer.limit());
+ size);
event.resetResponseByteBuffer(false);
LOGGER.info(
"Subscription: consumer {} poll {} successfully with
request: {}",
@@ -391,10 +419,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
e);
// nack
SubscriptionAgent.broker()
- .commit(
- consumerConfig,
-
Collections.singletonList(response.getCommitContext()),
- true);
+ .commit(consumerConfig,
Collections.singletonList(commitContext), true);
return null;
}
})
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 45785f045b9..abcf0679dca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -269,6 +269,9 @@ public class CommonConfig {
private long subscriptionReadFileBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
+ // default to SessionConfig.DEFAULT_MAX_FRAME_SIZE
+ private long subscriptionPollPayloadMaxSize = 64 * MB;
+
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -1231,6 +1234,14 @@ public class CommonConfig {
subscriptionTsFileDeduplicationWindowSeconds;
}
+ public long getSubscriptionPollPayloadMaxSize() {
+ return subscriptionPollPayloadMaxSize;
+ }
+
+ public void setSubscriptionPollPayloadMaxSize(long
subscriptionPollPayloadMaxSize) {
+ this.subscriptionPollPayloadMaxSize = subscriptionPollPayloadMaxSize;
+ }
+
public String getSchemaEngineMode() {
return schemaEngineMode;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 31275dcaee9..864df3d75d7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -651,6 +651,11 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_ts_file_deduplication_window_seconds",
String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
+ config.setSubscriptionPollPayloadMaxSize(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_poll_payload_max_size",
+ String.valueOf(config.getSubscriptionPollPayloadMaxSize()))));
}
public void loadRetryProperties(Properties properties) throws IOException {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 0c73c6a23d7..7bac5e181c8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -75,6 +75,10 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
}
+ public long getSubscriptionPollPayloadMaxSize() {
+ return COMMON_CONFIG.getSubscriptionPollPayloadMaxSize();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -106,6 +110,7 @@ public class SubscriptionConfig {
LOGGER.info(
"SubscriptionTsFileDeduplicationWindowSeconds: {}",
getSubscriptionTsFileDeduplicationWindowSeconds());
+ LOGGER.info("SubscriptionPollPayloadMaxSize: {}",
getSubscriptionPollPayloadMaxSize());
}
/////////////////////////////// Singleton ///////////////////////////////