This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf6eea139c Subscription: improve deduplication logic for PipeRawTabletInsertionEvent (#13061)
7bf6eea139c is described below
commit 7bf6eea139c5a6ad3c0f8e56b830045a208ed125
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jul 31 11:51:35 2024 +0800
Subscription: improve deduplication logic for PipeRawTabletInsertionEvent (#13061)
---
.../it/local/IoTDBSubscriptionBasicIT.java | 63 ++++++++++++++++++++++
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++
.../common/tsfile/PipeTsFileInsertionEvent.java | 14 ++++-
.../event/realtime/PipeRealtimeEventFactory.java | 2 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 1 +
.../TsFileDeduplicationBlockingPendingQueue.java | 48 +++++++++++++----
6 files changed, 121 insertions(+), 11 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 8f5e8683746..68287ab6619 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -482,4 +482,67 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testDataSetDeduplication() {
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.createDatabase("root.db");
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1, s2) values (%s, 1,
2)", i));
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d2(time, s1, s2) values (%s, 3,
4)", i));
+ }
+ // DO NOT FLUSH HERE
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic6";
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ try (final SubscriptionPushConsumer consumer =
+ new SubscriptionPushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer()) {
+
+ consumer.open();
+ consumer.subscribe(topicName);
+
+ AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 48acc332ddf..65da3828931 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -206,6 +206,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
}
+ public EnrichedEvent getSourceEvent() {
+ return sourceEvent;
+ }
+
/////////////////////////// TabletInsertionEvent ///////////////////////////
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 4194670493e..9fab25ee38b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -63,6 +63,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
private final boolean isLoaded;
private final boolean isGeneratedByPipe;
private final boolean isGeneratedByPipeConsensus;
+ private final boolean isGeneratedByHistoricalExtractor;
private final AtomicBoolean isClosed;
private TsFileInsertionDataContainer dataContainer;
@@ -72,13 +73,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
public PipeTsFileInsertionEvent(
- final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
+ final TsFileResource resource,
+ final boolean isLoaded,
+ final boolean isGeneratedByPipe,
+ final boolean isGeneratedByHistoricalExtractor) {
// The modFile must be copied before the event is assigned to the listening pipes
this(
resource,
true,
isLoaded,
isGeneratedByPipe,
+ isGeneratedByHistoricalExtractor,
null,
0,
null,
@@ -92,6 +97,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
final boolean isWithMod,
final boolean isLoaded,
final boolean isGeneratedByPipe,
+ final boolean isGeneratedByHistoricalExtractor,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
@@ -110,6 +116,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
this.isLoaded = isLoaded;
this.isGeneratedByPipe = isGeneratedByPipe;
this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
+ this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
isClosed = new AtomicBoolean(resource.isClosed());
// Register close listener if TsFile is not closed
@@ -291,6 +298,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
isWithMod,
isLoaded,
isGeneratedByPipe,
+ isGeneratedByHistoricalExtractor,
pipeName,
creationTime,
pipeTaskMeta,
@@ -370,6 +378,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
return isGeneratedByPipeConsensus;
}
+ public boolean isGeneratedByHistoricalExtractor() {
+ return isGeneratedByHistoricalExtractor;
+ }
+
private TsFileInsertionDataContainer initDataContainer() {
try {
if (dataContainer == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 60786cfed25..1dd86e1e5d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -37,7 +37,7 @@ public class PipeRealtimeEventFactory {
public static PipeRealtimeEvent createRealtimeEvent(
final TsFileResource resource, final boolean isLoaded, final boolean
isGeneratedByPipe) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe),
resource);
+ new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe,
false), resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index bbf34892e42..ca1888b0200 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -588,6 +588,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
shouldTransferModFile,
false,
false,
+ true,
pipeName,
creationTime,
pipeTaskMeta,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index 7929d2a0ee1..5b4001890e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.subscription.broker;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -37,15 +39,15 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class);
- private final Cache<Integer, Integer> polledTsFiles;
+ private final Cache<Integer, Boolean>
hashCodeToIsGeneratedByHistoricalExtractor;
public TsFileDeduplicationBlockingPendingQueue(
final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
super(inputPendingQueue);
- this.polledTsFiles =
+ this.hashCodeToIsGeneratedByHistoricalExtractor =
Caffeine.newBuilder()
- .expireAfterWrite(
+ .expireAfterAccess(
SubscriptionConfig.getInstance().getSubscriptionTsFileDeduplicationWindowSeconds(),
TimeUnit.SECONDS)
.build();
@@ -61,21 +63,49 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
return null;
}
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ final EnrichedEvent sourceEvent =
pipeRawTabletInsertionEvent.getSourceEvent();
+ if (sourceEvent instanceof PipeTsFileInsertionEvent
+ && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+ // commit directly
+ pipeRawTabletInsertionEvent.decreaseReferenceCount(
+ TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
+ return null;
+ }
+ }
+
if (event instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) event;
- final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode();
- if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
- LOGGER.info(
- "Subscription: Detect duplicated PipeTsFileInsertionEvent {},
commit it directly",
- pipeTsFileInsertionEvent.coreReportMessage());
+ if (isDuplicated(pipeTsFileInsertionEvent)) {
// commit directly
pipeTsFileInsertionEvent.decreaseReferenceCount(
TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
return null;
}
- polledTsFiles.put(hashcode, hashcode);
}
return event;
}
+
+ private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+ final int hashCode = event.getTsFile().hashCode();
+ final boolean isGeneratedByHistoricalExtractor =
event.isGeneratedByHistoricalExtractor();
+ final Boolean existedIsGeneratedByHistoricalExtractor =
+ hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+ if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+ hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode,
isGeneratedByHistoricalExtractor);
+ return false;
+ }
+ // Multiple PipeRawTabletInsertionEvents parsed from the same PipeTsFileInsertionEvent (i.e.,
+ // with the same isGeneratedByHistoricalExtractor field) are not considered duplicates.
+ if (Objects.equals(existedIsGeneratedByHistoricalExtractor,
isGeneratedByHistoricalExtractor)) {
+ return false;
+ }
+ LOGGER.info(
+ "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit
it directly",
+ event.coreReportMessage());
+ return true;
+ }
}