This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cb3b2f765c Subscription: improve tsfile event deduplication & fix 
some minor concurrency-related issues (#12915)
6cb3b2f765c is described below

commit 6cb3b2f765ccd2e656cea9e0d16fb010163fb10d
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 15 10:10:46 2024 +0800

    Subscription: improve tsfile event deduplication & fix some minor 
concurrency-related issues (#12915)
---
 .../rpc/subscription/payload/poll/TabletsPayload.java |  5 +++--
 .../TsFileDeduplicationBlockingPendingQueue.java      | 19 +++++++++++++++----
 .../org/apache/iotdb/commons/conf/CommonConfig.java   | 11 +++++++++++
 .../apache/iotdb/commons/conf/CommonDescriptor.java   |  5 +++++
 .../subscription/config/SubscriptionConfig.java       |  7 +++++++
 5 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 5b5bb8ade91..6ece7cc691b 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class TabletsPayload implements SubscriptionPollPayload {
 
@@ -36,7 +37,7 @@ public class TabletsPayload implements 
SubscriptionPollPayload {
   public TabletsPayload() {}
 
   public TabletsPayload(final List<Tablet> tablets) {
-    this.tablets = tablets;
+    this.tablets = new CopyOnWriteArrayList<>(tablets);
   }
 
   public List<Tablet> getTablets() {
@@ -58,7 +59,7 @@ public class TabletsPayload implements 
SubscriptionPollPayload {
     for (int i = 0; i < size; ++i) {
       tablets.add(Tablet.deserialize(buffer));
     }
-    this.tablets = tablets;
+    this.tablets = new CopyOnWriteArrayList<>(tablets);
     return this;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index d5741889b2a..7929d2a0ee1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -44,27 +45,37 @@ public class TsFileDeduplicationBlockingPendingQueue 
extends SubscriptionBlockin
 
     this.polledTsFiles =
         Caffeine.newBuilder()
-            .expireAfterWrite(10, TimeUnit.MINUTES) // TODO: config
+            .expireAfterWrite(
+                
SubscriptionConfig.getInstance().getSubscriptionTsFileDeduplicationWindowSeconds(),
+                TimeUnit.SECONDS)
             .build();
   }
 
   @Override
-  public synchronized Event waitedPoll() { // make it synchronized
-    final Event event = inputPendingQueue.waitedPoll();
+  public Event waitedPoll() {
+    return filter(inputPendingQueue.waitedPoll());
+  }
+
+  private synchronized Event filter(final Event event) { // make it 
synchronized
+    if (Objects.isNull(event)) {
+      return null;
+    }
+
     if (event instanceof PipeTsFileInsertionEvent) {
       final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = 
(PipeTsFileInsertionEvent) event;
       final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode();
       if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
-        // commit directly
         LOGGER.info(
             "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, 
commit it directly",
             pipeTsFileInsertionEvent.coreReportMessage());
+        // commit directly
         pipeTsFileInsertionEvent.decreaseReferenceCount(
             TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
         return null;
       }
       polledTsFiles.put(hashcode, hashcode);
     }
+
     return event;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b426e842b0..751d7f54a2d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -267,6 +267,7 @@ public class CommonConfig {
   private long subscriptionLaunchRetryIntervalMs = 1000;
   private int subscriptionRecycleUncommittedEventIntervalMs = 60000; // 60s
   private long subscriptionReadFileBufferSize = 8 * MB;
+  private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
 
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
@@ -1221,6 +1222,16 @@ public class CommonConfig {
     this.subscriptionReadFileBufferSize = subscriptionReadFileBufferSize;
   }
 
+  public long getSubscriptionTsFileDeduplicationWindowSeconds() {
+    return subscriptionTsFileDeduplicationWindowSeconds;
+  }
+
+  public void setSubscriptionTsFileDeduplicationWindowSeconds(
+      long subscriptionTsFileDeduplicationWindowSeconds) {
+    this.subscriptionTsFileDeduplicationWindowSeconds =
+        subscriptionTsFileDeduplicationWindowSeconds;
+  }
+
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6c264c24e2c..9ba601800d6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -646,6 +646,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_read_file_buffer_size",
                 String.valueOf(config.getSubscriptionReadFileBufferSize()))));
+    config.setSubscriptionTsFileDeduplicationWindowSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "subscription_ts_file_deduplication_window_seconds",
+                
String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
   }
 
   public void loadRetryProperties(Properties properties) throws IOException {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index bf40650d604..0c73c6a23d7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -71,6 +71,10 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionReadFileBufferSize();
   }
 
+  public long getSubscriptionTsFileDeduplicationWindowSeconds() {
+    return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -99,6 +103,9 @@ public class SubscriptionConfig {
         "SubscriptionRecycleUncommittedEventIntervalMs: {}",
         getSubscriptionRecycleUncommittedEventIntervalMs());
     LOGGER.info("SubscriptionReadFileBufferSize: {}", 
getSubscriptionReadFileBufferSize());
+    LOGGER.info(
+        "SubscriptionTsFileDeduplicationWindowSeconds: {}",
+        getSubscriptionTsFileDeduplicationWindowSeconds());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to