This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6cb3b2f765c Subscription: improve tsfile event deduplication & fix
some minor concurrency-related issues (#12915)
6cb3b2f765c is described below
commit 6cb3b2f765ccd2e656cea9e0d16fb010163fb10d
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 15 10:10:46 2024 +0800
Subscription: improve tsfile event deduplication & fix some minor
concurrency-related issues (#12915)
---
.../rpc/subscription/payload/poll/TabletsPayload.java | 5 +++--
.../TsFileDeduplicationBlockingPendingQueue.java | 19 +++++++++++++++----
.../org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++
.../apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++
.../subscription/config/SubscriptionConfig.java | 7 +++++++
5 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
index 5b5bb8ade91..6ece7cc691b 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
public class TabletsPayload implements SubscriptionPollPayload {
@@ -36,7 +37,7 @@ public class TabletsPayload implements SubscriptionPollPayload {
public TabletsPayload() {}
public TabletsPayload(final List<Tablet> tablets) {
- this.tablets = tablets;
+ this.tablets = new CopyOnWriteArrayList<>(tablets);
}
public List<Tablet> getTablets() {
@@ -58,7 +59,7 @@ public class TabletsPayload implements SubscriptionPollPayload {
for (int i = 0; i < size; ++i) {
tablets.add(Tablet.deserialize(buffer));
}
- this.tablets = tablets;
+ this.tablets = new CopyOnWriteArrayList<>(tablets);
return this;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index d5741889b2a..7929d2a0ee1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -44,27 +45,37 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
this.polledTsFiles =
Caffeine.newBuilder()
- .expireAfterWrite(10, TimeUnit.MINUTES) // TODO: config
+ .expireAfterWrite(
+ SubscriptionConfig.getInstance().getSubscriptionTsFileDeduplicationWindowSeconds(),
+ TimeUnit.SECONDS)
.build();
}
@Override
- public synchronized Event waitedPoll() { // make it synchronized
- final Event event = inputPendingQueue.waitedPoll();
+ public Event waitedPoll() {
+ return filter(inputPendingQueue.waitedPoll());
+ }
+
+ private synchronized Event filter(final Event event) { // make it synchronized
+ if (Objects.isNull(event)) {
+ return null;
+ }
+
if (event instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) event;
final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode();
if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
- // commit directly
LOGGER.info(
"Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly",
pipeTsFileInsertionEvent.coreReportMessage());
+ // commit directly
pipeTsFileInsertionEvent.decreaseReferenceCount(
TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
return null;
}
polledTsFiles.put(hashcode, hashcode);
}
+
return event;
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4b426e842b0..751d7f54a2d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -267,6 +267,7 @@ public class CommonConfig {
private long subscriptionLaunchRetryIntervalMs = 1000;
private int subscriptionRecycleUncommittedEventIntervalMs = 60000; // 60s
private long subscriptionReadFileBufferSize = 8 * MB;
+ private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -1221,6 +1222,16 @@ public class CommonConfig {
this.subscriptionReadFileBufferSize = subscriptionReadFileBufferSize;
}
+ public long getSubscriptionTsFileDeduplicationWindowSeconds() {
+ return subscriptionTsFileDeduplicationWindowSeconds;
+ }
+
+ public void setSubscriptionTsFileDeduplicationWindowSeconds(
+ long subscriptionTsFileDeduplicationWindowSeconds) {
+ this.subscriptionTsFileDeduplicationWindowSeconds =
+ subscriptionTsFileDeduplicationWindowSeconds;
+ }
+
public String getSchemaEngineMode() {
return schemaEngineMode;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6c264c24e2c..9ba601800d6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -646,6 +646,11 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_read_file_buffer_size",
String.valueOf(config.getSubscriptionReadFileBufferSize()))));
+ config.setSubscriptionTsFileDeduplicationWindowSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "subscription_ts_file_deduplication_window_seconds",
+ String.valueOf(config.getSubscriptionTsFileDeduplicationWindowSeconds()))));
}
public void loadRetryProperties(Properties properties) throws IOException {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index bf40650d604..0c73c6a23d7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -71,6 +71,10 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionReadFileBufferSize();
}
+ public long getSubscriptionTsFileDeduplicationWindowSeconds() {
+ return COMMON_CONFIG.getSubscriptionTsFileDeduplicationWindowSeconds();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConfig.class);
@@ -99,6 +103,9 @@ public class SubscriptionConfig {
"SubscriptionRecycleUncommittedEventIntervalMs: {}",
getSubscriptionRecycleUncommittedEventIntervalMs());
LOGGER.info("SubscriptionReadFileBufferSize: {}",
getSubscriptionReadFileBufferSize());
+ LOGGER.info(
+ "SubscriptionTsFileDeduplicationWindowSeconds: {}",
+ getSubscriptionTsFileDeduplicationWindowSeconds());
}
/////////////////////////////// Singleton ///////////////////////////////