This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 23340404c99 [IOTDB-6080] Pipe: Introduce batch mode for tablet
transferring (#10912)
23340404c99 is described below
commit 23340404c99422382925c2094d4f1b120406d9cb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 22 16:28:05 2023 +0800
[IOTDB-6080] Pipe: Introduce batch mode for tablet transferring (#10912)
Co-authored-by: xuanronaldo <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java | 1 +
.../config/constant/PipeConnectorConstant.java | 11 ++
.../payload/evolvable/PipeRequestType.java | 2 +
...oTDBThriftAsyncPipeTransferBatchReqBuilder.java | 98 ++++++++++++
...IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 89 +++++++++++
.../builder/PipeTransferBatchReqBuilder.java | 64 ++++++++
.../evolvable/request/PipeTransferBatchReq.java | 172 +++++++++++++++++++++
.../request/PipeTransferFilePieceReq.java | 7 +-
.../evolvable/request/PipeTransferFileSealReq.java | 4 +-
.../request/PipeTransferHandshakeReq.java | 2 +-
.../request/PipeTransferInsertNodeReq.java | 2 +-
.../evolvable/request/PipeTransferTabletReq.java | 37 ++++-
.../db/pipe/connector/protocol/IoTDBConnector.java | 11 +-
.../protocol/airgap/IoTDBAirGapConnector.java | 8 +
.../thrift/async/IoTDBThriftAsyncConnector.java | 117 +++++++++++---
...eTransferTabletBatchInsertionEventHandler.java} | 63 ++++----
.../PipeTransferTabletInsertionEventHandler.java | 2 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 56 +++++--
.../receiver/thrift/IoTDBThriftReceiverV1.java | 93 +++++++----
.../planner/plan/node/write/InsertRowsNode.java | 9 +-
20 files changed, 737 insertions(+), 111 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index 0e48b93e614..60bbe095b4f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -90,6 +90,7 @@ public class IoTDBPipeDataSyncIT {
extractorAttributes.put("extractor.realtime.mode", "log");
connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enabled", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 96477c3a3aa..2f65a39f86c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.config.constant;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
public class PipeConnectorConstant {
public static final String CONNECTOR_KEY = "connector";
@@ -27,6 +29,15 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY =
"connector.node-urls";
+ public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY =
"connector.batch.enabled";
+ public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE
= true;
+
+ public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY =
"connector.batch.max-delay-seconds";
+ public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 10;
+
+ public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
+ public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
+
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index 7d901518a7d..ed9c1384eac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -31,6 +31,8 @@ public enum PipeRequestType {
TRANSFER_FILE_PIECE((short) 4),
TRANSFER_FILE_SEAL((short) 5),
+
+ TRANSFER_BATCH((short) 6),
;
private final short type;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..1d209adc887
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
+
+ protected final List<Long> requestCommitIds = new ArrayList<>();
+
+ public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters
parameters) {
+ super(parameters);
+ }
+
+ /**
+ * Try offer event into cache if the given event is not duplicated.
+ *
+ * @param event the given event
+ * @return true if the batch can be transferred
+ */
+ public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
+ throws IOException, WALPipeException {
+ final TPipeTransferReq req =
+ event instanceof PipeInsertNodeTabletInsertionEvent
+ ? PipeTransferInsertNodeReq.toTPipeTransferReq(
+ ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
+ : PipeTransferTabletReq.toTPipeTransferReq(
+ ((PipeRawTabletInsertionEvent) event).convertToTablet(),
+ ((PipeRawTabletInsertionEvent) event).isAligned());
+ if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
+ reqs.add(req);
+
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+ }
+ events.add(event);
+ requestCommitIds.add(requestCommitId);
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+
+ bufferSize += req.getBody().length;
+ }
+
+ return bufferSize >= maxBatchSizeInBytes
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ }
+
+ public void onSuccess() {
+ reqs.clear();
+
+ events.clear();
+ requestCommitIds.clear();
+
+ firstEventProcessingTime = Long.MIN_VALUE;
+
+ bufferSize = 0;
+ }
+
+ public List<Event> deepcopyEvents() {
+ return new ArrayList<>(events);
+ }
+
+ public List<Long> deepcopyRequestCommitIds() {
+ return new ArrayList<>(requestCommitIds);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..c60f8d241f3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+
+public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends
PipeTransferBatchReqBuilder {
+
+ public IoTDBThriftSyncPipeTransferBatchReqBuilder(PipeParameters parameters)
{
+ super(parameters);
+ }
+
+ /**
+ * Try offer event into cache if the given event is not duplicated.
+ *
+ * @param event the given event
+ * @return true if the batch can be transferred
+ */
+ public boolean onEvent(TabletInsertionEvent event) throws IOException,
WALPipeException {
+ final TPipeTransferReq req =
+ event instanceof PipeInsertNodeTabletInsertionEvent
+ ? PipeTransferInsertNodeReq.toTPipeTransferReq(
+ ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
+ : PipeTransferTabletReq.toTPipeTransferReq(
+ ((PipeRawTabletInsertionEvent) event).convertToTablet(),
+ ((PipeRawTabletInsertionEvent) event).isAligned());
+ if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
+ reqs.add(req);
+
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+ }
+ events.add(event);
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+
+ bufferSize += req.getBody().length;
+ }
+
+ return bufferSize >= maxBatchSizeInBytes
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ }
+
+ public void onSuccess() {
+ reqs.clear();
+
+ for (final Event event : events) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent)
event).decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName());
+ }
+ }
+ events.clear();
+
+ firstEventProcessingTime = Long.MIN_VALUE;
+
+ bufferSize = 0;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..a4bfec39bf9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+
+public abstract class PipeTransferBatchReqBuilder {
+
+ protected final List<TPipeTransferReq> reqs = new ArrayList<>();
+ protected final List<Event> events = new ArrayList<>();
+
+ // limit in delayed time
+ protected final int maxDelayInMs;
+ protected long firstEventProcessingTime = Long.MIN_VALUE;
+
+ // limit in buffer size
+ protected final long maxBatchSizeInBytes;
+ protected long bufferSize = 0;
+
+ protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
+ maxDelayInMs =
+ parameters.getIntOrDefault(
+ CONNECTOR_IOTDB_BATCH_DELAY_KEY,
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+ * 1000;
+ maxBatchSizeInBytes =
+ parameters.getLongOrDefault(
+ CONNECTOR_IOTDB_BATCH_SIZE_KEY,
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+ }
+
+ public List<TPipeTransferReq> getTPipeTransferReqs() {
+ return reqs;
+ }
+
+ public boolean isEmpty() {
+ return reqs.isEmpty();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
new file mode 100644
index 00000000000..7574bc16895
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeTransferBatchReq extends TPipeTransferReq {
+
+ private final transient List<PipeTransferInsertNodeReq> insertNodeReqs = new
ArrayList<>();
+ private final transient List<PipeTransferTabletReq> tabletReqs = new
ArrayList<>();
+
+ private PipeTransferBatchReq() {
+ // Empty constructor
+ }
+
+ public Pair<InsertRowsStatement, InsertMultiTabletsStatement>
constructStatements() {
+ final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+ final InsertMultiTabletsStatement insertMultiTabletsStatement =
+ new InsertMultiTabletsStatement();
+
+ final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+ final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
+
+ for (final PipeTransferInsertNodeReq insertNodeReq : insertNodeReqs) {
+ final Statement insertStatement = insertNodeReq.constructStatement();
+ if (insertStatement instanceof InsertRowStatement) {
+ insertRowStatementList.add((InsertRowStatement) insertStatement);
+ } else if (insertStatement instanceof InsertTabletStatement) {
+ insertTabletStatementList.add((InsertTabletStatement) insertStatement);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "unknown InsertBaseStatement %s constructed from the insert
node request.",
+ insertNodeReq));
+ }
+ }
+
+ for (final PipeTransferTabletReq tabletReq : tabletReqs) {
+ insertTabletStatementList.add(tabletReq.constructStatement());
+ }
+
+ insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+ return new Pair<>(insertRowsStatement, insertMultiTabletsStatement);
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ public static PipeTransferBatchReq toTPipeTransferReq(List<TPipeTransferReq>
reqs)
+ throws IOException {
+ final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+
+ for (final TPipeTransferReq req : reqs) {
+ if (req instanceof PipeTransferInsertNodeReq) {
+ batchReq.insertNodeReqs.add((PipeTransferInsertNodeReq) req);
+ } else if (req instanceof PipeTransferTabletReq) {
+ batchReq.tabletReqs.add((PipeTransferTabletReq) req);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "unknown TPipeTransferReq type %s when constructing
PipeTransferBatchReq",
+ req.getType()));
+ }
+ }
+
+ batchReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ batchReq.type = PipeRequestType.TRANSFER_BATCH.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(batchReq.insertNodeReqs.size(), outputStream);
+ for (final PipeTransferInsertNodeReq insertNodeReq :
batchReq.insertNodeReqs) {
+ insertNodeReq.getInsertNode().serialize(outputStream);
+ }
+
+ ReadWriteIOUtils.write(batchReq.tabletReqs.size(), outputStream);
+ for (final PipeTransferTabletReq tabletReq : batchReq.tabletReqs) {
+ tabletReq.getTablet().serialize(outputStream);
+ ReadWriteIOUtils.write(tabletReq.getIsAligned(), outputStream);
+ }
+
+ batchReq.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return batchReq;
+ }
+
+ public static PipeTransferBatchReq fromTPipeTransferReq(TPipeTransferReq
transferReq)
+ throws IOException {
+ final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+
+ int size = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < size; ++i) {
+ batchReq.insertNodeReqs.add(
+ PipeTransferInsertNodeReq.toTPipeTransferReq(
+ (InsertNode) PlanFragment.deserializeHelper(transferReq.body)));
+ }
+
+ size = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < size; ++i) {
+ batchReq.tabletReqs.add(
+ PipeTransferTabletReq.toTPipeTransferReq(
+ Tablet.deserialize(transferReq.body),
ReadWriteIOUtils.readBool(transferReq.body)));
+ }
+
+ batchReq.version = transferReq.version;
+ batchReq.type = transferReq.type;
+ batchReq.body = transferReq.body;
+
+ return batchReq;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeTransferBatchReq that = (PipeTransferBatchReq) obj;
+ return insertNodeReqs.equals(that.insertNodeReqs)
+ && tabletReqs.equals(that.tabletReqs)
+ && version == that.version
+ && type == that.type
+ && body.equals(that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index 12f7843d906..1b9bf9bd1b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -34,9 +34,9 @@ import java.util.Objects;
public class PipeTransferFilePieceReq extends TPipeTransferReq {
- private String fileName;
- private long startWritingOffset;
- private byte[] filePiece;
+ private transient String fileName;
+ private transient long startWritingOffset;
+ private transient byte[] filePiece;
private PipeTransferFilePieceReq() {
// Empty constructor
@@ -93,6 +93,7 @@ public class PipeTransferFilePieceReq extends
TPipeTransferReq {
}
/////////////////////////////// Air Gap ///////////////////////////////
+
public static byte[] toTPipeTransferBytes(
String fileName, long startWritingOffset, byte[] filePiece) throws
IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index e9dc715bdd1..6314068fe50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -32,8 +32,8 @@ import java.util.Objects;
public class PipeTransferFileSealReq extends TPipeTransferReq {
- private String fileName;
- private long fileLength;
+ private transient String fileName;
+ private transient long fileLength;
private PipeTransferFileSealReq() {
// Empty constructor
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 11d6ce7ef4b..435726bfcf1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -32,7 +32,7 @@ import java.util.Objects;
public class PipeTransferHandshakeReq extends TPipeTransferReq {
- private String timestampPrecision;
+ private transient String timestampPrecision;
private PipeTransferHandshakeReq() {
// Empty constructor
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index c3da0985b45..f2bdd0db906 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -39,7 +39,7 @@ import java.util.Objects;
public class PipeTransferInsertNodeReq extends TPipeTransferReq {
- private InsertNode insertNode;
+ private transient InsertNode insertNode;
private PipeTransferInsertNodeReq() {
// Do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 58e64520133..8be16edda7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -45,13 +45,22 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Objects;
public class PipeTransferTabletReq extends TPipeTransferReq {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletReq.class);
- private Tablet tablet;
- private boolean isAligned;
+ private transient Tablet tablet;
+ private transient boolean isAligned;
+
+ public Tablet getTablet() {
+ return tablet;
+ }
+
+ public boolean getIsAligned() {
+ return isAligned;
+ }
public InsertTabletStatement constructStatement() {
if (!checkSorted(tablet)) {
@@ -226,6 +235,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq
{
}
/////////////////////////////// Air Gap ///////////////////////////////
+
public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean
isAligned)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -237,4 +247,27 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
return byteArrayOutputStream.toByteArray();
}
}
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PipeTransferTabletReq that = (PipeTransferTabletReq) obj;
+ return tablet.equals(that.tablet)
+ && isAligned == that.isAligned
+ && version == that.version
+ && type == that.type
+ && body.equals(that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tablet, isAligned, version, type, body);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 3b218c60e04..8de5706a958 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -35,6 +35,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -45,6 +47,8 @@ public abstract class IoTDBConnector implements PipeConnector
{
protected final List<TEndPoint> nodeUrls = new ArrayList<>();
+ protected boolean isTabletBatchModeEnabled = true;
+
@Override
public void validate(PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
@@ -79,7 +83,12 @@ public abstract class IoTDBConnector implements
PipeConnector {
nodeUrls.clear();
nodeUrls.addAll(givenNodeUrls);
+ LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
- LOGGER.info("IoTDBThriftConnector nodeUrls: {}", nodeUrls);
+ isTabletBatchModeEnabled =
+ parameters.getBooleanOrDefault(
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
+ CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
+ LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}",
isTabletBatchModeEnabled);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 8a0e11f5ebf..9830e44c9e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -76,6 +76,14 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);
+
+ if (isTabletBatchModeEnabled) {
+ LOGGER.warn(
+ "Batch mode is enabled by the given parameters. "
+ + "IoTDBAirGapConnector does not support batch mode. "
+ + "Disable batch mode.");
+ }
+
for (int i = 0; i < nodeUrls.size(); i++) {
isSocketAlive.add(false);
sockets.add(null);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index a7ee768226b..b588a0b168d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -27,12 +27,14 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -59,6 +61,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
@@ -96,6 +99,8 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector
{
private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
new PriorityQueue<>(Comparator.comparing(o -> o.left));
+ private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+
public IoTDBThriftAsyncConnector() {
if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
synchronized (IoTDBThriftAsyncConnector.class) {
@@ -120,11 +125,16 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);
+
retryConnector.customize(parameters, configuration);
+
+ if (isTabletBatchModeEnabled) {
+ tabletBatchBuilder = new
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
+ }
}
@Override
- // synchronized to avoid close connector when transfer event
+ // Synchronized to avoid close connector when transfer event
public synchronized void handshake() throws Exception {
retryConnector.handshake();
}
@@ -149,29 +159,70 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
final long requestCommitId = commitIdGenerator.incrementAndGet();
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
- (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
- final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
- PipeTransferInsertNodeReq.toTPipeTransferReq(
- pipeInsertNodeTabletInsertionEvent.getInsertNode());
- final PipeTransferInsertNodeTabletInsertionEventHandler
pipeTransferInsertNodeReqHandler =
- new PipeTransferInsertNodeTabletInsertionEventHandler(
- requestCommitId, pipeInsertNodeTabletInsertionEvent,
pipeTransferInsertNodeReq, this);
-
- transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
- } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
- final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
- (PipeRawTabletInsertionEvent) tabletInsertionEvent;
- final PipeTransferTabletReq pipeTransferTabletReq =
- PipeTransferTabletReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned());
- final PipeTransferRawTabletInsertionEventHandler
pipeTransferTabletReqHandler =
- new PipeTransferRawTabletInsertionEventHandler(
- requestCommitId, pipeRawTabletInsertionEvent,
pipeTransferTabletReq, this);
-
- transfer(requestCommitId, pipeTransferTabletReqHandler);
+ if (isTabletBatchModeEnabled) {
+ if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
+ final PipeTransferTabletBatchInsertionEventHandler
+ pipeTransferTabletBatchInsertionEventHandler =
+ new
PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+
+ transfer(requestCommitId,
pipeTransferTabletBatchInsertionEventHandler);
+
+ tabletBatchBuilder.onSuccess();
+ }
+ } else {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
+ PipeTransferInsertNodeReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode());
+ final PipeTransferInsertNodeTabletInsertionEventHandler
pipeTransferInsertNodeReqHandler =
+ new PipeTransferInsertNodeTabletInsertionEventHandler(
+ requestCommitId,
+ pipeInsertNodeTabletInsertionEvent,
+ pipeTransferInsertNodeReq,
+ this);
+
+ transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+ final PipeTransferTabletReq pipeTransferTabletReq =
+ PipeTransferTabletReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned());
+ final PipeTransferRawTabletInsertionEventHandler
pipeTransferTabletReqHandler =
+ new PipeTransferRawTabletInsertionEventHandler(
+ requestCommitId, pipeRawTabletInsertionEvent,
pipeTransferTabletReq, this);
+
+ transfer(requestCommitId, pipeTransferTabletReqHandler);
+ }
+ }
+ }
+
+ private void transfer(
+ long requestCommitId,
+ PipeTransferTabletBatchInsertionEventHandler
pipeTransferTabletBatchInsertionEventHandler) {
+ final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId %
nodeUrls.size()));
+
+ try {
+ final AsyncPipeDataTransferServiceClient client =
borrowClient(targetNodeUrl);
+
+ try {
+ pipeTransferTabletBatchInsertionEventHandler.transfer(client);
+ } catch (TException e) {
+ LOGGER.warn(
+ String.format(
+ "Transfer batched insertion requests to receiver %s:%s error,
retrying...",
+ targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+ e);
+ }
+ } catch (Exception ex) {
+ pipeTransferTabletBatchInsertionEventHandler.onError(ex);
+ LOGGER.warn(
+ String.format(
+ FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(),
targetNodeUrl.getPort()),
+ ex);
}
}
@@ -230,6 +281,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
transferQueuedEventsIfNecessary();
+ transferBatchedEventsIfNecessary();
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
LOGGER.warn(
@@ -279,6 +331,7 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
@Override
public void transfer(Event event) throws Exception {
transferQueuedEventsIfNecessary();
+ transferBatchedEventsIfNecessary();
LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", event);
}
@@ -414,6 +467,22 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
}
}
+ /** Try its best to commit data in order. Flush can also be a trigger to
transfer batched data. */
+ private void transferBatchedEventsIfNecessary() throws IOException {
+ if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
+ return;
+ }
+
+ final long requestCommitId = commitIdGenerator.incrementAndGet();
+ final PipeTransferTabletBatchInsertionEventHandler
+ pipeTransferTabletBatchInsertionEventHandler =
+ new
PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+
+ transfer(requestCommitId, pipeTransferTabletBatchInsertionEventHandler);
+
+ tabletBatchBuilder.onSuccess();
+ }
+
/**
* Commit the event. Decrease the reference count of the event. If the
reference count is 0, the
* progress index of the event will be recalculated and the resources of the
event will be
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
similarity index 62%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
index 85c88957d3d..dc183d762b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
@@ -33,59 +35,50 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.List;
-public abstract class PipeTransferTabletInsertionEventHandler<E extends
TPipeTransferResp>
- implements AsyncMethodCallback<E> {
+public class PipeTransferTabletBatchInsertionEventHandler
+ implements AsyncMethodCallback<TPipeTransferResp> {
private static final Logger LOGGER =
- LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
+
LoggerFactory.getLogger(PipeTransferTabletBatchInsertionEventHandler.class);
- private final long requestCommitId;
- private final Event event;
+ private final List<Long> requestCommitIds;
+ private final List<Event> events;
private final TPipeTransferReq req;
private final IoTDBThriftAsyncConnector connector;
- protected PipeTransferTabletInsertionEventHandler(
- long requestCommitId,
- Event event,
- TPipeTransferReq req,
- IoTDBThriftAsyncConnector connector) {
- this.requestCommitId = requestCommitId;
- this.event = event;
- this.req = req;
- this.connector = connector;
+ public PipeTransferTabletBatchInsertionEventHandler(
+ IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder,
IoTDBThriftAsyncConnector connector)
+ throws IOException {
+ // Deep copy to keep Ids' and events' reference
+ requestCommitIds = batchBuilder.deepcopyRequestCommitIds();
+ events = batchBuilder.deepcopyEvents();
+ req =
PipeTransferBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
- Optional.ofNullable(event)
- .ifPresent(
- e -> {
- if (e instanceof EnrichedEvent) {
- ((EnrichedEvent) e)
- .increaseReferenceCount(
-
PipeTransferTabletInsertionEventHandler.class.getName());
- }
- });
+ this.connector = connector;
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
- doTransfer(client, req);
+ client.pipeTransfer(req, this);
}
- protected abstract void doTransfer(
- AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) throws
TException;
-
@Override
public void onComplete(TPipeTransferResp response) {
- // just in case
+ // Just in case
if (response == null) {
onError(new PipeException("TPipeTransferResp is null"));
return;
}
if (response.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- connector.commit(
- requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent)
event : null);
+ for (int i = 0; i < events.size(); ++i) {
+ connector.commit(
+ requestCommitIds.get(i),
+ events.get(i) instanceof EnrichedEvent ? (EnrichedEvent)
events.get(i) : null);
+ }
} else {
onError(new PipeException(response.getStatus().getMessage()));
}
@@ -95,10 +88,12 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
public void onError(Exception exception) {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
- event,
- requestCommitId,
+ events,
+ requestCommitIds,
exception);
- connector.addFailureEventToRetryQueue(requestCommitId, event);
+ for (int i = 0; i < events.size(); ++i) {
+ connector.addFailureEventToRetryQueue(requestCommitIds.get(i),
events.get(i));
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 85c88957d3d..c76b81011e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -77,7 +77,7 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
@Override
public void onComplete(TPipeTransferResp response) {
- // just in case
+ // Just in case
if (response == null) {
onError(new PipeException("TPipeTransferResp is null"));
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index e682247cc66..c4764219859 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -22,7 +22,9 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
@@ -65,6 +67,8 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
private long currentClientIndex = 0;
+ private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+
public IoTDBThriftSyncConnector() {
// Do nothing
}
@@ -73,10 +77,15 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);
+
for (int i = 0; i < nodeUrls.size(); i++) {
isClientAlive.add(false);
clients.add(null);
}
+
+ if (isTabletBatchModeEnabled) {
+ tabletBatchBuilder = new
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
+ }
}
@Override
@@ -89,7 +98,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
final String ip = nodeUrls.get(i).getIp();
final int port = nodeUrls.get(i).getPort();
- // close the client if necessary
+ // Close the client if necessary
if (clients.get(i) != null) {
try {
clients.set(i, null).close();
@@ -163,21 +172,30 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
// PipeProcessor can change the type of TabletInsertionEvent
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "IoTDBThriftSyncConnector only support "
+ + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent. "
+ + "Ignore {}.",
+ tabletInsertionEvent);
+ return;
+ }
final int clientIndex = nextClientIndex();
final IoTDBThriftSyncConnectorClient client = clients.get(clientIndex);
try {
- if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- doTransfer(client, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
- } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ if (isTabletBatchModeEnabled) {
+ if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
+ doTransfer(client);
+ }
} else {
- LOGGER.warn(
- "IoTDBThriftSyncConnector only support "
- + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent. "
- + "Ignore {}.",
- tabletInsertionEvent);
+ if (tabletInsertionEvent instanceof
PipeInsertNodeTabletInsertionEvent) {
+ doTransfer(client, (PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
+ } else {
+ doTransfer(client, (PipeRawTabletInsertionEvent)
tabletInsertionEvent);
+ }
}
} catch (TException e) {
isClientAlive.set(clientIndex, false);
@@ -204,6 +222,11 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
final IoTDBThriftSyncConnectorClient client = clients.get(clientIndex);
try {
+ // in order to commit in order
+ if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+ doTransfer(client);
+ }
+
doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
} catch (TException e) {
isClientAlive.set(clientIndex, false);
@@ -221,6 +244,19 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic
event: {}.", event);
}
+ private void doTransfer(IoTDBThriftSyncConnectorClient client) throws
IOException, TException {
+ final TPipeTransferResp resp =
+ client.pipeTransfer(
+
PipeTransferBatchReq.toTPipeTransferReq(tabletBatchBuilder.getTPipeTransferReqs()));
+
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format("Transfer PipeTransferBatchReq error, result status
%s", resp.status));
+ }
+
+ tabletBatchBuilder.onSuccess();
+ }
+
private void doTransfer(
IoTDBThriftSyncConnectorClient client,
PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 09d6870f489..454a5dc8faa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
@@ -39,6 +40,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
@@ -47,6 +50,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +61,8 @@ import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
@@ -76,37 +82,48 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
@Override
public synchronized TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
- final short rawRequestType = req.getType();
- if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
- switch (PipeRequestType.valueOf(rawRequestType)) {
- case HANDSHAKE:
- return
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
- case TRANSFER_INSERT_NODE:
- return handleTransferInsertNode(
- PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
- case TRANSFER_TABLET:
- return handleTransferTablet(
- PipeTransferTabletReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
- case TRANSFER_FILE_PIECE:
- return handleTransferFilePiece(
- PipeTransferFilePieceReq.fromTPipeTransferReq(req),
- req instanceof AirGapPseudoTPipeTransferRequest);
- case TRANSFER_FILE_SEAL:
- return handleTransferFileSeal(
- PipeTransferFileSealReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
- default:
- break;
+ try {
+ final short rawRequestType = req.getType();
+ if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+ switch (PipeRequestType.valueOf(rawRequestType)) {
+ case HANDSHAKE:
+ return
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+ case TRANSFER_INSERT_NODE:
+ return handleTransferInsertNode(
+ PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
+ partitionFetcher,
+ schemaFetcher);
+ case TRANSFER_TABLET:
+ return handleTransferTablet(
+ PipeTransferTabletReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
+ case TRANSFER_BATCH:
+ return handleTransferBatch(
+ PipeTransferBatchReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
+ case TRANSFER_FILE_PIECE:
+ return handleTransferFilePiece(
+ PipeTransferFilePieceReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest);
+ case TRANSFER_FILE_SEAL:
+ return handleTransferFileSeal(
+ PipeTransferFileSealReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
+ default:
+ break;
+ }
}
- }
- // unknown request type, which means the request can not be handled by
this receiver,
- // maybe the version of the receiver is not compatible with the sender
- final TSStatus status =
- RpcUtils.getStatus(
- TSStatusCode.PIPE_TYPE_ERROR,
- String.format("Unknown PipeRequestType %s.", rawRequestType));
- LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
- return new TPipeTransferResp(status);
+ // Unknown request type, which means the request can not be handled by
this receiver,
+ // maybe the version of the receiver is not compatible with the sender
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_TYPE_ERROR,
+ String.format("Unknown PipeRequestType %s.", rawRequestType));
+ LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
+ return new TPipeTransferResp(status);
+ } catch (IOException e) {
+ String error = String.format("Serialization error during pipe receiving,
%s", e);
+ LOGGER.warn(error);
+ return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
+ }
}
private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq
req) {
@@ -187,6 +204,24 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
: executeStatement(statement, partitionFetcher, schemaFetcher));
}
+ private TPipeTransferResp handleTransferBatch(
+ PipeTransferBatchReq req, IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
+ final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair
=
+ req.constructStatements();
+ return new TPipeTransferResp(
+ RpcUtils.squashResponseStatusList(
+ Stream.of(
+ statementPair.getLeft().isEmpty()
+ ? RpcUtils.SUCCESS_STATUS
+ : executeStatement(
+ statementPair.getLeft(), partitionFetcher,
schemaFetcher),
+ statementPair.getRight().isEmpty()
+ ? RpcUtils.SUCCESS_STATUS
+ : executeStatement(
+ statementPair.getRight(), partitionFetcher,
schemaFetcher))
+ .collect(Collectors.toList())));
+ }
+
private TPipeTransferResp handleTransferFilePiece(
PipeTransferFilePieceReq req, boolean isRequestThroughAirGap) {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 20394f14ae0..23428d75bec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -37,6 +37,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -112,11 +113,13 @@ public class InsertRowsNode extends InsertNode {
@Override
public List<PlanNode> getChildren() {
- return null;
+ return Collections.emptyList();
}
@Override
- public void addChild(PlanNode child) {}
+ public void addChild(PlanNode child) {
+ // Do nothing
+ }
@Override
public boolean equals(Object o) {
@@ -145,7 +148,7 @@ public class InsertRowsNode extends InsertNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return Collections.emptyList();
}
public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {