This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c089441fe Subscription: deduplicate tsfile event before prefetching (#12887)
64c089441fe is described below

commit 64c089441fe1549df341e9c86b9f7d1ca1421c5b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jul 10 18:09:48 2024 +0800

    Subscription: deduplicate tsfile event before prefetching (#12887)
---
 .../it/local/IoTDBSubscriptionBasicIT.java         | 78 ++++++++++++++++++++++
 .../consumer/SubscriptionConsumer.java             |  9 ++-
 .../broker/SubscriptionBlockingPendingQueue.java}  | 25 ++++---
 .../db/subscription/broker/SubscriptionBroker.java |  6 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  5 +-
 .../broker/SubscriptionPrefetchingTabletQueue.java |  6 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  6 +-
 .../TsFileDeduplicationBlockingPendingQueue.java   | 70 +++++++++++++++++++
 .../connection/UnboundedBlockingPendingQueue.java  |  2 +-
 9 files changed, 180 insertions(+), 27 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 83c5c9d1c85..b89bd776caf 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -34,6 +34,10 @@ import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -41,7 +45,9 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -398,4 +404,76 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
       thread.join();
     }
   }
+
+  @Test
+  public void testTsFileDeduplication() {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+      }
+      // DO NOT FLUSH HERE
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic5";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger onReceiveCount = new AtomicInteger();
+    final AtomicInteger rowCount = new AtomicInteger();
+    try (final SubscriptionPushConsumer consumer =
+        new SubscriptionPushConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .ackStrategy(AckStrategy.AFTER_CONSUME)
+            .consumeListener(
+                message -> {
+                  onReceiveCount.getAndIncrement();
+                  try (final TsFileReader tsFileReader = message.getTsFileHandler().openReader()) {
+                    final QueryDataSet dataSet =
+                        tsFileReader.query(
+                            QueryExpression.create(
+                                Collections.singletonList(new Path("root.db.d1", "s1", true)),
+                                null));
+                    while (dataSet.hasNext()) {
+                      dataSet.next();
+                      rowCount.addAndGet(1);
+                    }
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer()) {
+
+      consumer.open();
+      consumer.subscribe(topicName);
+
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(100, rowCount.get());
+            Assert.assertEquals(1, onReceiveCount.get()); // exactly one tsfile
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 1dad01aca00..633f63ae7a9 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -365,8 +365,13 @@ abstract class SubscriptionConsumer implements AutoCloseable {
       return filePath;
     } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
       if (allowFileAlreadyExistsException) {
-        return getFilePath(
-            topicName, fileName + "." + RandomStringGenerator.generate(16), false, true);
+        final String suffix = RandomStringGenerator.generate(16);
+        LOGGER.warn(
+            "Detect already existed file {} when polling topic {}, add random suffix {} to filename",
+            fileName,
+            topicName,
+            suffix);
+        return getFilePath(topicName, fileName + "." + suffix, false, true);
       }
       throw new SubscriptionRuntimeNonCriticalException(
           fileAlreadyExistsException.getMessage(), fileAlreadyExistsException);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
similarity index 59%
copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
index 6cea142cb24..b95501e0742 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
@@ -17,24 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.task.connection;
+package org.apache.iotdb.db.subscription.broker;
 
-import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
+import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
+public abstract class SubscriptionBlockingPendingQueue {
 
-public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
+  protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
 
-  private final BlockingDeque<E> pendingDeque;
+  public SubscriptionBlockingPendingQueue(
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+    this.inputPendingQueue = inputPendingQueue;
+  }
+
+  public abstract Event waitedPoll();
 
-  public UnboundedBlockingPendingQueue(PipeEventCounter eventCounter) {
-    super(new LinkedBlockingDeque<>(), eventCounter);
-    pendingDeque = (BlockingDeque<E>) pendingQueue;
+  public int size() {
+    return inputPendingQueue.size();
   }
 
-  public E peekLast() {
-    return pendingDeque.peekLast();
+  public boolean isEmpty() {
+    return inputPendingQueue.isEmpty();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 45dbeda8d5e..57cfca226c8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -166,12 +166,14 @@ public class SubscriptionBroker {
     final String topicFormat = SubscriptionAgent.topic().getTopicFormat(topicName);
     if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
       final SubscriptionPrefetchingQueue queue =
-          new SubscriptionPrefetchingTsFileQueue(brokerId, topicName, inputPendingQueue);
+          new SubscriptionPrefetchingTsFileQueue(
+              brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
       SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
       topicNameToPrefetchingQueue.put(topicName, queue);
     } else {
       final SubscriptionPrefetchingQueue queue =
-          new SubscriptionPrefetchingTabletQueue(brokerId, topicName, inputPendingQueue);
+          new SubscriptionPrefetchingTabletQueue(
+              brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
       SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
       topicNameToPrefetchingQueue.put(topicName, queue);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e4b1c0d8711..2d23950fe75 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -56,7 +55,7 @@ public abstract class SubscriptionPrefetchingQueue {
 
   protected final String brokerId; // consumer group id
   protected final String topicName;
-  protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
+  protected final SubscriptionBlockingPendingQueue inputPendingQueue;
   protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
 
   protected final Map<SubscriptionCommitContext, SubscriptionEvent> uncommittedEvents;
@@ -68,7 +67,7 @@ public abstract class SubscriptionPrefetchingQueue {
   public SubscriptionPrefetchingQueue(
       final String brokerId,
       final String topicName,
-      final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+      final SubscriptionBlockingPendingQueue inputPendingQueue) {
     this.brokerId = brokerId;
     this.topicName = topicName;
     this.inputPendingQueue = inputPendingQueue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 900bc4577fc..3aa36425b88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -20,13 +20,11 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
 import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
@@ -44,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {
+class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPrefetchingTabletQueue.class);
@@ -60,7 +58,7 @@ public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQ
   public SubscriptionPrefetchingTabletQueue(
       final String brokerId,
       final String topicName,
-      final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+      final SubscriptionBlockingPendingQueue inputPendingQueue) {
     super(brokerId, topicName, inputPendingQueue);
 
     this.currentBatchRef.set(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 40c8980d44f..b0aed50b8bf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -19,14 +19,12 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
-import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
 import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
 import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
@@ -49,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue {
+class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPrefetchingTsFileQueue.class);
@@ -67,7 +65,7 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ
   public SubscriptionPrefetchingTsFileQueue(
       final String brokerId,
       final String topicName,
-      final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+      final SubscriptionBlockingPendingQueue inputPendingQueue) {
     super(brokerId, topicName, inputPendingQueue);
 
     this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
new file mode 100644
index 00000000000..d5741889b2a
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker;
+
+import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockingPendingQueue {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class);
+
+  private final Cache<Integer, Integer> polledTsFiles;
+
+  public TsFileDeduplicationBlockingPendingQueue(
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
+    super(inputPendingQueue);
+
+    this.polledTsFiles =
+        Caffeine.newBuilder()
+            .expireAfterWrite(10, TimeUnit.MINUTES) // TODO: config
+            .build();
+  }
+
+  @Override
+  public synchronized Event waitedPoll() { // make it synchronized
+    final Event event = inputPendingQueue.waitedPoll();
+    if (event instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) event;
+      final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode();
+      if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
+        // commit directly
+        LOGGER.info(
+            "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly",
+            pipeTsFileInsertionEvent.coreReportMessage());
+        pipeTsFileInsertionEvent.decreaseReferenceCount(
+            TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
+        return null;
+      }
+      polledTsFiles.put(hashcode, hashcode);
+    }
+    return event;
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 6cea142cb24..fe1542c75a9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -29,7 +29,7 @@ public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPend
 
   private final BlockingDeque<E> pendingDeque;
 
-  public UnboundedBlockingPendingQueue(PipeEventCounter eventCounter) {
+  public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) {
     super(new LinkedBlockingDeque<>(), eventCounter);
     pendingDeque = (BlockingDeque<E>) pendingQueue;
   }

Reply via email to