This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 64c089441fe Subscription: deduplicate tsfile event before prefetching
(#12887)
64c089441fe is described below
commit 64c089441fe1549df341e9c86b9f7d1ca1421c5b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jul 10 18:09:48 2024 +0800
Subscription: deduplicate tsfile event before prefetching (#12887)
---
.../it/local/IoTDBSubscriptionBasicIT.java | 78 ++++++++++++++++++++++
.../consumer/SubscriptionConsumer.java | 9 ++-
.../broker/SubscriptionBlockingPendingQueue.java} | 25 ++++---
.../db/subscription/broker/SubscriptionBroker.java | 6 +-
.../broker/SubscriptionPrefetchingQueue.java | 5 +-
.../broker/SubscriptionPrefetchingTabletQueue.java | 6 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 6 +-
.../TsFileDeduplicationBlockingPendingQueue.java | 70 +++++++++++++++++++
.../connection/UnboundedBlockingPendingQueue.java | 2 +-
9 files changed, 180 insertions(+), 27 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 83c5c9d1c85..b89bd776caf 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -34,6 +34,10 @@ import
org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -41,7 +45,9 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -398,4 +404,76 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
thread.join();
}
}
+
+ @Test
+ public void testTsFileDeduplication() { // 100 unflushed rows must reach a tsfile-format push consumer in exactly one message
+ // Insert 100 rows (timestamps 0..99, s1 = 1) of historical data into root.db.d1
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ // DO NOT FLUSH HERE
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create a topic whose format is FORMAT_TS_FILE_HANDLER_VALUE
+ final String topicName = "topic5";
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscribe with a push consumer that counts listener callbacks and the rows read from each delivered tsfile
+ final AtomicInteger onReceiveCount = new AtomicInteger();
+ final AtomicInteger rowCount = new AtomicInteger();
+ try (final SubscriptionPushConsumer consumer =
+ new SubscriptionPushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ onReceiveCount.getAndIncrement(); // one increment per listener invocation
+ try (final TsFileReader tsFileReader =
message.getTsFileHandler().openReader()) {
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+ QueryExpression.create(
+ Collections.singletonList(new
Path("root.db.d1", "s1", true)),
+ null)); // null: no filter expression is passed to the query
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1); // one increment per record returned by the data set
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer()) {
+
+ consumer.open();
+ consumer.subscribe(topicName);
+
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(100, rowCount.get()); // every inserted row was read back
+ Assert.assertEquals(1, onReceiveCount.get()); // exactly one tsfile
+ });
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 1dad01aca00..633f63ae7a9 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -365,8 +365,13 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return filePath;
} catch (final FileAlreadyExistsException fileAlreadyExistsException) {
if (allowFileAlreadyExistsException) {
- return getFilePath(
- topicName, fileName + "." + RandomStringGenerator.generate(16),
false, true);
+ final String suffix = RandomStringGenerator.generate(16);
+ LOGGER.warn(
+ "Detect already existed file {} when polling topic {}, add random
suffix {} to filename",
+ fileName,
+ topicName,
+ suffix);
+ return getFilePath(topicName, fileName + "." + suffix, false, true);
}
throw new SubscriptionRuntimeNonCriticalException(
fileAlreadyExistsException.getMessage(), fileAlreadyExistsException);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
similarity index 59%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
index 6cea142cb24..b95501e0742 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
@@ -17,24 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.task.connection;
+package org.apache.iotdb.db.subscription.broker;
-import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
+public abstract class SubscriptionBlockingPendingQueue { // wraps an UnboundedBlockingPendingQueue<Event>; subclasses decide how events are polled from it
-public class UnboundedBlockingPendingQueue<E extends Event> extends
BlockingPendingQueue<E> {
+ protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue; // the wrapped queue; size() and isEmpty() delegate to it
- private final BlockingDeque<E> pendingDeque;
+ public SubscriptionBlockingPendingQueue(
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+ this.inputPendingQueue = inputPendingQueue;
+ }
+
+ public abstract Event waitedPoll(); // the polling behaviour is supplied by each subclass
- public UnboundedBlockingPendingQueue(PipeEventCounter eventCounter) {
- super(new LinkedBlockingDeque<>(), eventCounter);
- pendingDeque = (BlockingDeque<E>) pendingQueue;
+ public int size() {
+ return inputPendingQueue.size(); // reports the wrapped queue's size
}
- public E peekLast() {
- return pendingDeque.peekLast();
+ public boolean isEmpty() {
+ return inputPendingQueue.isEmpty(); // reports whether the wrapped queue is empty
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 45dbeda8d5e..57cfca226c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -166,12 +166,14 @@ public class SubscriptionBroker {
final String topicFormat =
SubscriptionAgent.topic().getTopicFormat(topicName);
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
final SubscriptionPrefetchingQueue queue =
- new SubscriptionPrefetchingTsFileQueue(brokerId, topicName,
inputPendingQueue);
+ new SubscriptionPrefetchingTsFileQueue(
+ brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
topicNameToPrefetchingQueue.put(topicName, queue);
} else {
final SubscriptionPrefetchingQueue queue =
- new SubscriptionPrefetchingTabletQueue(brokerId, topicName,
inputPendingQueue);
+ new SubscriptionPrefetchingTabletQueue(
+ brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
topicNameToPrefetchingQueue.put(topicName, queue);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e4b1c0d8711..2d23950fe75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -56,7 +55,7 @@ public abstract class SubscriptionPrefetchingQueue {
protected final String brokerId; // consumer group id
protected final String topicName;
- protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
+ protected final SubscriptionBlockingPendingQueue inputPendingQueue;
protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
protected final Map<SubscriptionCommitContext, SubscriptionEvent>
uncommittedEvents;
@@ -68,7 +67,7 @@ public abstract class SubscriptionPrefetchingQueue {
public SubscriptionPrefetchingQueue(
final String brokerId,
final String topicName,
- final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+ final SubscriptionBlockingPendingQueue inputPendingQueue) {
this.brokerId = brokerId;
this.topicName = topicName;
this.inputPendingQueue = inputPendingQueue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 900bc4577fc..3aa36425b88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -20,13 +20,11 @@
package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
-import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
@@ -44,7 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQueue {
+class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPrefetchingTabletQueue.class);
@@ -60,7 +58,7 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
public SubscriptionPrefetchingTabletQueue(
final String brokerId,
final String topicName,
- final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+ final SubscriptionBlockingPendingQueue inputPendingQueue) {
super(brokerId, topicName, inputPendingQueue);
this.currentBatchRef.set(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 40c8980d44f..b0aed50b8bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,14 +19,12 @@
package org.apache.iotdb.db.subscription.broker;
-import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
-import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
@@ -49,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
+class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPrefetchingTsFileQueue.class);
@@ -67,7 +65,7 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
public SubscriptionPrefetchingTsFileQueue(
final String brokerId,
final String topicName,
- final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+ final SubscriptionBlockingPendingQueue inputPendingQueue) {
super(brokerId, topicName, inputPendingQueue);
this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
new file mode 100644
index 00000000000..d5741889b2a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker;
+
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class TsFileDeduplicationBlockingPendingQueue extends
SubscriptionBlockingPendingQueue { // pending queue whose waitedPoll() drops tsfile events whose tsfile hash code was already seen
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class);
+
+ private final Cache<Integer, Integer> polledTsFiles; // hash codes of tsfiles already returned by waitedPoll(); key and value are the same int
+
+ public TsFileDeduplicationBlockingPendingQueue(
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+ super(inputPendingQueue);
+
+ this.polledTsFiles =
+ Caffeine.newBuilder()
+ .expireAfterWrite(10, TimeUnit.MINUTES) // a recorded hash code expires 10 minutes after it is written; TODO: config
+ .build();
+ }
+
+ @Override
+ public synchronized Event waitedPoll() { // synchronized: the cache lookup and the put below run under this instance's lock
+ final Event event = inputPendingQueue.waitedPoll();
+ if (event instanceof PipeTsFileInsertionEvent) { // only tsfile insertion events are deduplicated
+ final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) event;
+ final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode(); // NOTE(review): dedup key is only the int hash; two distinct tsfiles with equal hash codes would be treated as duplicates - confirm this is acceptable
+ if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
+ // commit directly: log the duplicate, decrease its reference count and hand nothing to the caller
+ LOGGER.info(
+ "Subscription: Detect duplicated PipeTsFileInsertionEvent {},
commit it directly",
+ pipeTsFileInsertionEvent.coreReportMessage());
+ pipeTsFileInsertionEvent.decreaseReferenceCount(
+ TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
+ return null; // callers receive null for a duplicated tsfile event
+ }
+ polledTsFiles.put(hashcode, hashcode); // first sighting: remember the hash so later events for it are dropped
+ }
+ return event; // other event types, and first-seen tsfile events, are returned as polled
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 6cea142cb24..fe1542c75a9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -29,7 +29,7 @@ public class UnboundedBlockingPendingQueue<E extends Event>
extends BlockingPend
private final BlockingDeque<E> pendingDeque;
- public UnboundedBlockingPendingQueue(PipeEventCounter eventCounter) {
+ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) {
super(new LinkedBlockingDeque<>(), eventCounter);
pendingDeque = (BlockingDeque<E>) pendingQueue;
}