This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bcc3b3dd4a1 Pipe schema: RPC request & response datastructures (#11680)
bcc3b3dd4a1 is described below
commit bcc3b3dd4a10cab1db9e5b6fa187f547800104b5
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Dec 13 13:24:49 2023 +0800
Pipe schema: RPC request & response datastructures (#11680)
---
.../payload/request/PipeTransferConfigPlanReq.java | 78 +++++++++++++++
.../request/PipeTransferFilePieceReq.java | 4 +-
.../evolvable/request/PipeTransferFileSealReq.java | 4 +-
.../request/PipeTransferHandshakeReq.java | 4 +-
.../request/PipeTransferSchemaPlanReq.java | 105 +++++++++++++++++++++
.../request/PipeTransferTabletBatchReq.java | 4 +-
.../request/PipeTransferTabletBinaryReq.java | 4 +-
.../request/PipeTransferTabletInsertNodeReq.java | 4 +-
.../request/PipeTransferTabletRawReq.java | 4 +-
.../pipe/receiver/thrift/IoTDBThriftReceiver.java | 2 +-
.../receiver/thrift/IoTDBThriftReceiverAgent.java | 2 +-
.../receiver/thrift/IoTDBThriftReceiverV1.java | 36 ++++++-
...est.java => PipeDataNodeThriftRequestTest.java} | 80 +++++++++++++++-
.../request}/IoTDBConnectorRequestVersion.java | 2 +-
.../payload/request}/PipeRequestType.java | 8 +-
.../request/PipeTransferSnapshotPieceReq.java} | 78 ++++++++-------
.../request/PipeTransferSnapshotSealReq.java} | 74 +++++++--------
.../payload/request/TransferConfigPlanReq.java} | 17 +---
.../response/PipeTransferSnapshotPieceResp.java | 81 ++++++++++++++++
19 files changed, 478 insertions(+), 113 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
new file mode 100644
index 00000000000..3de3f585307
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.connector.payload.request;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.TransferConfigPlanReq;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Pipe transfer request whose body is a serialized {@link IConsensusRequest}.
+ *
+ * <p>Offers factory methods for the Thrift transport and a byte-array encoder for the air-gap
+ * transport. This class declares no fields of its own on top of {@link TransferConfigPlanReq}.
+ */
+public class PipeTransferConfigPlanReq extends TransferConfigPlanReq {
+
+  private PipeTransferConfigPlanReq() {
+    // Instances are only created through the static factory methods below.
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Builds a request with version {@code VERSION_1}, type {@code TRANSFER_CONFIG_PLAN} and a body
+   * produced by {@code consensusRequest.serializeToByteBuffer()}.
+   *
+   * @param consensusRequest the request to serialize into the body
+   * @return the newly created request
+   */
+  public static PipeTransferConfigPlanReq toTPipeTransferReq(IConsensusRequest consensusRequest) {
+    final PipeTransferConfigPlanReq req = new PipeTransferConfigPlanReq();
+
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_CONFIG_PLAN.getType();
+    req.body = consensusRequest.serializeToByteBuffer();
+
+    return req;
+  }
+
+  /**
+   * Copies version, type and body from a generic transfer request into a new instance. The body
+   * buffer is shared with {@code transferReq}, not duplicated.
+   *
+   * @param transferReq the generic request to copy from
+   * @return the newly created request
+   */
+  public static PipeTransferConfigPlanReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferConfigPlanReq configPlanReq = new PipeTransferConfigPlanReq();
+
+    configPlanReq.version = transferReq.version;
+    configPlanReq.type = transferReq.type;
+    configPlanReq.body = transferReq.body;
+
+    return configPlanReq;
+  }
+
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  /**
+   * Encodes the request as raw bytes: the version, then the request type, then the serialized
+   * consensus request. The method name is kept unchanged for existing callers even though the
+   * payload is a config plan rather than an insert node.
+   *
+   * @param consensusRequest the request to serialize
+   * @return the header bytes followed by the serialized request bytes
+   * @throws IOException if writing the header to the in-memory stream fails
+   */
+  public static byte[] toTransferInsertNodeBytes(IConsensusRequest consensusRequest)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_CONFIG_PLAN.getType(), outputStream);
+
+      // Copy only the readable region of the buffer. ByteBuffer.array() would return the whole
+      // backing array, ignoring position, limit and array offset, and fails for buffers that are
+      // not array-backed.
+      final java.nio.ByteBuffer serialized = consensusRequest.serializeToByteBuffer();
+      final byte[] body = new byte[serialized.remaining()];
+      serialized.get(body);
+
+      return BytesUtils.concatByteArray(byteArrayOutputStream.toByteArray(), body);
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  // equals() and hashCode() are intentionally not overridden: this class adds no member variables.
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index 1b9bf9bd1b2..255f84a468b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index 6314068fe50..3b8603f6bd8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 435726bfcf1..80f9d1217eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
new file mode 100644
index 00000000000..17d6503a05f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Pipe transfer request whose body is a serialized {@link PlanNode}.
+ *
+ * <p>Keeps the plan node next to its serialized form and offers factory methods for the Thrift
+ * transport plus a byte-array encoder for the air-gap transport.
+ */
+public class PipeTransferSchemaPlanReq extends TPipeTransferReq {
+
+  // Plan node corresponding to the body; set by both factory methods.
+  private transient PlanNode planNode;
+
+  private PipeTransferSchemaPlanReq() {
+    // Instances are only created through the static factory methods below.
+  }
+
+  /** Returns the plan node held by this request. */
+  public PlanNode getPlanNode() {
+    return planNode;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  /**
+   * Builds a request with version {@code VERSION_1}, type {@code TRANSFER_SCHEMA_PLAN} and a body
+   * produced by {@code planNode.serializeToByteBuffer()}.
+   *
+   * @param planNode the plan node to transfer
+   * @return the newly created request
+   */
+  public static PipeTransferSchemaPlanReq toTPipeTransferReq(PlanNode planNode) {
+    final PipeTransferSchemaPlanReq req = new PipeTransferSchemaPlanReq();
+
+    req.planNode = planNode;
+
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_SCHEMA_PLAN.getType();
+    req.body = planNode.serializeToByteBuffer();
+
+    return req;
+  }
+
+  /**
+   * Rebuilds a schema plan request from a generic transfer request by deserializing its body with
+   * {@link PlanNodeType#deserialize} and copying version, type and body. The body buffer is shared
+   * with {@code transferReq}, not duplicated.
+   *
+   * @param transferReq the generic request to decode
+   * @return the newly created request
+   */
+  public static PipeTransferSchemaPlanReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferSchemaPlanReq planNodeReq = new PipeTransferSchemaPlanReq();
+
+    planNodeReq.planNode = PlanNodeType.deserialize(transferReq.body);
+
+    planNodeReq.version = transferReq.version;
+    planNodeReq.type = transferReq.type;
+    planNodeReq.body = transferReq.body;
+
+    return planNodeReq;
+  }
+
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  /**
+   * Encodes the request as raw bytes: the version, then the request type, then the serialized
+   * plan node. The method name is kept unchanged for existing callers even though the payload is
+   * a schema plan rather than an insert node.
+   *
+   * @param planNode the plan node to serialize
+   * @return the header bytes followed by the serialized plan node bytes
+   * @throws IOException if writing the header to the in-memory stream fails
+   */
+  public static byte[] toTransferInsertNodeBytes(PlanNode planNode) throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SCHEMA_PLAN.getType(), outputStream);
+
+      // Copy only the readable region of the buffer. ByteBuffer.array() would return the whole
+      // backing array, ignoring position, limit and array offset, and fails for buffers that are
+      // not array-backed.
+      final java.nio.ByteBuffer serialized = planNode.serializeToByteBuffer();
+      final byte[] body = new byte[serialized.remaining()];
+      serialized.get(body);
+
+      return BytesUtils.concatByteArray(byteArrayOutputStream.toByteArray(), body);
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  /**
+   * Compares plan node, version, type and body. Null plan nodes or bodies are compared null-safely
+   * so that equals() never throws, matching the null-tolerant hashCode().
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeTransferSchemaPlanReq that = (PipeTransferSchemaPlanReq) obj;
+    return Objects.equals(planNode, that.planNode)
+        && version == that.version
+        && type == that.type
+        && Objects.equals(body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(planNode, version, type, body);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 710e3cb437e..bacd7a4dda6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index 3ab9c948081..28dbf072f5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 80b9f92042b..e112d29e3bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 71babe20982..71875f7f87e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
index b34b5c123da..409faa69176 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.receiver.thrift;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
index 848e31d3400..a112b27b817 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.receiver.thrift;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.rpc.RpcUtils;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index e0cbae58464..03c58874292 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -21,22 +21,26 @@ package org.apache.iotdb.db.pipe.receiver.thrift;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.TransferConfigPlanReq;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
-import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -139,6 +143,14 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
case TRANSFER_FILE_SEAL:
return handleTransferFileSeal(
PipeTransferFileSealReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
+        case TRANSFER_CONFIG_PLAN:
+          // NOTE(review): this cast only succeeds when req already is a TransferConfigPlanReq.
+          // No decoding factory for that type is visible here; confirm and replace the cast.
+          return handleTransferConfigPlan((TransferConfigPlanReq) req);
+        case TRANSFER_SCHEMA_PLAN:
+          // Decode via the factory, like the file piece/seal branches, instead of casting: a
+          // plain TPipeTransferReq is not an instance of the subclass.
+          return handleTransferSchemaPlan(PipeTransferSchemaPlanReq.fromTPipeTransferReq(req));
+        case TRANSFER_SNAPSHOT_PIECE:
+          return handleTransferSnapshotPiece(
+              PipeTransferSnapshotPieceReq.fromTPipeTransferReq(req));
+        case TRANSFER_SNAPSHOT_SEAL:
+          return handleTransferSnapshotSeal(
+              PipeTransferSnapshotSealReq.fromTPipeTransferReq(req));
default:
break;
}
@@ -503,6 +515,26 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
}
}
+ private TPipeTransferResp handleTransferConfigPlan(TransferConfigPlanReq
req) {
+ // TODO: config plan handling is not implemented yet; req is ignored and a default-constructed response is returned.
+ return new TPipeTransferResp();
+ }
+
+ private TPipeTransferResp handleTransferSchemaPlan(PipeTransferSchemaPlanReq
req) {
+ // TODO: schema plan handling is not implemented yet; req is ignored and a default-constructed response is returned.
+ return new TPipeTransferResp();
+ }
+
+ private TPipeTransferResp
handleTransferSnapshotPiece(PipeTransferSnapshotPieceReq req) {
+ // TODO: snapshot piece handling is not implemented yet; req is ignored and a default-constructed response is returned.
+ return new TPipeTransferResp();
+ }
+
+ private TPipeTransferResp
handleTransferSnapshotSeal(PipeTransferSnapshotSealReq req) {
+ // TODO: snapshot seal handling is not implemented yet; req is ignored and a default-constructed response is returned.
+ return new TPipeTransferResp();
+ }
+
private boolean isWritingFileAvailable() {
final boolean isWritingFileAvailable =
writingFile != null && writingFile.exists() && writingFileWriter !=
null;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
similarity index 67%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
index 15d3ee4cd80..03972592efd 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
@@ -20,17 +20,24 @@
package org.apache.iotdb.db.pipe.connector;
import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
+import
org.apache.iotdb.commons.pipe.connector.payload.response.PipeTransferSnapshotPieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -39,9 +46,10 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-public class PipeThriftRequestTest {
+public class PipeDataNodeThriftRequestTest {
private static final String TIME_PRECISION = "ms";
@@ -85,6 +93,30 @@ public class PipeThriftRequestTest {
Assert.assertEquals(statement.getPaths(), paths);
}
+  @Test
+  public void testPipeTransferSchemaPlanReq() {
+    // Plan node used as the payload: a single aligned INT32 series under root.sg.d.
+    final CreateAlignedTimeSeriesNode createNode =
+        new CreateAlignedTimeSeriesNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            Collections.singletonList("s"),
+            Collections.singletonList(TSDataType.INT32),
+            Collections.singletonList(TSEncoding.PLAIN),
+            Collections.singletonList(CompressionType.UNCOMPRESSED),
+            null,
+            null,
+            null);
+
+    // Encode to the Thrift request form, then decode it again.
+    final PipeTransferSchemaPlanReq original =
+        PipeTransferSchemaPlanReq.toTPipeTransferReq(createNode);
+    final PipeTransferSchemaPlanReq restored =
+        PipeTransferSchemaPlanReq.fromTPipeTransferReq(original);
+
+    // Generic envelope fields must survive the round trip.
+    Assert.assertEquals(original.getVersion(), restored.getVersion());
+    Assert.assertEquals(original.getType(), restored.getType());
+    Assert.assertArrayEquals(original.getBody(), restored.getBody());
+
+    // So must the decoded plan node.
+    Assert.assertEquals(original.getPlanNode(), restored.getPlanNode());
+  }
+
@Test
public void testPipeTransferTabletReq() {
try {
@@ -142,6 +174,25 @@ public class PipeThriftRequestTest {
Assert.assertArrayEquals(req.getFilePiece(),
deserializeReq.getFilePiece());
}
+  @Test
+  public void testPipeTransferSnapshotPieceReq() throws IOException {
+    final String snapshotName = "1.temp";
+    final byte[] piece = "testPipeTransferSnapshotPieceReq".getBytes();
+
+    // Encode a piece starting at offset 0, then decode it again.
+    final PipeTransferSnapshotPieceReq original =
+        PipeTransferSnapshotPieceReq.toTPipeTransferReq(snapshotName, 0, piece);
+    final PipeTransferSnapshotPieceReq restored =
+        PipeTransferSnapshotPieceReq.fromTPipeTransferReq(original);
+
+    // Generic envelope fields must survive the round trip.
+    Assert.assertEquals(original.getVersion(), restored.getVersion());
+    Assert.assertEquals(original.getType(), restored.getType());
+    Assert.assertArrayEquals(original.getBody(), restored.getBody());
+
+    // So must the snapshot-specific fields.
+    Assert.assertEquals(original.getSnapshotName(), restored.getSnapshotName());
+    Assert.assertEquals(original.getStartWritingOffset(), restored.getStartWritingOffset());
+    Assert.assertArrayEquals(original.getSnapshotPiece(), restored.getSnapshotPiece());
+  }
+
@Test
public void testPipeTransferFileSealReq() throws IOException {
String fileName = "1.tsfile";
@@ -157,6 +208,22 @@ public class PipeThriftRequestTest {
Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
}
+  @Test
+  public void testPipeTransferSnapshotSealReq() throws IOException {
+    final String snapshotName = "1.temp";
+
+    // Encode a seal request for a snapshot of length 100, then decode it again.
+    final PipeTransferSnapshotSealReq original =
+        PipeTransferSnapshotSealReq.toTPipeTransferReq(snapshotName, 100);
+    final PipeTransferSnapshotSealReq restored =
+        PipeTransferSnapshotSealReq.fromTPipeTransferReq(original);
+
+    // Generic envelope fields must survive the round trip.
+    Assert.assertEquals(original.getVersion(), restored.getVersion());
+    Assert.assertEquals(original.getType(), restored.getType());
+    Assert.assertArrayEquals(original.getBody(), restored.getBody());
+
+    // So must the snapshot-specific fields.
+    Assert.assertEquals(original.getSnapshotName(), restored.getSnapshotName());
+    Assert.assertEquals(original.getSnapshotLength(), restored.getSnapshotLength());
+  }
+
@Test
public void testPIpeTransferFilePieceResp() throws IOException {
PipeTransferFilePieceResp resp =
@@ -167,4 +234,15 @@ public class PipeThriftRequestTest {
Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
Assert.assertEquals(resp.getEndWritingOffset(),
deserializeResp.getEndWritingOffset());
}
+
+  @Test
+  public void testPipeTransferSnapshotPieceResp() throws IOException {
+    // Encode a success response with end offset 100, then decode it again.
+    final PipeTransferSnapshotPieceResp original =
+        PipeTransferSnapshotPieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, 100);
+    final PipeTransferSnapshotPieceResp restored =
+        PipeTransferSnapshotPieceResp.fromTPipeTransferResp(original);
+
+    // Status and end offset must survive the round trip.
+    Assert.assertEquals(original.getStatus(), restored.getStatus());
+    Assert.assertEquals(original.getEndWritingOffset(), restored.getEndWritingOffset());
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
similarity index 94%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
index 941a372a498..a4eaabee95b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.protocol;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
public enum IoTDBConnectorRequestVersion {
VERSION_1((byte) 1),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
similarity index 89%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
index 029ee7fdb0d..1d9528a4318 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.payload.evolvable;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
import java.util.Arrays;
import java.util.HashMap;
@@ -34,6 +34,12 @@ public enum PipeRequestType {
TRANSFER_TABLET_BATCH((short) 6),
TRANSFER_TABLET_BINARY((short) 7),
+
+ TRANSFER_SCHEMA_PLAN((short) 8),
+ TRANSFER_CONFIG_PLAN((short) 9),
+
+ TRANSFER_SNAPSHOT_PIECE((short) 10),
+ TRANSFER_SNAPSHOT_SEAL((short) 11),
;
private final short type;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
similarity index 53%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
index 1b9bf9bd1b2..b397d2f7e8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
@@ -17,10 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -32,77 +30,77 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-public class PipeTransferFilePieceReq extends TPipeTransferReq {
+public class PipeTransferSnapshotPieceReq extends TPipeTransferReq {
- private transient String fileName;
+ private transient String snapshotName;
private transient long startWritingOffset;
- private transient byte[] filePiece;
+ private transient byte[] snapshotPiece;
- private PipeTransferFilePieceReq() {
+ private PipeTransferSnapshotPieceReq() {
// Empty constructor
}
- public String getFileName() {
- return fileName;
+ public String getSnapshotName() {
+ return snapshotName;
}
public long getStartWritingOffset() {
return startWritingOffset;
}
- public byte[] getFilePiece() {
- return filePiece;
+ public byte[] getSnapshotPiece() {
+ return snapshotPiece;
}
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferFilePieceReq toTPipeTransferReq(
- String fileName, long startWritingOffset, byte[] filePiece) throws IOException {
- final PipeTransferFilePieceReq filePieceReq = new PipeTransferFilePieceReq();
+ public static PipeTransferSnapshotPieceReq toTPipeTransferReq(
+ String snapshotName, long startWritingOffset, byte[] snapshotPiece) throws IOException {
+ final PipeTransferSnapshotPieceReq snapshotPieceReq = new PipeTransferSnapshotPieceReq();
- filePieceReq.fileName = fileName;
- filePieceReq.startWritingOffset = startWritingOffset;
- filePieceReq.filePiece = filePiece;
+ snapshotPieceReq.snapshotName = snapshotName;
+ snapshotPieceReq.startWritingOffset = startWritingOffset;
+ snapshotPieceReq.snapshotPiece = snapshotPiece;
- filePieceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
+ snapshotPieceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ snapshotPieceReq.type = PipeRequestType.TRANSFER_SNAPSHOT_PIECE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(snapshotName, outputStream);
ReadWriteIOUtils.write(startWritingOffset, outputStream);
- ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
- filePieceReq.body =
+ ReadWriteIOUtils.write(new Binary(snapshotPiece), outputStream);
+ snapshotPieceReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
- return filePieceReq;
+ return snapshotPieceReq;
}
- public static PipeTransferFilePieceReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
- final PipeTransferFilePieceReq filePieceReq = new PipeTransferFilePieceReq();
+ public static PipeTransferSnapshotPieceReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferSnapshotPieceReq snapshotPieceReq = new PipeTransferSnapshotPieceReq();
- filePieceReq.fileName = ReadWriteIOUtils.readString(transferReq.body);
- filePieceReq.startWritingOffset = ReadWriteIOUtils.readLong(transferReq.body);
- filePieceReq.filePiece = ReadWriteIOUtils.readBinary(transferReq.body).getValues();
+ snapshotPieceReq.snapshotName = ReadWriteIOUtils.readString(transferReq.body);
+ snapshotPieceReq.startWritingOffset = ReadWriteIOUtils.readLong(transferReq.body);
+ snapshotPieceReq.snapshotPiece = ReadWriteIOUtils.readBinary(transferReq.body).getValues();
- filePieceReq.version = transferReq.version;
- filePieceReq.type = transferReq.type;
- filePieceReq.body = transferReq.body;
+ snapshotPieceReq.version = transferReq.version;
+ snapshotPieceReq.type = transferReq.type;
+ snapshotPieceReq.body = transferReq.body;
- return filePieceReq;
+ return snapshotPieceReq;
}
/////////////////////////////// Air Gap ///////////////////////////////
public static byte[] toTPipeTransferBytes(
- String fileName, long startWritingOffset, byte[] filePiece) throws IOException {
+ String snapshotName, long startWritingOffset, byte[] snapshotPiece) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
- ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_PIECE.getType(), outputStream);
- ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SNAPSHOT_PIECE.getType(), outputStream);
+ ReadWriteIOUtils.write(snapshotName, outputStream);
ReadWriteIOUtils.write(startWritingOffset, outputStream);
- ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+ ReadWriteIOUtils.write(new Binary(snapshotPiece), outputStream);
return byteArrayOutputStream.toByteArray();
}
}
@@ -117,10 +115,10 @@ public class PipeTransferFilePieceReq extends TPipeTransferReq {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferFilePieceReq that = (PipeTransferFilePieceReq) obj;
- return fileName.equals(that.fileName)
+ PipeTransferSnapshotPieceReq that = (PipeTransferSnapshotPieceReq) obj;
+ return snapshotName.equals(that.snapshotName)
&& startWritingOffset == that.startWritingOffset
- && Arrays.equals(filePiece, that.filePiece)
+ && Arrays.equals(snapshotPiece, that.snapshotPiece)
&& version == that.version
&& type == that.type
&& body.equals(that.body);
@@ -129,6 +127,6 @@ public class PipeTransferFilePieceReq extends TPipeTransferReq {
@Override
public int hashCode() {
return Objects.hash(
- fileName, startWritingOffset, Arrays.hashCode(filePiece), version, type, body);
+ snapshotName, startWritingOffset, Arrays.hashCode(snapshotPiece), version, type, body);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
similarity index 52%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
index 6314068fe50..b4ffbc2cd03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
@@ -17,10 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -30,68 +28,68 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class PipeTransferFileSealReq extends TPipeTransferReq {
+public class PipeTransferSnapshotSealReq extends TPipeTransferReq {
- private transient String fileName;
- private transient long fileLength;
+ private transient String snapshotName;
+ private transient long snapshotLength;
- private PipeTransferFileSealReq() {
+ private PipeTransferSnapshotSealReq() {
// Empty constructor
}
- public String getFileName() {
- return fileName;
+ public String getSnapshotName() {
+ return snapshotName;
}
- public long getFileLength() {
- return fileLength;
+ public long getSnapshotLength() {
+ return snapshotLength;
}
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferFileSealReq toTPipeTransferReq(String fileName, long fileLength)
- throws IOException {
- final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+ public static PipeTransferSnapshotSealReq toTPipeTransferReq(
+ String snapshotName, long snapshotLength) throws IOException {
+ final PipeTransferSnapshotSealReq snapshotSealReq = new PipeTransferSnapshotSealReq();
- fileSealReq.fileName = fileName;
- fileSealReq.fileLength = fileLength;
+ snapshotSealReq.snapshotName = snapshotName;
+ snapshotSealReq.snapshotLength = snapshotLength;
- fileSealReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
+ snapshotSealReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ snapshotSealReq.type = PipeRequestType.TRANSFER_SNAPSHOT_SEAL.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(fileName, outputStream);
- ReadWriteIOUtils.write(fileLength, outputStream);
- fileSealReq.body =
+ ReadWriteIOUtils.write(snapshotName, outputStream);
+ ReadWriteIOUtils.write(snapshotLength, outputStream);
+ snapshotSealReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
- return fileSealReq;
+ return snapshotSealReq;
}
- public static PipeTransferFileSealReq fromTPipeTransferReq(TPipeTransferReq req) {
- final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+ public static PipeTransferSnapshotSealReq fromTPipeTransferReq(TPipeTransferReq req) {
+ final PipeTransferSnapshotSealReq snapshotSealReq = new PipeTransferSnapshotSealReq();
- fileSealReq.fileName = ReadWriteIOUtils.readString(req.body);
- fileSealReq.fileLength = ReadWriteIOUtils.readLong(req.body);
+ snapshotSealReq.snapshotName = ReadWriteIOUtils.readString(req.body);
+ snapshotSealReq.snapshotLength = ReadWriteIOUtils.readLong(req.body);
- fileSealReq.version = req.version;
- fileSealReq.type = req.type;
- fileSealReq.body = req.body;
+ snapshotSealReq.version = req.version;
+ snapshotSealReq.type = req.type;
+ snapshotSealReq.body = req.body;
- return fileSealReq;
+ return snapshotSealReq;
}
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTPipeTransferFileSealBytes(String fileName, long fileLength)
+ public static byte[] toTPipeTransferSnapshotSealBytes(String snapshotName, long snapshotLength)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
- ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_SEAL.getType(), outputStream);
- ReadWriteIOUtils.write(fileName, outputStream);
- ReadWriteIOUtils.write(fileLength, outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SNAPSHOT_SEAL.getType(), outputStream);
+ ReadWriteIOUtils.write(snapshotName, outputStream);
+ ReadWriteIOUtils.write(snapshotLength, outputStream);
return byteArrayOutputStream.toByteArray();
}
}
@@ -106,9 +104,9 @@ public class PipeTransferFileSealReq extends TPipeTransferReq {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferFileSealReq that = (PipeTransferFileSealReq) obj;
- return fileName.equals(that.fileName)
- && fileLength == that.fileLength
+ PipeTransferSnapshotSealReq that = (PipeTransferSnapshotSealReq) obj;
+ return snapshotName.equals(that.snapshotName)
+ && snapshotLength == that.snapshotLength
&& version == that.version
&& type == that.type
&& body.equals(that.body);
@@ -116,6 +114,6 @@ public class PipeTransferFileSealReq extends TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(fileName, fileLength, version, type, body);
+ return Objects.hash(snapshotName, snapshotLength, version, type, body);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
similarity index 74%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
index 941a372a498..3520ddfa835 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
@@ -17,19 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.protocol;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
-public enum IoTDBConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
- private final byte version;
-
- IoTDBConnectorRequestVersion(byte type) {
- this.version = type;
- }
-
- public byte getVersion() {
- return version;
- }
-}
+public abstract class TransferConfigPlanReq extends TPipeTransferReq {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java
new file mode 100644
index 00000000000..5df196a7b5c
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PipeTransferSnapshotPieceResp extends TPipeTransferResp {
+
+ public static final long ERROR_END_OFFSET = -1;
+
+ private long endWritingOffset;
+
+ private PipeTransferSnapshotPieceResp() {}
+
+ public long getEndWritingOffset() {
+ return endWritingOffset;
+ }
+
+ public static PipeTransferSnapshotPieceResp toTPipeTransferResp(
+ TSStatus status, long endWritingOffset) throws IOException {
+ final PipeTransferSnapshotPieceResp snapshotPieceResp = new PipeTransferSnapshotPieceResp();
+
+ snapshotPieceResp.status = status;
+
+ snapshotPieceResp.endWritingOffset = endWritingOffset;
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(endWritingOffset, outputStream);
+ snapshotPieceResp.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ return snapshotPieceResp;
+ }
+
+ public static PipeTransferSnapshotPieceResp toTPipeTransferResp(TSStatus status) {
+ final PipeTransferSnapshotPieceResp snapshotPieceResp = new PipeTransferSnapshotPieceResp();
+
+ snapshotPieceResp.status = status;
+
+ return snapshotPieceResp;
+ }
+
+ public static PipeTransferSnapshotPieceResp fromTPipeTransferResp(
+ TPipeTransferResp transferResp) {
+ final PipeTransferSnapshotPieceResp snapshotPieceResp = new PipeTransferSnapshotPieceResp();
+
+ snapshotPieceResp.status = transferResp.status;
+
+ if (transferResp.isSetBody()) {
+ snapshotPieceResp.endWritingOffset = ReadWriteIOUtils.readLong(transferResp.body);
+ snapshotPieceResp.body = transferResp.body;
+ }
+
+ return snapshotPieceResp;
+ }
+}