This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d0dbd738cbc Subscription: allow generate subsequent events with the
same tablet batch to avoid large message & improve poll logic to avoid
unnecessary nack (#14452) (#14476)
d0dbd738cbc is described below
commit d0dbd738cbcf9695f04c7f6e627ef949c8279500
Author: VGalaxies <[email protected]>
AuthorDate: Wed Dec 18 10:21:37 2024 +0800
Subscription: allow generate subsequent events with the same tablet batch
to avoid large message & improve poll logic to avoid unnecessary nack (#14452)
(#14476)
---
.../consumer/SubscriptionConsumer.java | 3 +-
.../consumer/SubscriptionPullConsumer.java | 3 +-
.../consumer/SubscriptionPushConsumer.java | 3 +-
.../subtask/connector/PipeConnectorSubtask.java | 2 +-
.../db/subscription/broker/SubscriptionBroker.java | 104 +++++++++++++++++----
.../broker/SubscriptionPrefetchingQueue.java | 7 +-
.../db/subscription/broker/SubscriptionStates.java | 30 +++---
...java => SubscriptionCommitContextSupplier.java} | 34 +------
.../db/subscription/event/SubscriptionEvent.java | 14 ++-
.../batch/SubscriptionPipeTabletEventBatch.java | 57 ++++++++---
.../SubscriptionEventExtendableResponse.java | 6 --
.../event/response/SubscriptionEventResponse.java | 7 ++
.../response/SubscriptionEventTabletResponse.java | 60 ++++++++++--
.../receiver/SubscriptionReceiverV1.java | 2 +-
14 files changed, 234 insertions(+), 98 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index f02ae3b2448..c85bdc4b9a6 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -610,7 +610,8 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
LOGGER.warn("unexpected response type: {}",
responseType);
return Optional.empty();
})
- .apply(response, timer)
+ // TODO: reuse previous timer?
+ .apply(response, new PollTimer(System.currentTimeMillis(),
timeoutMs))
.ifPresent(currentMessages::add);
} catch (final SubscriptionRuntimeNonCriticalException e) {
LOGGER.warn(
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index ae9f99b969d..874659c4339 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.slf4j.Logger;
@@ -180,7 +181,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
LOGGER.info(
"SubscriptionPullConsumer {} poll empty message from topics {} after
{} millisecond(s)",
this,
- parsedTopicNames,
+ CollectionUtils.getLimitedString(parsedTopicNames, 32),
timeoutMs);
return messages;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 953894eece7..327596d474f 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
LOGGER.info(
"SubscriptionPushConsumer {} poll empty message from topics {}
after {} millisecond(s)",
this,
- subscribedTopics.keySet(),
+ CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
autoPollTimeoutMs);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index c0470becb3d..09782813fff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -202,7 +202,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
final long startTime = System.currentTimeMillis();
outputPipeConnector.close();
LOGGER.info(
- "Pipe: connector subtask {} was closed {} within {} ms",
+ "Pipe: connector subtask {} ({}) was closed within {} ms",
taskID,
outputPipeConnector,
System.currentTimeMillis() - startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 74db75b277c..df888ec1b03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -34,12 +34,15 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -88,18 +91,92 @@ public class SubscriptionBroker {
public List<SubscriptionEvent> poll(
final String consumerId, final Set<String> topicNames, final long
maxBytes) {
- final List<SubscriptionEvent> events = new ArrayList<>();
+ final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
+ final Set<String> candidateTopicNames =
prepareCandidateTopicNames(topicNames, eventsToPoll);
+
+ // Sort topic names based on the current subscription states (number of
events received per
+ // topic)
+ final List<String> sortedTopicNames = new ArrayList<>(candidateTopicNames);
+ sortedTopicNames.sort(
+ Comparator.comparingLong(
+ topicName ->
+
Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
+ .getStates(topicName)));
+
+ final List<SubscriptionEvent> eventsToNack = new ArrayList<>();
+ long totalSize = 0;
+ final Map<String, Long> topicNameToIncrements = new HashMap<>();
+
+ // Iterate over each sorted topic name and poll the corresponding events
+ for (final String topicName : sortedTopicNames) {
+ final SubscriptionPrefetchingQueue prefetchingQueue =
+ topicNameToPrefetchingQueue.get(topicName);
+ // Recheck
+ if (Objects.isNull(prefetchingQueue) || prefetchingQueue.isClosed()) {
+ continue;
+ }
+
+ // Poll the event from the prefetching queue
+ final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
+ if (Objects.isNull(event)) {
+ continue;
+ }
+
+ // Try to get the current size of the event
+ final long currentSize;
+ try {
+ currentSize = event.getCurrentResponseSize();
+ } catch (final IOException e) {
+ // If there is an error getting the event's size, nack the event
+ eventsToNack.add(event);
+ continue;
+ }
+
+ // Add the event to the poll list
+ eventsToPoll.add(event);
+
+ // Increment the event count for the topic
+ topicNameToIncrements.merge(event.getCommitContext().getTopicName(), 1L,
Long::sum);
+
+ // Update the total size
+ totalSize += currentSize;
+
+ // If adding this event exceeds the maxBytes (pessimistic estimation),
break the loop
+ if (totalSize + currentSize > maxBytes) {
+ break;
+ }
+ }
+
+ // Update the subscription states with the increments for the topics
processed
+ Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
+ .updateStates(topicNameToIncrements);
+
+ // Commit the nack events for the consumer
+ commit(
+ consumerId,
+
eventsToNack.stream().map(SubscriptionEvent::getCommitContext).collect(Collectors.toList()),
+ true);
+
+ // Return the list of events that are to be polled
+ return eventsToPoll;
+ }
+
+ private Set<String> prepareCandidateTopicNames(
+ final Set<String> topicNames,
+ final List<SubscriptionEvent> eventsToPoll /* output parameter */) {
+ final Set<String> candidateTopicNames = new HashSet<>();
for (final String topicName : topicNames) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
+ // If there is no prefetching queue for the topic, check if it's
completed
if (Objects.isNull(prefetchingQueue)) {
- // check if completed
if (completedTopicNames.containsKey(topicName)) {
LOGGER.info(
"Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] is completed, return termination response to client",
topicName,
brokerId);
- events.add(
+ // Add a termination event for the completed topic
+ eventsToPoll.add(
new SubscriptionEvent(
SubscriptionPollResponseType.TERMINATION.getType(),
new TerminationPayload(),
@@ -118,6 +195,8 @@ public class SubscriptionBroker {
// 2.2. potential disorder of unbind and close prefetching queue...
continue;
}
+
+ // Check if the prefetching queue is closed
if (prefetchingQueue.isClosed()) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is closed",
@@ -125,22 +204,11 @@ public class SubscriptionBroker {
brokerId);
continue;
}
- final SubscriptionEvent event = prefetchingQueue.poll(consumerId);
- if (Objects.nonNull(event)) {
- events.add(event);
- }
+
+ candidateTopicNames.add(topicName);
}
- final Pair<List<SubscriptionEvent>, List<SubscriptionEvent>>
eventsToPollWithEventsToNack =
- Objects.requireNonNull(consumerIdToSubscriptionStates.get(consumerId))
- .filter(events, maxBytes);
- commit(
- consumerId,
- eventsToPollWithEventsToNack.right.stream()
- .map(SubscriptionEvent::getCommitContext)
- .collect(Collectors.toList()),
- true);
- return eventsToPollWithEventsToNack.left;
+ return candidateTopicNames;
}
public List<SubscriptionEvent> pollTsFile(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 433052409d9..20a0d0983f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeEventBatches;
@@ -343,8 +342,8 @@ public abstract class SubscriptionPrefetchingQueue {
continue;
}
- if (event instanceof PipeTsFileInsertionEvent) {
- if (onEvent((PipeTsFileInsertionEvent) event)) {
+ if (event instanceof TsFileInsertionEvent) {
+ if (onEvent((TsFileInsertionEvent) event)) {
return;
}
continue;
@@ -448,7 +447,7 @@ public abstract class SubscriptionPrefetchingQueue {
this);
}
- ev.ack();
+ ev.ack(this::enqueueEventToPrefetchingQueue);
ev.recordCommittedTimestamp(); // now committed
acked.set(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
index caa1c0fd1e2..56d235adcc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionStates.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.tsfile.utils.Pair;
@@ -51,6 +52,7 @@ public class SubscriptionStates {
* @return a Pair containing two lists: the first list contains the events
to poll, and the second
* list contains the events to nack
*/
+ @TestOnly
public Pair<List<SubscriptionEvent>, List<SubscriptionEvent>> filter(
final List<SubscriptionEvent> events, final long maxBytes) {
final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
@@ -82,40 +84,40 @@ public class SubscriptionStates {
}
// Update the subscription state with the increments calculated during
filtering
- update(topicNameToIncrements);
+ updateStates(topicNameToIncrements);
return new Pair<>(eventsToPoll, eventsToNack);
}
+ /**
+ * Sorts a list of SubscriptionEvents according to the event count in the
subscription states.
+ * Events with fewer counts are prioritized.
+ *
+ * @param events the list of events to sort
+ */
+ private void sort(final List<SubscriptionEvent> events) {
+ events.sort(
+ Comparator.comparingLong(event ->
getStates(event.getCommitContext().getTopicName())));
+ }
+
/**
* Updates the subscription state by incrementing the event count for
multiple topics.
*
* @param topicNameToIncrements a map where the key is the topic name and
the value is the number
* of events to add to the count
*/
- private void update(final Map<String, Long> topicNameToIncrements) {
+ public void updateStates(final Map<String, Long> topicNameToIncrements) {
for (final Entry<String, Long> entry : topicNameToIncrements.entrySet()) {
topicNameToEventCount.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
- /**
- * Sorts a list of SubscriptionEvents according to the event count in the
subscription states.
- * Events with fewer counts are prioritized.
- *
- * @param events the list of events to sort
- */
- private void sort(final List<SubscriptionEvent> events) {
- events.sort(
- Comparator.comparingLong(event ->
getCount(event.getCommitContext().getTopicName())));
- }
-
/**
* Returns the number of events received for a specific topic.
*
* @param topicName the name of the topic
* @return the number of events received for the topic
*/
- private long getCount(final String topicName) {
+ public long getStates(final String topicName) {
return topicNameToEventCount.getOrDefault(topicName, 0L);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
similarity index 50%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
index 3ded870feed..cc73384d42c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionCommitContextSupplier.java
@@ -17,36 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.subscription.event.response;
+package org.apache.iotdb.db.subscription.event;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-public interface SubscriptionEventResponse<E> {
+@FunctionalInterface
+public interface SubscriptionCommitContextSupplier {
- /////////////////////////////// response ///////////////////////////////
-
- E getCurrentResponse();
-
- void prefetchRemainingResponses() throws Exception;
-
- void fetchNextResponse(final long offset) throws Exception;
-
- /////////////////////////////// byte buffer ///////////////////////////////
-
- void trySerializeCurrentResponse();
-
- void trySerializeRemainingResponses();
-
- ByteBuffer getCurrentResponseByteBuffer() throws IOException;
-
- void invalidateCurrentResponseByteBuffer();
-
- /////////////////////////////// lifecycle ///////////////////////////////
-
- void nack();
-
- void cleanUp();
-
- boolean isCommittable();
+ SubscriptionCommitContext get();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index ec0ed1fc079..97fa621faef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
@@ -87,9 +88,12 @@ public class SubscriptionEvent {
* SubscriptionEventTabletResponse}.
*/
public SubscriptionEvent(
- final SubscriptionPipeTabletEventBatch batch, final
SubscriptionCommitContext commitContext) {
+ final SubscriptionPipeTabletEventBatch batch,
+ final SubscriptionCommitContextSupplier commitContextSupplier) {
this.pipeEvents = new SubscriptionPipeTabletBatchEvents(batch);
- this.response = new SubscriptionEventTabletResponse(batch, commitContext);
+ final SubscriptionCommitContext commitContext =
commitContextSupplier.get();
+ this.response =
+ new SubscriptionEventTabletResponse(batch, commitContext,
commitContextSupplier);
this.commitContext = commitContext;
}
@@ -138,7 +142,11 @@ public class SubscriptionEvent {
return response.isCommittable();
}
- public void ack() {
+ public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
+ // ack response
+ response.ack(onCommittedHook);
+
+ // ack pipe events
pipeEvents.ack();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index fe5ffee564f..59528125f49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -37,6 +37,7 @@ import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -52,12 +53,15 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
private long firstEventProcessingTime = Long.MIN_VALUE;
private long totalBufferSize = 0;
- private volatile Iterator<EnrichedEvent> enrichedEventsIterator;
+ private volatile Iterator<EnrichedEvent> currentEnrichedEventsIterator;
private volatile Iterator<TabletInsertionEvent>
currentTabletInsertionEventsIterator;
+ private volatile TsFileInsertionEvent currentTsFileInsertionEvent;
private final Meter insertNodeTabletInsertionEventSizeEstimator;
private final Meter rawTabletInsertionEventSizeEstimator;
+ private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+
public SubscriptionPipeTabletEventBatch(
final int regionId,
final SubscriptionPrefetchingTabletQueue prefetchingQueue,
@@ -75,21 +79,29 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
public synchronized void ack() {
- for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ // only decrease the reference count of iterated events
+ for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
}
+ iteratedEnrichedEvents.clear();
}
@Override
public synchronized void cleanUp() {
+ // do nothing if it has next
+ if (hasNext()) {
+ return;
+ }
+
// clear the reference count of events
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
enrichedEvent.clearReferenceCount(this.getClass().getName());
}
enrichedEvents.clear();
- enrichedEventsIterator = null;
+ currentEnrichedEventsIterator = null;
currentTabletInsertionEventsIterator = null;
+ currentTsFileInsertionEvent = null;
}
/////////////////////////////// utility ///////////////////////////////
@@ -119,13 +131,17 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
// update buffer size
// TODO: more precise computation
+ // NOTE: Considering the possibility of large tsfile, the final generated
response size may be
+ // larger than totalBufferSize, therefore limit control is also required in
+ // SubscriptionEventTabletResponse.
totalBufferSize += ((PipeTsFileInsertionEvent) event).getTsFile().length();
}
@Override
protected List<SubscriptionEvent> generateSubscriptionEvents() {
+ resetIterator();
return Collections.singletonList(
- new SubscriptionEvent(this,
prefetchingQueue.generateSubscriptionCommitContext()));
+ new SubscriptionEvent(this,
prefetchingQueue::generateSubscriptionCommitContext));
}
@Override
@@ -179,7 +195,11 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
/////////////////////////////// iterator ///////////////////////////////
public void resetIterator() {
- enrichedEventsIterator = enrichedEvents.iterator();
+ currentEnrichedEventsIterator = enrichedEvents.iterator();
+ currentTabletInsertionEventsIterator = null;
+ currentTsFileInsertionEvent = null;
+
+ iteratedEnrichedEvents.clear();
}
@Override
@@ -190,19 +210,20 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
} else {
// reset
currentTabletInsertionEventsIterator = null;
+ currentTsFileInsertionEvent = null;
return false;
}
}
- if (Objects.isNull(enrichedEventsIterator)) {
+ if (Objects.isNull(currentEnrichedEventsIterator)) {
return false;
}
- if (enrichedEventsIterator.hasNext()) {
+ if (currentEnrichedEventsIterator.hasNext()) {
return true;
} else {
// reset
- enrichedEventsIterator = null;
+ currentEnrichedEventsIterator = null;
return false;
}
}
@@ -211,33 +232,43 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
public List<Tablet> next() {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
if (currentTabletInsertionEventsIterator.hasNext()) {
- return convertToTablets(currentTabletInsertionEventsIterator.next());
+ final TabletInsertionEvent tabletInsertionEvent =
+ currentTabletInsertionEventsIterator.next();
+ if (!currentTabletInsertionEventsIterator.hasNext()) {
+ iteratedEnrichedEvents.add((EnrichedEvent)
currentTsFileInsertionEvent);
+ }
+ return convertToTablets(tabletInsertionEvent);
} else {
currentTabletInsertionEventsIterator = null;
+ currentTsFileInsertionEvent = null;
}
}
- if (Objects.isNull(enrichedEventsIterator)) {
+ if (Objects.isNull(currentEnrichedEventsIterator)) {
return null;
}
- if (!enrichedEventsIterator.hasNext()) {
+ if (!currentEnrichedEventsIterator.hasNext()) {
return null;
}
- final EnrichedEvent enrichedEvent = enrichedEventsIterator.next();
+ final EnrichedEvent enrichedEvent = currentEnrichedEventsIterator.next();
if (enrichedEvent instanceof TsFileInsertionEvent) {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
LOGGER.warn(
"SubscriptionPipeTabletEventBatch {} override non-null
currentTabletInsertionEventsIterator when iterating (broken invariant).",
this);
}
+ final PipeTsFileInsertionEvent tsFileInsertionEvent =
+ (PipeTsFileInsertionEvent) enrichedEvent;
+ currentTsFileInsertionEvent = tsFileInsertionEvent;
currentTabletInsertionEventsIterator =
- ((PipeTsFileInsertionEvent) enrichedEvent)
+ tsFileInsertionEvent
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
.iterator();
return next();
} else if (enrichedEvent instanceof TabletInsertionEvent) {
+ iteratedEnrichedEvents.add(enrichedEvent);
return convertToTablets((TabletInsertionEvent) enrichedEvent);
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
index dbbe9d898c8..dba2f48b466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventExtendableResponse.java
@@ -22,9 +22,6 @@ package org.apache.iotdb.db.subscription.event.response;
import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
import
org.apache.iotdb.db.subscription.event.cache.SubscriptionPollResponseCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
@@ -42,9 +39,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
public abstract class SubscriptionEventExtendableResponse
implements SubscriptionEventResponse<CachedSubscriptionPollResponse> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(SubscriptionEventTabletResponse.class);
-
private final Deque<CachedSubscriptionPollResponse> responses;
protected volatile boolean hasNoMore = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
index 3ded870feed..ed6ad138276 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventResponse.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.subscription.event.response;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.function.Consumer;
public interface SubscriptionEventResponse<E> {
@@ -44,6 +47,10 @@ public interface SubscriptionEventResponse<E> {
/////////////////////////////// lifecycle ///////////////////////////////
+ default void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
+ // do nothing
+ }
+
void nack();
void cleanUp();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
index 61cce24294d..35591ed405e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTabletResponse.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.subscription.event.response;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import
org.apache.iotdb.db.subscription.event.SubscriptionCommitContextSupplier;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
import
org.apache.iotdb.db.subscription.event.cache.CachedSubscriptionPollResponse;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
@@ -36,6 +38,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
/**
* The {@code SubscriptionEventTabletResponse} class extends {@link
@@ -51,16 +54,27 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
private static final long READ_TABLET_BUFFER_SIZE =
SubscriptionConfig.getInstance().getSubscriptionReadTabletBufferSize();
+ private static final long PREFETCH_TABLET_BUFFER_SIZE =
+
SubscriptionConfig.getInstance().getSubscriptionPrefetchTabletBatchMaxSizeInBytes();
+
private final SubscriptionPipeTabletEventBatch batch;
private final SubscriptionCommitContext commitContext;
+ private final SubscriptionCommitContextSupplier commitContextSupplier;
- private volatile int tabletsSize;
+ private volatile int totalTablets;
private final AtomicInteger nextOffset = new AtomicInteger(0);
+ // use these variables to limit control for large message
+ private volatile long totalBufferSize;
+ private volatile boolean availableForNext = false;
+
public SubscriptionEventTabletResponse(
- final SubscriptionPipeTabletEventBatch batch, final
SubscriptionCommitContext commitContext) {
+ final SubscriptionPipeTabletEventBatch batch,
+ final SubscriptionCommitContext commitContext,
+ final SubscriptionCommitContextSupplier commitContextSupplier) {
this.batch = batch;
this.commitContext = commitContext;
+ this.commitContextSupplier = commitContextSupplier;
init();
}
@@ -80,13 +94,26 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
}
}
+ @Override
+ public synchronized void ack(final Consumer<SubscriptionEvent>
onCommittedHook) {
+ if (availableForNext) {
+ // generate next subscription event with the same batch
+ onCommittedHook.accept(new SubscriptionEvent(batch,
commitContextSupplier));
+ }
+ }
+
@Override
public synchronized void nack() {
if (nextOffset.get() == 1) {
// do nothing if with complete tablets
return;
}
+
cleanUp();
+
+ // should not reset the iterator of batch when init
+ // TODO: avoid completely rewinding the iterator
+ batch.resetIterator();
init();
}
@@ -94,8 +121,16 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
public synchronized void cleanUp() {
super.cleanUp();
- tabletsSize = 0;
+ totalTablets = 0;
nextOffset.set(0);
+
+ totalBufferSize = 0;
+ availableForNext = false;
+ }
+
+ @Override
+ public boolean isCommittable() {
+ return (availableForNext || hasNoMore) && size() == 1;
}
/////////////////////////////// utility ///////////////////////////////
@@ -108,11 +143,17 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
return;
}
- batch.resetIterator();
offer(generateNextTabletResponse());
}
private synchronized CachedSubscriptionPollResponse
generateNextTabletResponse() {
+ if (availableForNext) {
+ return new CachedSubscriptionPollResponse(
+ SubscriptionPollResponseType.TABLETS.getType(),
+ new TabletsPayload(Collections.emptyList(), -totalTablets),
+ commitContext);
+ }
+
final List<Tablet> currentTablets = new ArrayList<>();
long currentBufferSize = 0;
@@ -128,7 +169,8 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
.map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
.reduce(Long::sum)
.orElse(0L);
- tabletsSize += tablets.size();
+ totalTablets += tablets.size();
+ totalBufferSize += bufferSize;
if (bufferSize > READ_TABLET_BUFFER_SIZE) {
// TODO: split tablets
@@ -148,6 +190,12 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
}
currentBufferSize += bufferSize;
+
+ // limit control for large message
+ if (totalBufferSize > PREFETCH_TABLET_BUFFER_SIZE && batch.hasNext()) {
+ availableForNext = true;
+ break;
+ }
}
final CachedSubscriptionPollResponse response;
@@ -155,7 +203,7 @@ public class SubscriptionEventTabletResponse extends
SubscriptionEventExtendable
response =
new CachedSubscriptionPollResponse(
SubscriptionPollResponseType.TABLETS.getType(),
- new TabletsPayload(Collections.emptyList(), -tabletsSize),
+ new TabletsPayload(Collections.emptyList(), -totalTablets),
commitContext);
hasNoMore = true;
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 73ee10c2854..7a7459d0ab2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -89,7 +89,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionReceiverV1.class);
- private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.95;
+ private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9;
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();