This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c893101de1b Pipe: cache leader support batch mode (#12486)
c893101de1b is described below
commit c893101de1be6bc61ab38e6570ad1dc75d23efb1
Author: Zikun Ma <[email protected]>
AuthorDate: Mon May 13 23:00:49 2024 +0800
Pipe: cache leader support batch mode (#12486)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../client/IoTDBDataNodeAsyncClientManager.java | 10 +-
.../IoTDBDataNodeCacheLeaderClientManager.java | 2 +-
.../client/IoTDBDataNodeSyncClientManager.java | 9 +
...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 36 ----
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 29 ---
...ferBatchReqBuilder.java => PipeEventBatch.java} | 61 +++---
.../builder/PipeTransferBatchReqBuilder.java | 219 +++++++--------------
.../async/IoTDBDataRegionAsyncConnector.java | 49 +++--
.../PipeTransferTabletBatchEventHandler.java | 19 +-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 40 ++--
.../db/pipe/connector/util/LeaderCacheUtils.java | 69 +++++++
.../protocol/thrift/IoTDBDataNodeReceiver.java | 51 ++++-
.../crud/InsertMultiTabletsStatement.java | 8 +
13 files changed, 313 insertions(+), 289 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 729c3dbc8bc..62eeab93caa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -107,12 +108,15 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
public AsyncPipeDataTransferServiceClient borrowClient(String deviceId)
throws Exception {
- if (!useLeaderCache) {
+ if (!useLeaderCache || Objects.isNull(deviceId)) {
return borrowClient();
}
- final TEndPoint endPoint =
LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
- if (endPoint == null) {
+ return borrowClient(LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId));
+ }
+
+ public AsyncPipeDataTransferServiceClient borrowClient(TEndPoint endPoint)
throws Exception {
+ if (!useLeaderCache || Objects.isNull(endPoint)) {
return borrowClient();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index b6a6511efc7..29e46e60635 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
-interface IoTDBDataNodeCacheLeaderClientManager {
+public interface IoTDBDataNodeCacheLeaderClientManager {
LeaderCacheManager LEADER_CACHE_MANAGER = new LeaderCacheManager();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index e281555e889..969d8d74d38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -79,6 +79,15 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
: getClient();
}
+ public Pair<IoTDBSyncClient, Boolean> getClient(TEndPoint endPoint) {
+ return useLeaderCache
+ && endPoint != null
+ && endPoint2ClientAndStatus.containsKey(endPoint)
+ &&
Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
+ ? endPoint2ClientAndStatus.get(endPoint)
+ : getClient();
+ }
+
public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
if (!useLeaderCache) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
deleted file mode 100644
index 394bc0f5b0c..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
-
- public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters
parameters) {
- super(parameters);
- }
-
- public List<Long> deepCopyRequestCommitIds() {
- return new ArrayList<>(requestCommitIds);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
deleted file mode 100644
index 95b03927b4d..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-
-public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
-
- public IoTDBThriftSyncPipeTransferBatchReqBuilder(final PipeParameters
parameters) {
- super(parameters);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
similarity index 76%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
index f7a17f3d7fc..74b49ce7272 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -40,49 +39,31 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+public class PipeEventBatch implements AutoCloseable {
-public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventBatch.class);
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);
+ private final List<Event> events = new ArrayList<>();
+ private final List<Long> requestCommitIds = new ArrayList<>();
- protected final List<Event> events = new ArrayList<>();
- protected final List<Long> requestCommitIds = new ArrayList<>();
-
- protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
- protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
- protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
+ private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
+ private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
// limit in delayed time
- protected final int maxDelayInMs;
- protected long firstEventProcessingTime = Long.MIN_VALUE;
+ private final int maxDelayInMs;
+ private long firstEventProcessingTime = Long.MIN_VALUE;
// limit in buffer size
- protected final PipeMemoryBlock allocatedMemoryBlock;
- protected long totalBufferSize = 0;
-
- protected PipeTransferBatchReqBuilder(final PipeParameters parameters) {
- maxDelayInMs =
- parameters.getIntOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
- CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
- * 1000;
-
- final long requestMaxBatchSizeInBytes =
- parameters.getLongOrDefault(
- Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
- CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
-
- allocatedMemoryBlock =
+ private final PipeMemoryBlock allocatedMemoryBlock;
+ private long totalBufferSize = 0;
+
+ public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
+ this.maxDelayInMs = maxDelayInMs;
+ this.allocatedMemoryBlock =
PipeResourceManager.memory()
.tryAllocate(requestMaxBatchSizeInBytes)
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
@@ -107,10 +88,10 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
}
/**
- * Try offer {@link Event} into cache if the given {@link Event} is not
duplicated.
+ * Try to offer {@link Event} into the batch if the given {@link Event} is not
duplicated.
*
* @param event the given {@link Event}
- * @return {@link true} if the batch can be transferred
+ * @return {@code true} if the batch can be transferred
*/
public synchronized boolean onEvent(final TabletInsertionEvent event)
throws IOException, WALPipeException {
@@ -163,7 +144,7 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
binaryBuffers, insertNodeBuffers, tabletBuffers);
}
- protected long getMaxBatchSizeInBytes() {
+ private long getMaxBatchSizeInBytes() {
return allocatedMemoryBlock.getMemoryUsageInBytes();
}
@@ -175,7 +156,11 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
return new ArrayList<>(events);
}
- protected int buildTabletInsertionBuffer(final TabletInsertionEvent event)
+ public List<Long> deepCopyRequestCommitIds() {
+ return new ArrayList<>(requestCommitIds);
+ }
+
+ private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
final ByteBuffer buffer;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index f7a17f3d7fc..2b19bb1275d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -19,212 +19,145 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.tsfile.utils.PublicBAOS;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
-public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
+public class PipeTransferBatchReqBuilder implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);
- protected final List<Event> events = new ArrayList<>();
- protected final List<Long> requestCommitIds = new ArrayList<>();
+ private final boolean useLeaderCache;
- protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
- protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
- protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ private final int requestMaxDelayInMs;
+ private final long requestMaxBatchSizeInBytes;
- // limit in delayed time
- protected final int maxDelayInMs;
- protected long firstEventProcessingTime = Long.MIN_VALUE;
+ // If the leader cache is disabled (or the endpoint of the event cannot be found
in the leader cache),
+ // the event will be stored in the default batch.
+ private final PipeEventBatch defaultBatch;
+ // If the leader cache is enabled, the batch will be divided by the leader
endpoint,
+ // each endpoint has a batch.
+ private final Map<TEndPoint, PipeEventBatch> endPointToBatch = new
HashMap<>();
- // limit in buffer size
- protected final PipeMemoryBlock allocatedMemoryBlock;
- protected long totalBufferSize = 0;
+ public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
+ useLeaderCache =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+ CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
- protected PipeTransferBatchReqBuilder(final PipeParameters parameters) {
- maxDelayInMs =
+ requestMaxDelayInMs =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY,
SINK_IOTDB_BATCH_DELAY_KEY),
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
* 1000;
-
- final long requestMaxBatchSizeInBytes =
+ requestMaxBatchSizeInBytes =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
- allocatedMemoryBlock =
- PipeResourceManager.memory()
- .tryAllocate(requestMaxBatchSizeInBytes)
- .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
- .setShrinkCallback(
- (oldMemory, newMemory) ->
- LOGGER.info(
- "The batch size limit has shrunk from {} to {}.",
oldMemory, newMemory))
- .setExpandMethod(
- oldMemory -> Math.min(Math.max(oldMemory, 1) * 2,
requestMaxBatchSizeInBytes))
- .setExpandCallback(
- (oldMemory, newMemory) ->
- LOGGER.info(
- "The batch size limit has expanded from {} to {}.",
oldMemory, newMemory));
-
- if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
- LOGGER.info(
- "PipeTransferBatchReqBuilder: the max batch size is adjusted from {}
to {} due to the "
- + "memory restriction",
- requestMaxBatchSizeInBytes,
- getMaxBatchSizeInBytes());
- }
+ this.defaultBatch = new PipeEventBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
}
/**
- * Try offer {@link Event} into cache if the given {@link Event} is not
duplicated.
+ * Try to offer {@link Event} into the corresponding batch if the given {@link
Event} is not
+ * duplicated.
*
* @param event the given {@link Event}
- * @return {@link true} if the batch can be transferred
+ * @return {@link Pair}<{@link TEndPoint}, {@link PipeEventBatch}> not null
means this {@link
+ * PipeEventBatch} can be transferred. The first element is the leader
endpoint to transfer to
+ * (might be null); the second element is the batch to be transferred.
*/
- public synchronized boolean onEvent(final TabletInsertionEvent event)
+ public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final
TabletInsertionEvent event)
throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
- return false;
+ LOGGER.warn(
+ "Unsupported event {} type {} when building transfer request",
event, event.getClass());
+ return null;
}
- final long requestCommitId = ((EnrichedEvent) event).getCommitId();
-
- // The deduplication logic here is to avoid the accumulation of the same
event in a batch when
- // retrying.
- if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
- // We increase the reference count for this event to determine if the
event may be released.
- if (((EnrichedEvent) event)
-
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
- events.add(event);
- requestCommitIds.add(requestCommitId);
-
- final int bufferSize = buildTabletInsertionBuffer(event);
- totalBufferSize += bufferSize;
-
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
- }
- } else {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
- }
+ if (!useLeaderCache) {
+ return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
}
- return totalBufferSize >= getMaxBatchSizeInBytes()
- || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
- }
-
- public synchronized void onSuccess() {
- binaryBuffers.clear();
- insertNodeBuffers.clear();
- tabletBuffers.clear();
-
- events.clear();
- requestCommitIds.clear();
+ String deviceId = null;
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId();
+ } else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ final InsertNode insertNode =
+ ((PipeInsertNodeTabletInsertionEvent)
event).getInsertNodeViaCacheIfPossible();
+ if (Objects.nonNull(insertNode)) {
+ deviceId = insertNode.getDevicePath().getFullPath();
+ }
+ }
- firstEventProcessingTime = Long.MIN_VALUE;
+ if (Objects.isNull(deviceId)) {
+ return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
+ }
- totalBufferSize = 0;
- }
+ final TEndPoint endPoint =
+
IoTDBDataNodeCacheLeaderClientManager.LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
+ if (Objects.isNull(endPoint)) {
+ return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) :
null;
+ }
- public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
- return PipeTransferTabletBatchReq.toTPipeTransferReq(
- binaryBuffers, insertNodeBuffers, tabletBuffers);
+ final PipeEventBatch batch =
+ endPointToBatch.computeIfAbsent(
+ endPoint, k -> new PipeEventBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes));
+ return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
}
- protected long getMaxBatchSizeInBytes() {
- return allocatedMemoryBlock.getMemoryUsageInBytes();
+ /** Get all batches that have at least 1 event. */
+ public synchronized List<Pair<TEndPoint, PipeEventBatch>>
getAllNonEmptyBatches() {
+ final List<Pair<TEndPoint, PipeEventBatch>> nonEmptyBatches = new
ArrayList<>();
+ if (!defaultBatch.isEmpty()) {
+ nonEmptyBatches.add(new Pair<>(null, defaultBatch));
+ }
+ endPointToBatch.forEach(
+ (endPoint, batch) -> {
+ if (!batch.isEmpty()) {
+ nonEmptyBatches.add(new Pair<>(endPoint, batch));
+ }
+ });
+ return nonEmptyBatches;
}
public boolean isEmpty() {
- return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() &&
tabletBuffers.isEmpty();
- }
-
- public List<Event> deepCopyEvents() {
- return new ArrayList<>(events);
- }
-
- protected int buildTabletInsertionBuffer(final TabletInsertionEvent event)
- throws IOException, WALPipeException {
- final ByteBuffer buffer;
- if (event instanceof PipeInsertNodeTabletInsertionEvent) {
- final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
- (PipeInsertNodeTabletInsertionEvent) event;
- // Read the bytebuffer from the wal file and transfer it directly
without serializing or
- // deserializing if possible
- final InsertNode insertNode =
- pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
- if (Objects.isNull(insertNode)) {
- buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
- binaryBuffers.add(buffer);
- } else {
- buffer = insertNode.serializeToByteBuffer();
- insertNodeBuffers.add(buffer);
- }
- } else {
- final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
- (PipeRawTabletInsertionEvent) event;
- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
- final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
- ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(),
outputStream);
- buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- }
- tabletBuffers.add(buffer);
- }
- return buffer.limit();
+ return defaultBatch.isEmpty()
+ && endPointToBatch.values().stream().allMatch(PipeEventBatch::isEmpty);
}
@Override
public synchronized void close() {
- clearEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName());
- allocatedMemoryBlock.close();
- }
-
- public void decreaseEventsReferenceCount(final String holderMessage, final
boolean shouldReport) {
- for (final Event event : events) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).decreaseReferenceCount(holderMessage,
shouldReport);
- }
- }
- }
-
- public void clearEventsReferenceCount(final String holderMessage) {
- for (final Event event : events) {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event).clearReferenceCount(holderMessage);
- }
- }
+ defaultBatch.close();
+ endPointToBatch.values().forEach(PipeEventBatch::close);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index aef8f2f340a..74cbfe2e39b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,7 +24,8 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -48,6 +49,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +87,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
- private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+ private PipeTransferBatchReqBuilder tabletBatchBuilder;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -126,7 +128,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
loadBalanceStrategy);
if (isTabletBatchModeEnabled) {
- tabletBatchBuilder = new
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
+ tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}
}
@@ -160,13 +162,13 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private void transferWithoutCheck(final TabletInsertionEvent
tabletInsertionEvent)
throws Exception {
if (isTabletBatchModeEnabled) {
- if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
- final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler =
- new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
-
- transfer(pipeTransferTabletBatchEventHandler);
-
- tabletBatchBuilder.onSuccess();
+ final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ if (Objects.nonNull(endPointAndBatch)) {
+ transfer(
+ endPointAndBatch.getLeft(),
+ new
PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), this));
+ endPointAndBatch.getRight().onSuccess();
}
} else {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
@@ -191,7 +193,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
new PipeTransferTabletInsertNodeEventHandler(
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
- transfer(pipeTransferInsertNodeReqHandler);
+ transfer(
+ Objects.nonNull(insertNode) ?
insertNode.getDevicePath().getFullPath() : null,
+ pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -211,16 +215,17 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
new PipeTransferTabletRawEventHandler(
pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
- transfer(pipeTransferTabletReqHandler);
+ transfer(pipeRawTabletInsertionEvent.getDeviceId(),
pipeTransferTabletReqHandler);
}
}
}
private void transfer(
+ final TEndPoint endPoint,
final PipeTransferTabletBatchEventHandler
pipeTransferTabletBatchEventHandler) {
AsyncPipeDataTransferServiceClient client = null;
try {
- client = clientManager.borrowClient();
+ client = clientManager.borrowClient(endPoint);
pipeTransferTabletBatchEventHandler.transfer(client);
} catch (final Exception ex) {
logOnClientException(client, ex);
@@ -229,10 +234,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
private void transfer(
+ final String deviceId,
final PipeTransferTabletInsertNodeEventHandler
pipeTransferInsertNodeReqHandler) {
AsyncPipeDataTransferServiceClient client = null;
try {
- client = clientManager.borrowClient();
+ client = clientManager.borrowClient(deviceId);
pipeTransferInsertNodeReqHandler.transfer(client);
} catch (final Exception ex) {
logOnClientException(client, ex);
@@ -240,10 +246,11 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
}
}
- private void transfer(final PipeTransferTabletRawEventHandler
pipeTransferTabletReqHandler) {
+ private void transfer(
+ final String deviceId, final PipeTransferTabletRawEventHandler
pipeTransferTabletReqHandler) {
AsyncPipeDataTransferServiceClient client = null;
try {
- client = clientManager.borrowClient();
+ client = clientManager.borrowClient(deviceId);
pipeTransferTabletReqHandler.transfer(client);
} catch (final Exception ex) {
logOnClientException(client, ex);
@@ -399,9 +406,13 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return;
}
- transfer(new PipeTransferTabletBatchEventHandler(tabletBatchBuilder,
this));
-
- tabletBatchBuilder.onSuccess();
+ for (final Pair<TEndPoint, PipeEventBatch> endPointAndBatch :
+ tabletBatchBuilder.getAllNonEmptyBatches()) {
+ transfer(
+ endPointAndBatch.getLeft(),
+ new PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(),
this));
+ endPointAndBatch.getRight().onSuccess();
+ }
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index aa867ca05b1..500ed4cab98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,6 +34,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,13 +54,12 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private final IoTDBDataRegionAsyncConnector connector;
public PipeTransferTabletBatchEventHandler(
- final IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder,
- final IoTDBDataRegionAsyncConnector connector)
+ final PipeEventBatch batch, final IoTDBDataRegionAsyncConnector
connector)
throws IOException {
// Deep copy to keep Ids' and events' reference
- requestCommitIds = batchBuilder.deepCopyRequestCommitIds();
- events = batchBuilder.deepCopyEvents();
- req = batchBuilder.toTPipeTransferReq();
+ requestCommitIds = batch.deepCopyRequestCommitIds();
+ events = batch.deepCopyEvents();
+ req = batch.toTPipeTransferReq();
this.connector = connector;
}
@@ -83,6 +85,11 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
.statusHandler()
.handle(status, response.getStatus().getMessage(),
events.toString());
}
+ for (final Pair<String, TEndPoint> redirectPair :
+ LeaderCacheUtils.parseRecommendedRedirections(status)) {
+ connector.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
+ }
+
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index f873cc27646..568baa13144 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -31,6 +33,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -53,12 +56,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
- private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+ private PipeTransferBatchReqBuilder tabletBatchBuilder;
@Override
public void customize(
@@ -68,7 +72,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
// tablet batch mode configuration
if (isTabletBatchModeEnabled) {
- tabletBatchBuilder = new
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
+ tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}
}
@@ -99,8 +103,10 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
try {
if (isTabletBatchModeEnabled) {
- if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
- doTransfer();
+ final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+ tabletBatchBuilder.onEvent(tabletInsertionEvent);
+ if (Objects.nonNull(endPointAndBatch)) {
+ doTransfer(endPointAndBatch);
}
} else {
if (tabletInsertionEvent instanceof
PipeInsertNodeTabletInsertionEvent) {
@@ -163,11 +169,14 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
}
}
- private void doTransfer() {
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
+ private void doTransfer(Pair<TEndPoint, PipeEventBatch> endPointAndBatch) {
+ final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+ clientManager.getClient(endPointAndBatch.getLeft());
+ final PipeEventBatch batchToTransfer = endPointAndBatch.getRight();
+
final TPipeTransferResp resp;
try {
- resp =
clientAndStatus.getLeft().pipeTransfer(tabletBatchBuilder.toTPipeTransferReq());
+ resp =
clientAndStatus.getLeft().pipeTransfer(batchToTransfer.toTPipeTransferReq());
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -182,12 +191,21 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
receiverStatusHandler.handle(
resp.getStatus(),
String.format("Transfer PipeTransferTabletBatchReq error, result
status %s", resp.status),
- tabletBatchBuilder.deepCopyEvents().toString());
+ batchToTransfer.deepCopyEvents().toString());
}
- tabletBatchBuilder.decreaseEventsReferenceCount(
+ for (final Pair<String, TEndPoint> redirectPair :
+ LeaderCacheUtils.parseRecommendedRedirections(status)) {
+ clientManager.updateLeaderCache(redirectPair.getLeft(),
redirectPair.getRight());
+ }
+
+ batchToTransfer.decreaseEventsReferenceCount(
IoTDBDataRegionSyncConnector.class.getName(), true);
- tabletBatchBuilder.onSuccess();
+ batchToTransfer.onSuccess();
+ }
+
+ private void doTransfer() {
+ tabletBatchBuilder.getAllNonEmptyBatches().forEach(this::doTransfer);
}
private void doTransferWrapper(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
new file mode 100644
index 00000000000..e31fec9b57b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LeaderCacheUtils {
+
+ private LeaderCacheUtils() {
+ // Do nothing
+ }
+
+ /**
+ * Get all redirection recommendations after transferring a batch event to update
leader cache.
+ *
+ * @param status is the returned status after transferring a batch event.
+ * @return a list of pairs, each pair contains a device path and its
redirect endpoint.
+ */
+ public static List<Pair<String, TEndPoint>>
parseRecommendedRedirections(TSStatus status) {
+ // If there is no exception, there should be 2 sub-statuses, one for
InsertRowsStatement and one
+ // for InsertMultiTabletsStatement (see
IoTDBDataNodeReceiver#handleTransferTabletBatch).
+ final List<Pair<String, TEndPoint>> redirectList = new ArrayList<>();
+
+ if (status.getSubStatusSize() != 2) {
+ return redirectList;
+ }
+
+ for (final TSStatus subStatus : status.getSubStatus()) {
+ if (subStatus.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ continue;
+ }
+
+ for (final TSStatus innerSubStatus : subStatus.getSubStatus()) {
+ if (innerSubStatus.isSetRedirectNode()) {
+ // The receiver is expected to put the device path into the message field of each
+ // sub-status that carries a redirect node, so getMessage() is treated as a device path.
+ redirectList.add(
+ new Pair<>(innerSubStatus.getMessage(),
innerSubStatus.getRedirectNode()));
+ }
+ }
+ }
+
+ return redirectList;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c9626c3ba81..c5f1d5fa99d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -245,10 +245,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
Stream.of(
statementPair.getLeft().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndClassifyExceptions(statementPair.getLeft()),
+ :
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
statementPair.getRight().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndClassifyExceptions(statementPair.getRight()))
+ :
executeStatementAndAddRedirectInfo(statementPair.getRight()))
.collect(Collectors.toList())));
}
@@ -353,10 +353,55 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return configReceiverId.get();
}
+ /**
+ * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement},
the returned {@link
+ * TSStatus} will use sub-status to record the endpoint for redirection.
Each sub-status records
+ * the redirection endpoint for one device path, and the order is the same
as the order of the
+ * device paths in the statement. However, this order is not guaranteed to
be the same as in the
+ * request. So for each sub-status which needs to redirect, we record the
device path using the
+ * message field.
+ */
+ private TSStatus executeStatementAndAddRedirectInfo(final
InsertBaseStatement statement) {
+ final TSStatus result = executeStatementAndClassifyExceptions(statement);
+
+ if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ && result.getSubStatusSize() > 0) {
+ final List<PartialPath> devicePaths;
+ if (statement instanceof InsertRowsStatement) {
+ devicePaths = ((InsertRowsStatement) statement).getDevicePaths();
+ } else if (statement instanceof InsertMultiTabletsStatement) {
+ devicePaths = ((InsertMultiTabletsStatement)
statement).getDevicePaths();
+ } else {
+ LOGGER.warn(
+ "Receiver id = {}: Unsupported statement type {} for redirection.",
+ receiverId.get(),
+ statement);
+ return result;
+ }
+
+ if (devicePaths.size() == result.getSubStatusSize()) {
+ for (int i = 0; i < devicePaths.size(); ++i) {
+ if (result.getSubStatus().get(i).isSetRedirectNode()) {
+
result.getSubStatus().get(i).setMessage(devicePaths.get(i).getFullPath());
+ }
+ }
+ } else {
+ LOGGER.warn(
+ "Receiver id = {}: The number of device paths is not equal to
sub-status in statement {}: {}.",
+ receiverId.get(),
+ statement,
+ result);
+ }
+ }
+
+ return result;
+ }
+
private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
try {
final TSStatus result = executeStatement(statement);
- if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return result;
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index eb4d2f56311..bbc00e1975b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -49,6 +49,14 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
this.insertTabletStatementList = insertTabletStatementList;
}
+ public List<PartialPath> getDevicePaths() {
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement :
insertTabletStatementList) {
+ partialPaths.add(insertTabletStatement.devicePath);
+ }
+ return partialPaths;
+ }
+
public List<String[]> getMeasurementsList() {
List<String[]> measurementsList = new ArrayList<>();
for (InsertTabletStatement insertTabletStatement :
insertTabletStatementList) {