This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cb03ad80653 Subscription: fix consumer infinite pulling event & fully
managed tsfile parsing process & increase the reference count for subscribed
parsed raw tablet event & disrupt parsing requests through the introduction of
randomness & disable prefetch by default (#14856)
cb03ad80653 is described below
commit cb03ad80653a38d0c5e6c3bc667fdbe1080db7da
Author: VGalaxies <[email protected]>
AuthorDate: Tue Feb 18 10:39:30 2025 +0800
Subscription: fix consumer infinite pulling event & fully managed tsfile
parsing process & increase the reference count for subscribed parsed raw tablet
event & disrupt parsing requests through the introduction of randomness &
disable prefetch by default (#14856)
---
.../iotdb/rpc/subscription/config/TopicConfig.java | 7 ++-
.../subscription/payload/poll/TabletsPayload.java | 4 +-
.../base/AbstractSubscriptionConsumer.java | 2 +-
.../task/builder/PipeDataNodeTaskBuilder.java | 12 ++++-
.../agent/task/connection/PipeEventCollector.java | 11 +++-
.../agent/task/stage/PipeTaskProcessorStage.java | 9 +++-
.../broker/SubscriptionPrefetchingQueue.java | 3 +-
.../broker/SubscriptionPrefetchingTabletQueue.java | 2 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 2 +-
.../batch/SubscriptionPipeTabletEventBatch.java | 48 ++++++++++++-----
.../SubscriptionPipeTabletIterationSnapshot.java | 62 ++++++++++++++++++++++
.../pipe/SubscriptionPipeTabletBatchEvents.java | 29 +++++-----
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
13 files changed, 153 insertions(+), 40 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index c03fbf3147c..f71980d629f 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -194,7 +194,12 @@ public class TopicConfig extends PipeParameters {
/////////////////////////////// connector attributes mapping ///////////////////////////////
public Map<String, String> getAttributesWithSinkFormat() {
- return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
+ // refer to
+ // org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector.parseAndCollectEvent(org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent)
+ return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
+ attributes.getOrDefault(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_DEFAULT_VALUE))
+ ? SINK_TS_FILE_FORMAT_CONFIG
+ : SINK_TABLET_FORMAT_CONFIG;
}
public Map<String, String> getAttributesWithSinkPrefix() {
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 23b9e066329..5654572f458 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -40,8 +40,8 @@ public class TabletsPayload implements
SubscriptionPollPayload {
* <ul>
* <li>If nextOffset is 1, it indicates that the current payload is the first payload (its
* tablets are empty) and the fetching should continue.
- * <li>If nextOffset is negative, it indicates all tablets have been fetched, and -nextOffset
- * represents the total number of tablets.
+ * <li>If nextOffset is negative (or zero), it indicates all tablets have been fetched, and
+ * -nextOffset represents the total number of tablets.
* </ul>
*/
private transient int nextOffset;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 7dfae59a935..1755c0cfd48 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -935,7 +935,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
int nextOffset = ((TabletsPayload)
initialResponse.getPayload()).getNextOffset();
while (true) {
- if (nextOffset < 0) {
+ if (nextOffset <= 0) {
if (!Objects.equals(tablets.size(), -nextOffset)) {
final String errorMessage =
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 2bfc343fda7..1b517419c98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -49,6 +49,7 @@ import java.util.Map;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
public class PipeDataNodeTaskBuilder {
@@ -139,7 +140,16 @@ public class PipeDataNodeTaskBuilder {
.getStringOrDefault(
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
- .equals(CONNECTOR_FORMAT_TABLET_VALUE));
+ .equals(CONNECTOR_FORMAT_TABLET_VALUE),
+ PipeType.SUBSCRIPTION.equals(pipeType)
+ &&
+ // should not skip parsing when the format is tsfile
+ !pipeStaticMeta
+ .getConnectorParameters()
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+ CONNECTOR_FORMAT_HYBRID_VALUE)
+ .equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
return new PipeDataNodeTask(
pipeStaticMeta.getPipeName(), regionId, extractorStage,
processorStage, connectorStage);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index d63c1e39325..d8bd7fdce2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -55,6 +55,8 @@ public class PipeEventCollector implements EventCollector {
private final boolean forceTabletFormat;
+ private final boolean skipParseTsFile;
+
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
private boolean hasNoGeneratedEvent = true;
private boolean isFailedToIncreaseReferenceCount = false;
@@ -63,11 +65,13 @@ public class PipeEventCollector implements EventCollector {
final UnboundedBlockingPendingQueue<Event> pendingQueue,
final long creationTime,
final int regionId,
- final boolean forceTabletFormat) {
+ final boolean forceTabletFormat,
+ final boolean skipParseTsFile) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
this.forceTabletFormat = forceTabletFormat;
+ this.skipParseTsFile = skipParseTsFile;
}
@Override
@@ -117,6 +121,11 @@ public class PipeEventCollector implements EventCollector {
return;
}
+ if (skipParseTsFile) {
+ collectEvent(sourceEvent);
+ return;
+ }
+
if (!forceTabletFormat
&& (!sourceEvent.shouldParseTimeOrPattern()
|| (sourceEvent.isTableModelEvent()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
index 22cd5ce3094..5201a19d8bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java
@@ -68,7 +68,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final UnboundedBlockingPendingQueue<Event>
pipeConnectorOutputPendingQueue,
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
- final boolean forceTabletFormat) {
+ final boolean forceTabletFormat,
+ final boolean skipParseTsFile) {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskProcessorRuntimeEnvironment(
@@ -100,7 +101,11 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
new PipeEventCollector(
- pipeConnectorOutputPendingQueue, creationTime, regionId,
forceTabletFormat);
+ pipeConnectorOutputPendingQueue,
+ creationTime,
+ regionId,
+ forceTabletFormat,
+ skipParseTsFile);
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index ca9bf096efa..f0d6d752398 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -197,7 +197,8 @@ public abstract class SubscriptionPrefetchingQueue {
return null;
},
SubscriptionAgent.receiver().remainingMs());
- } catch (final Exception ignored) {
+ } catch (final Exception e) {
+ LOGGER.warn("Exception {} occurred when {} execute receiver subtask",
this, e, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index d590df17c2b..41e4b060ae5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -159,7 +159,7 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
String.format(
"exception occurred when fetching next response: %s,
consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s",
e, consumerId, commitContext, offset, this);
- LOGGER.warn(errorMessage);
+ LOGGER.warn(errorMessage, e);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index adef86518b8..978ed9fac46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -211,7 +211,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
String.format(
"exception occurred when fetching next response: %s,
consumer id: %s, commit context: %s, writing offset: %s, prefetching queue: %s",
e, consumerId, commitContext, writingOffset, this);
- LOGGER.warn(errorMessage);
+ LOGGER.warn(errorMessage, e);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 00114d85df4..40fb4fe3751 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -37,7 +37,6 @@ import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -61,7 +60,7 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
private final Meter insertNodeTabletInsertionEventSizeEstimator;
private final Meter rawTabletInsertionEventSizeEstimator;
- private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+ private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
private final AtomicInteger referenceCount = new AtomicInteger();
private static final long ITERATED_COUNT_REPORT_FREQ =
@@ -88,7 +87,17 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
@Override
public synchronized void ack() {
referenceCount.decrementAndGet();
- // do nothing for iterated enriched events, see SubscriptionPipeTabletBatchEvents
+
+ // we decrease the reference count of events if and only if when the whole batch is consumed
+ if (!hasNext() && referenceCount.get() == 0) {
+ for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+ if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+ // close data container in tsfile event
+ ((PipeTsFileInsertionEvent) enrichedEvent).close();
+ }
+ enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+ }
+ }
}
@Override
@@ -200,26 +209,29 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
/////////////////////////////// iterator ///////////////////////////////
- public List<EnrichedEvent> sendIterationSnapshot() {
- final List<EnrichedEvent> result =
Collections.unmodifiableList(iteratedEnrichedEvents);
- iteratedEnrichedEvents = new ArrayList<>();
+ public synchronized SubscriptionPipeTabletIterationSnapshot
sendIterationSnapshot() {
+ final SubscriptionPipeTabletIterationSnapshot result = iterationSnapshot;
+ iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
referenceCount.incrementAndGet();
return result;
}
- public void resetForIteration() {
+ public synchronized void resetForIteration() {
currentEnrichedEventsIterator = enrichedEvents.iterator();
currentTabletInsertionEventsIterator = null;
currentTsFileInsertionEvent = null;
- iteratedEnrichedEvents = new ArrayList<>();
+ if (Objects.nonNull(iterationSnapshot)) {
+ iterationSnapshot.clear();
+ }
+ iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
referenceCount.set(0);
iteratedCount.set(0);
}
@Override
- public boolean hasNext() {
+ public synchronized boolean hasNext() {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
if (currentTabletInsertionEventsIterator.hasNext()) {
return true;
@@ -245,7 +257,7 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
}
@Override
- public List<Tablet> next() {
+ public synchronized List<Tablet> next() {
final List<Tablet> tablets = nextInternal();
if (Objects.isNull(tablets)) {
return null;
@@ -267,8 +279,16 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
if (currentTabletInsertionEventsIterator.hasNext()) {
final TabletInsertionEvent tabletInsertionEvent =
currentTabletInsertionEventsIterator.next();
+ if (!((EnrichedEvent) tabletInsertionEvent)
+ .increaseReferenceCount(this.getClass().getName())) {
+ LOGGER.warn(
+ "SubscriptionPipeTabletEventBatch: Failed to increase the
reference count of event {}, skipping it.",
+ ((EnrichedEvent) tabletInsertionEvent).coreReportMessage());
+ } else {
+ iterationSnapshot.addParsedEnrichedEvent((EnrichedEvent)
tabletInsertionEvent);
+ }
if (!currentTabletInsertionEventsIterator.hasNext()) {
- iteratedEnrichedEvents.add((EnrichedEvent)
currentTsFileInsertionEvent);
+ iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent)
currentTsFileInsertionEvent);
}
return convertToTablets(tabletInsertionEvent);
} else {
@@ -297,11 +317,13 @@ public class SubscriptionPipeTabletEventBatch extends
SubscriptionPipeEventBatch
currentTsFileInsertionEvent = tsFileInsertionEvent;
currentTabletInsertionEventsIterator =
tsFileInsertionEvent
-
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
+ .toTabletInsertionEvents(
+ // disrupt parsing requests through the introduction of randomness
+ (long) ((1 + Math.random()) *
SubscriptionAgent.receiver().remainingMs()))
.iterator();
return next();
} else if (enrichedEvent instanceof TabletInsertionEvent) {
- iteratedEnrichedEvents.add(enrichedEvent);
+ iterationSnapshot.addIteratedEnrichedEvent(enrichedEvent);
return convertToTablets((TabletInsertionEvent) enrichedEvent);
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
new file mode 100644
index 00000000000..137354ae36b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletIterationSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class SubscriptionPipeTabletIterationSnapshot {
+
+ private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();
+ private final List<EnrichedEvent> parsedEnrichedEvents = new ArrayList<>();
+
+ public List<EnrichedEvent> getIteratedEnrichedEvents() {
+ return Collections.unmodifiableList(iteratedEnrichedEvents);
+ }
+
+ public void addIteratedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+ iteratedEnrichedEvents.add(enrichedEvent);
+ }
+
+ public void addParsedEnrichedEvent(final EnrichedEvent enrichedEvent) {
+ parsedEnrichedEvents.add(enrichedEvent);
+ }
+
+ public void clear() {
+ for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
+ if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
+ // close data container in tsfile event
+ ((PipeTsFileInsertionEvent) enrichedEvent).close();
+ }
+ }
+
+ for (final EnrichedEvent enrichedEvent : parsedEnrichedEvents) {
+ if (enrichedEvent instanceof PipeRawTabletInsertionEvent) {
+ // decrease reference count in raw tablet event
+ enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
index 87a69fb2be2..9bf87c6b1be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/pipe/SubscriptionPipeTabletBatchEvents.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.subscription.event.pipe;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
+import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletIterationSnapshot;
import java.util.List;
import java.util.Objects;
@@ -32,28 +32,21 @@ import static
com.google.common.base.MoreObjects.toStringHelper;
public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents {
private final SubscriptionPipeTabletEventBatch batch;
- private volatile List<EnrichedEvent> iteratedEnrichedEvents;
+ private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
public SubscriptionPipeTabletBatchEvents(final
SubscriptionPipeTabletEventBatch batch) {
this.batch = batch;
}
- public void receiveIterationSnapshot(final List<EnrichedEvent>
iteratedEnrichedEvents) {
- this.iteratedEnrichedEvents = iteratedEnrichedEvents;
+ public void receiveIterationSnapshot(
+ final SubscriptionPipeTabletIterationSnapshot iterationSnapshot) {
+ this.iterationSnapshot = iterationSnapshot;
}
@Override
public void ack() {
batch.ack();
-
- // only decrease the reference count of iterated events
- for (final EnrichedEvent enrichedEvent : iteratedEnrichedEvents) {
- if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
- // close data container in tsfile event
- ((PipeTsFileInsertionEvent) enrichedEvent).close();
- }
- enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
- }
+ iterationSnapshot.clear();
}
@Override
@@ -67,7 +60,11 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
public String toString() {
return toStringHelper(this)
.add("batch", batch)
- .add("events", formatEnrichedEvents(iteratedEnrichedEvents, 4))
+ .add(
+ "events",
+ Objects.nonNull(iterationSnapshot)
+ ?
formatEnrichedEvents(iterationSnapshot.getIteratedEnrichedEvents(), 4)
+ : "<unknown>")
.toString();
}
@@ -92,6 +89,8 @@ public class SubscriptionPipeTabletBatchEvents implements
SubscriptionPipeEvents
@Override
public int getPipeEventCount() {
- return iteratedEnrichedEvents.size();
+ return Objects.nonNull(iterationSnapshot)
+ ? iterationSnapshot.getIteratedEnrichedEvents().size()
+ : 0;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f0212486f52..f18ef082e00 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -309,7 +309,7 @@ public class CommonConfig {
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
private volatile long subscriptionCheckMemoryEnoughIntervalMs = 10L;
- private boolean subscriptionPrefetchEnabled = true;
+ private boolean subscriptionPrefetchEnabled = false;
private float subscriptionPrefetchMemoryThreshold = 0.5F;
private float subscriptionPrefetchMissingRateThreshold = 0.9F;
private int subscriptionPrefetchEventLocalCountThreshold = 10;