This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch hybrid-connot-unpin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6dd47bdd465144acdb4451ef3c7b4853f7ae49b9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 13 13:26:15 2023 +0800

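    refactor(pipe): hybrid extractor USING_BOTH epoch state, event-bound batch commit ids

    - Add TsFileEpoch.State.USING_BOTH. In the hybrid realtime extractor, a
      tablet-to-tsfile fallback on an epoch already in USING_TABLET, or a
      tablet event whose reference count cannot be increased, now moves the
      epoch to USING_BOTH, in which both tablet and tsfile events are supplied.
    - Async connector: take the batch request commit id from the batch
      builder's last buffered commit id instead of commitIdGenerator, and
      de-duplicate batched requests by commit id rather than by event.
    - Log when a TsFileEpoch is removed after all of its data is extracted.
    - Lower PipeWALResource.MIN_TIME_TO_LIVE_IN_MS from 60 s to 20 s and raise
      the default pipeConnectorPendingQueueSize from 16 to 256.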
---
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |   7 +-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   5 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     | 112 ++++++++++++++-------
 .../pipe/extractor/realtime/epoch/TsFileEpoch.java |   3 +-
 .../realtime/epoch/TsFileEpochManager.java         |   4 +-
 .../db/pipe/resource/wal/PipeWALResource.java      |   2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   4 +-
 7 files changed, 95 insertions(+), 42 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index 2993b88da14..9de0dc84678 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -48,7 +48,8 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends PipeTransferBat
       throws IOException, WALPipeException {
     final TPipeTransferReq req = buildTabletInsertionReq(event);
 
-    if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
+    if (requestCommitIds.isEmpty()
+        || !requestCommitIds.get(requestCommitIds.size() - 
1).equals(requestCommitId)) {
       reqs.add(req);
 
       if (event instanceof EnrichedEvent) {
@@ -86,4 +87,8 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends PipeTransferBat
   public List<Long> deepcopyRequestCommitIds() {
     return new ArrayList<>(requestCommitIds);
   }
+
+  public long getLastCommitId() {
+    return requestCommitIds.isEmpty() ? -1 : 
requestCommitIds.get(requestCommitIds.size() - 1);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 42cdd8e7818..84b8865604c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -487,7 +487,10 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector {
       return;
     }
 
-    final long requestCommitId = commitIdGenerator.incrementAndGet();
+    // requestCommitId can not be generated by commitIdGenerator because the 
commit id must
+    // be bind to a specific InsertTabletEvent or TsFileInsertionEvent, 
otherwise the commit
+    // process will be stuck.
+    final long requestCommitId = tabletBatchBuilder.getLastCommitId();
     final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
         new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index a452577858f..257479a1d62 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -86,7 +86,21 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
       //  size of wal buffer), the write operation will be throttled, so we 
should not extract any
       //  more tablet events.
       //  3. The number of tsfile events in the pending queue has exceeded the 
limit.
-      event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+      event
+          .getTsFileEpoch()
+          .migrateState(
+              this,
+              state -> {
+                switch (state) {
+                  case EMPTY:
+                  case USING_TSFILE:
+                    return TsFileEpoch.State.USING_TSFILE;
+                  case USING_TABLET:
+                  case USING_BOTH:
+                  default:
+                    return TsFileEpoch.State.USING_BOTH;
+                }
+              });
     }
 
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
@@ -97,6 +111,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
         break;
       case EMPTY:
       case USING_TABLET:
+      case USING_BOTH:
         if (!pendingQueue.waitedOffer(event)) {
           // this would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
@@ -127,13 +142,24 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
         .getTsFileEpoch()
         .migrateState(
             this,
-            state ->
-                state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
+            state -> {
+              switch (state) {
+                case EMPTY:
+                case USING_TSFILE:
+                  return TsFileEpoch.State.USING_TSFILE;
+                case USING_TABLET:
+                  return TsFileEpoch.State.USING_TABLET;
+                case USING_BOTH:
+                default:
+                  return TsFileEpoch.State.USING_BOTH;
+              }
+            });
 
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
     switch (state) {
       case EMPTY:
       case USING_TSFILE:
+      case USING_BOTH:
         if (!pendingQueue.waitedOffer(event)) {
           // this would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
@@ -265,23 +291,29 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
             state ->
                 (state.equals(TsFileEpoch.State.EMPTY)) ? 
TsFileEpoch.State.USING_TABLET : state);
 
-    if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
-      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
-        return event.getEvent();
-      } else {
-        // if the event's reference count can not be increased, it means the 
data represented by
-        // this event is not reliable anymore. but the data represented by 
this event
-        // has been carried by the following tsfile event, so we can just 
discard this event.
-        event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-        LOGGER.warn(
-            "Discard tablet event {} because it is not reliable anymore. "
-                + "Change the state of TsFileEpoch to USING_TSFILE.",
-            event);
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case USING_TSFILE:
+        // if the state is USING_TSFILE, discard the event and poll the next 
one.
         return null;
-      }
+      case EMPTY:
+      case USING_TABLET:
+      case USING_BOTH:
+      default:
+        if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
+          return event.getEvent();
+        } else {
+          // if the event's reference count can not be increased, it means the 
data represented by
+          // this event is not reliable anymore. but the data represented by 
this event
+          // has been carried by the following tsfile event, so we can just 
discard this event.
+          event.getTsFileEpoch().migrateState(this, s -> 
TsFileEpoch.State.USING_BOTH);
+          LOGGER.warn(
+              "Discard tablet event {} because it is not reliable anymore. "
+                  + "Change the state of TsFileEpoch to USING_TSFILE.",
+              event);
+          return null;
+        }
     }
-    // if the state is USING_TSFILE, discard the event and poll the next one.
-    return null;
   }
 
   private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
@@ -299,26 +331,34 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
               return state;
             });
 
-    if 
(event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
-        return event.getEvent();
-      } else {
-        // if the event's reference count can not be increased, it means the 
data represented by
-        // this event is not reliable anymore. the data has been lost. we 
simply discard this event
-        // and report the exception to PipeRuntimeAgent.
-        final String errorMessage =
-            String.format(
-                "TsFile Event %s can not be supplied because "
-                    + "the reference count can not be increased, "
-                    + "the data represented by this event is lost",
-                event.getEvent());
-        LOGGER.error(errorMessage);
-        PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case USING_TABLET:
+        // if the state is USING_TABLET, discard the event and poll the next 
one.
         return null;
-      }
+      case EMPTY:
+      case USING_TSFILE:
+      case USING_BOTH:
+      default:
+        if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
+          return event.getEvent();
+        } else {
+          // if the event's reference count can not be increased, it means the 
data represented by
+          // this event is not reliable anymore. the data has been lost. we 
simply discard this
+          // event
+          // and report the exception to PipeRuntimeAgent.
+          final String errorMessage =
+              String.format(
+                  "TsFile Event %s can not be supplied because "
+                      + "the reference count can not be increased, "
+                      + "the data represented by this event is lost",
+                  event.getEvent());
+          LOGGER.error(errorMessage);
+          PipeAgent.runtime()
+              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+          return null;
+        }
     }
-    // if the state is USING_TABLET, discard the event and poll the next one.
-    return null;
   }
 
   private Event supplyHeartbeat(PipeRealtimeEvent event) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index 55a500bb1ed..f57408a9058 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -63,6 +63,7 @@ public class TsFileEpoch {
   public enum State {
     EMPTY,
     USING_TABLET,
-    USING_TSFILE
+    USING_TSFILE,
+    USING_BOTH
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index cfa57760985..5b92828c00c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -53,9 +53,11 @@ public class TsFileEpochManager {
           return new TsFileEpoch(path);
         });
 
+    final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
+    LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
     return new PipeRealtimeEvent(
         event,
-        filePath2Epoch.remove(filePath),
+        epoch,
         resource.getDevices().stream()
             .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)),
         event.getPattern());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 358e319dafa..951ad738460 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -40,7 +40,7 @@ public abstract class PipeWALResource implements Closeable {
 
   private final AtomicInteger referenceCount;
 
-  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60;
+  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
   private final AtomicLong lastLogicalPinTime;
   private final AtomicBoolean isPhysicallyPinned;
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ccd3e8e8c90..e6bf01dff7f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -170,7 +170,9 @@ public class CommonConfig {
   private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
-  private int pipeConnectorPendingQueueSize = 16;
+  // recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
+  // pipeAsyncConnectorCoreClientNumber
+  private int pipeConnectorPendingQueueSize = 256;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncConnectorSelectorNumber = 1;

Reply via email to