This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e7bc4926df0 Support legacy pipe receiver requests (#17901)
e7bc4926df0 is described below
commit e7bc4926df0878337d61b7c19a00b72d48e55fe0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 11 17:25:31 2026 +0800
Support legacy pipe receiver requests (#17901)
* Support legacy pipe snapshot seal parameters
* Support legacy pipe tablet requests
* Resolve legacy pipe tablet request conflicts
* Add pipe receiver 1.3 compatibility tests
* Try current raw tablet format first
* Fix legacy raw tablet fallback
---
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 4 +-
.../pipe/sink/PipeConfigNodeThriftRequestTest.java | 212 +++++++++
.../protocol/thrift/IoTDBDataNodeReceiver.java | 4 +-
.../request/PipeTransferTabletRawReq.java | 89 +++-
.../pipe/sink/util/TabletStatementConverter.java | 29 +-
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 505 +++++++++++++++++++++
.../thrift/request/PipeTransferFileSealReqV2.java | 8 +
.../common/PipeTransferSliceReqBuilderTest.java | 32 ++
.../thrift/request/PipeRequestTypeTest.java | 61 +++
.../request/PipeTransferCompressedReqTest.java | 112 +++++
.../request/PipeTransferFileSealReqV2Test.java | 65 +++
11 files changed, 1101 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 7de8e8bf78d..5f414f2cd69 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -1277,7 +1277,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final boolean isTreeModelDataAllowedToBeCaptured =
- parameters.containsKey(PipeTransferFileSealReqV2.TREE);
+
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
final TreePattern treePattern =
TreePattern.parsePatternFromString(
parameters.get(ColumnHeaderConstant.PATH_PATTERN),
@@ -1285,7 +1285,7 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
final TablePattern tablePattern =
new TablePattern(
- parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
+
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
parameters.get(ColumnHeaderConstant.TABLE_NAME));
final List<TSStatus> results = new ArrayList<>();
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
index 5c578ebead8..dd948a451cf 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
@@ -19,17 +19,33 @@
package org.apache.iotdb.confignode.manager.pipe.sink;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV1Req;
+import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV2Req;
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigPlanReq;
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class PipeConfigNodeThriftRequestTest {
@@ -48,6 +64,59 @@ public class PipeConfigNodeThriftRequestTest {
Assert.assertEquals(req.getTimestampPrecision(),
deserializeReq.getTimestampPrecision());
}
+ @Test
+ public void testPipeTransferConfigHandshakeReqFromLegacyV13Body() throws
IOException {
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.HANDSHAKE_CONFIGNODE_V1,
+ serializeLegacyHandshakeV1Body(TIME_PRECISION));
+
+ final PipeTransferConfigNodeHandshakeV1Req deserializeReq =
+ PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(TIME_PRECISION,
deserializeReq.getTimestampPrecision());
+ }
+
+ @Test
+ public void testPipeTransferConfigHandshakeV2Req() throws IOException {
+ final Map<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
"cluster");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+ final PipeTransferConfigNodeHandshakeV2Req req =
+ PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferReq(params);
+ final PipeTransferConfigNodeHandshakeV2Req deserializeReq =
+ PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(params, deserializeReq.getParams());
+ }
+
+ @Test
+ public void testPipeTransferConfigHandshakeV2ReqFromLegacyV13Body() throws
IOException {
+ final Map<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
"cluster");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.HANDSHAKE_CONFIGNODE_V2,
serializeLegacyHandshakeV2Body(params));
+
+ final PipeTransferConfigNodeHandshakeV2Req deserializeReq =
+ PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(params, deserializeReq.getParams());
+ }
+
@Test
public void testPipeTransferConfigPlanReq() {
PipeTransferConfigPlanReq req =
@@ -56,6 +125,22 @@ public class PipeConfigNodeThriftRequestTest {
Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+ }
+
+ @Test
+ public void testPipeTransferConfigPlanReqFromLegacyV13Body() {
+ final ActiveCQPlan plan = new ActiveCQPlan("cqId", "md5");
+ final ByteBuffer legacyBody = plan.serializeToByteBuffer();
+ final TPipeTransferReq req =
+ legacyTransferReq(PipeRequestType.TRANSFER_CONFIG_PLAN, legacyBody);
+
+ final PipeTransferConfigPlanReq deserializeReq =
+ PipeTransferConfigPlanReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(byteBufferToByteArray(legacyBody),
deserializeReq.getBody());
}
@Test
@@ -76,6 +161,24 @@ public class PipeConfigNodeThriftRequestTest {
Assert.assertArrayEquals(req.getFilePiece(),
deserializeReq.getFilePiece());
}
+ @Test
+ public void testPipeTransferConfigSnapshotPieceReqFromLegacyV13Body() throws
IOException {
+ final byte[] body = "legacyConfigSnapshotPiece".getBytes();
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE,
+ serializeLegacyFilePieceBody("cluster_schema.bin", 10L, body));
+
+ final PipeTransferConfigSnapshotPieceReq deserializeReq =
+ PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals("cluster_schema.bin", deserializeReq.getFileName());
+ Assert.assertEquals(10L, deserializeReq.getStartWritingOffset());
+ Assert.assertArrayEquals(body, deserializeReq.getFilePiece());
+ }
+
@Test
public void testPipeTransferConfigSnapshotSealReq() throws IOException {
String snapshotName = "cluster_schema.bin";
@@ -108,4 +211,113 @@ public class PipeConfigNodeThriftRequestTest {
Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths());
Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
}
+
+ @Test
+ public void testPipeTransferConfigSnapshotSealReqFromLegacyV13Body() throws
IOException {
+ final String snapshotName = "cluster_schema.bin";
+ final String templateInfoName = "template_info.bin";
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**");
+ parameters.put(
+ PipeTransferConfigSnapshotSealReq.FILE_TYPE,
+ Byte.toString(CNSnapshotFileType.SCHEMA.getType()));
+ parameters.put(ColumnHeaderConstant.TYPE, "200");
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL,
+ serializeLegacyFileSealV2Body(
+ Arrays.asList(snapshotName, templateInfoName),
+ Arrays.asList(100L, 10L),
+ parameters));
+ final PipeTransferConfigSnapshotSealReq deserializeReq =
+ PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(
+ Arrays.asList(snapshotName, templateInfoName),
deserializeReq.getFileNames());
+ Assert.assertEquals(Arrays.asList(100L, 10L),
deserializeReq.getFileLengths());
+ Assert.assertEquals(parameters, deserializeReq.getParameters());
+ Assert.assertTrue(
+ PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(
+ deserializeReq.getParameters()));
+ Assert.assertFalse(
+ PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(
+ deserializeReq.getParameters()));
+ }
+
+ private static TPipeTransferReq legacyTransferReq(
+ final PipeRequestType requestType, final ByteBuffer body) {
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = requestType.getType();
+ req.body = body;
+ return req;
+ }
+
+ private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) {
+ final ByteBuffer duplicatedBuffer = byteBuffer.duplicate();
+ final byte[] bytes = new byte[duplicatedBuffer.remaining()];
+ duplicatedBuffer.get(bytes);
+ return bytes;
+ }
+
+ private static ByteBuffer serializeLegacyFileSealV2Body(
+ final List<String> fileNames,
+ final List<Long> fileLengths,
+ final Map<String, String> parameters)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(fileNames.size(), outputStream);
+ for (final String fileName : fileNames) {
+ ReadWriteIOUtils.write(fileName, outputStream);
+ }
+ ReadWriteIOUtils.write(fileLengths.size(), outputStream);
+ for (final Long fileLength : fileLengths) {
+ ReadWriteIOUtils.write(fileLength, outputStream);
+ }
+ ReadWriteIOUtils.write(parameters.size(), outputStream);
+ for (final Map.Entry<String, String> entry : parameters.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyHandshakeV1Body(final String
timestampPrecision)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(timestampPrecision, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyHandshakeV2Body(final Map<String,
String> params)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(params.size(), outputStream);
+ for (final Map.Entry<String, String> entry : params.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyFilePieceBody(
+ final String fileName, final long startWritingOffset, final byte[]
filePiece)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(startWritingOffset, outputStream);
+ ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index f4cb3b281e9..ba7f5f7e03e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -641,7 +641,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
final boolean isTreeModelDataAllowedToBeCaptured =
- parameters.containsKey(PipeTransferFileSealReqV2.TREE);
+
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
final TreePattern treePattern =
TreePattern.parsePatternFromString(
parameters.get(ColumnHeaderConstant.PATH_PATTERN),
@@ -649,7 +649,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
final TablePattern tablePattern =
new TablePattern(
- parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
+
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
parameters.get(ColumnHeaderConstant.TABLE_NAME));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 98ea83b6d76..63db24a37ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
@@ -118,19 +119,7 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
- final int startPosition = buffer.position();
- try {
- final InsertTabletStatement insertTabletStatement =
- TabletStatementConverter.deserializeStatementFromTabletFormat(
- buffer, false, tabletStringInternPool);
- tabletReq.isAligned = insertTabletStatement.isAligned();
- tabletReq.statement = insertTabletStatement;
- } catch (final Exception e) {
- buffer.position(startPosition);
- tabletReq.tablet =
- PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
- tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
- }
+ tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool);
return tabletReq;
}
@@ -167,6 +156,80 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return tabletReq;
}
+ private void deserializeTPipeTransferRawReq(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
+ final int startPosition = buffer.position();
+ try {
+ // Current V1 raw tablet requests can be converted to
InsertTabletStatement directly. Keep
+ // this as the first attempt to avoid the overhead of constructing an
intermediate Tablet.
+ final InsertTabletStatement insertTabletStatement =
+ TabletStatementConverter.deserializeStatementFromTabletFormat(
+ buffer, false, tabletStringInternPool);
+ // Legacy tablets do not serialize column categories. Since hasSchema=1
can be
+ // misread as FIELD, the current reader may return a corrupt statement
instead of failing.
+
ensureStatementDeserializedFromCurrentTabletFormat(insertTabletStatement);
+ isAligned = insertTabletStatement.isAligned();
+ statement = insertTabletStatement;
+ return;
+ } catch (final Exception e) {
+ buffer.position(startPosition);
+ }
+
+ try {
+ // Some old senders serialize Tablet without column categories. Retry
with the legacy reader
+ // before falling back to the full Tablet deserialization path.
+ final InsertTabletStatement insertTabletStatement =
+ TabletStatementConverter.deserializeLegacyStatementFromTabletFormat(
+ buffer, tabletStringInternPool);
+ isAligned = insertTabletStatement.isAligned();
+ statement = insertTabletStatement;
+ return;
+ } catch (final Exception e) {
+ buffer.position(startPosition);
+ }
+
+ tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
+ isAligned = ReadWriteIOUtils.readBool(buffer);
+ }
+
+ private static void ensureStatementDeserializedFromCurrentTabletFormat(
+ final InsertTabletStatement statement) {
+ final String[] measurements = statement.getMeasurements();
+ final TSDataType[] dataTypes = statement.getDataTypes();
+
+ if (Objects.isNull(measurements)
+ || Objects.isNull(dataTypes)
+ || measurements.length != dataTypes.length) {
+ throw new IllegalArgumentException(
+ "Incomplete schema in current tablet format deserialization.");
+ }
+
+ final Object[] columns = statement.getColumns();
+ if (Objects.nonNull(columns) && columns.length != measurements.length) {
+ throw new IllegalArgumentException(
+ "Column count is inconsistent with schema count in current tablet
format deserialization.");
+ }
+
+ for (int i = 0; i < measurements.length; ++i) {
+ if (Objects.isNull(measurements[i]) || Objects.isNull(dataTypes[i])) {
+ throw new IllegalArgumentException(
+ "Incomplete measurement schema in current tablet format
deserialization.");
+ }
+ if (statement.getRowCount() > 0 && (Objects.isNull(columns) ||
Objects.isNull(columns[i]))) {
+ throw new IllegalArgumentException(
+ "Incomplete column values in current tablet format
deserialization.");
+ }
+ }
+
+ final long[] times = statement.getTimes();
+ if (statement.getRowCount() > 0
+ && measurements.length > 0
+ && (Objects.isNull(times) || times.length < statement.getRowCount())) {
+ throw new IllegalArgumentException(
+ "Incomplete timestamps in current tablet format deserialization.");
+ }
+ }
+
/////////////////////////////// Air Gap ///////////////////////////////
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index d7be9f548f5..405859dd982 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -86,6 +86,27 @@ public class TabletStatementConverter {
final boolean readDatabaseName,
final TabletStringInternPool tabletStringInternPool)
throws IllegalPathException {
+ return deserializeStatementFromTabletFormat(
+ byteBuffer, readDatabaseName, tabletStringInternPool, true);
+ }
+
+ public static InsertTabletStatement
deserializeLegacyStatementFromTabletFormat(
+ final ByteBuffer byteBuffer) throws IllegalPathException {
+ return deserializeLegacyStatementFromTabletFormat(byteBuffer, null);
+ }
+
+ public static InsertTabletStatement
deserializeLegacyStatementFromTabletFormat(
+ final ByteBuffer byteBuffer, final TabletStringInternPool
tabletStringInternPool)
+ throws IllegalPathException {
+ return deserializeStatementFromTabletFormat(byteBuffer, false,
tabletStringInternPool, false);
+ }
+
+ private static InsertTabletStatement deserializeStatementFromTabletFormat(
+ final ByteBuffer byteBuffer,
+ final boolean readDatabaseName,
+ final TabletStringInternPool tabletStringInternPool,
+ final boolean readColumnCategory)
+ throws IllegalPathException {
final InsertTabletStatement statement = new InsertTabletStatement();
// Calculate memory size during deserialization, use INSTANCE_SIZE constant
@@ -132,9 +153,11 @@ public class TabletStatementConverter {
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer,
tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
- columnCategories[i] =
- TsTableColumnCategory.fromTsFileColumnCategory(
- ColumnCategory.values()[byteBuffer.get()]);
+ if (readColumnCategory) {
+ columnCategories[i] =
+ TsTableColumnCategory.fromTsFileColumnCategory(
+ ColumnCategory.values()[byteBuffer.get()]);
+ }
// Calculate memory for each measurement string
if (measurement[i] != null) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 10573e5609d..88f7353f519 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -20,12 +20,18 @@
package org.apache.iotdb.db.pipe.sink;
import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
@@ -40,6 +46,7 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTable
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealReq;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -69,9 +76,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class PipeDataNodeThriftRequestTest {
@@ -146,6 +156,61 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getTimestampPrecision(),
deserializeReq.getTimestampPrecision());
}
+ @Test
+ public void testPipeTransferDataNodeHandshakeReqFromLegacyV13Body() throws
IOException {
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.HANDSHAKE_DATANODE_V1,
serializeLegacyHandshakeV1Body(TIME_PRECISION));
+
+ final PipeTransferDataNodeHandshakeV1Req deserializeReq =
+ PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(TIME_PRECISION,
deserializeReq.getTimestampPrecision());
+ }
+
+ @Test
+ public void testPipeTransferDataNodeHandshakeV2Req() throws IOException {
+ final Map<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
"cluster");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
"async");
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE,
Boolean.TRUE.toString());
+
+ final PipeTransferDataNodeHandshakeV2Req req =
+ PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params);
+ final PipeTransferDataNodeHandshakeV2Req deserializeReq =
+ PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(params, deserializeReq.getParams());
+ }
+
+ @Test
+ public void testPipeTransferDataNodeHandshakeV2ReqFromLegacyV13Body() throws
IOException {
+ final Map<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
"cluster");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.HANDSHAKE_DATANODE_V2,
serializeLegacyHandshakeV2Body(params));
+
+ final PipeTransferDataNodeHandshakeV2Req deserializeReq =
+ PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(params, deserializeReq.getParams());
+ }
+
@Test
public void testPipeTransferInsertNodeReq() {
final PipeTransferTabletInsertNodeReq req =
@@ -173,6 +238,34 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(statement.getPaths(), paths);
}
+ @Test
+ public void testPipeTransferInsertNodeReqFromLegacyV13Body() {
+ final InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false);
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TABLET_INSERT_NODE,
node.serializeToByteBuffer());
+
+ final PipeTransferTabletInsertNodeReq deserializeReq =
+ PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(node, deserializeReq.getInsertNode());
+
+ final List<PartialPath> paths = new ArrayList<>();
+ paths.add(new PartialPath(new String[] {"root", "sg", "d", "s"}));
+ Assert.assertEquals(paths, deserializeReq.constructStatement().getPaths());
+ }
+
@Test
public void testPipeTransferInsertNodeReqV2() {
final PipeTransferTabletInsertNodeReqV2 req =
@@ -244,6 +337,23 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(
+ new byte[] {'a', 'b'},
byteBufferToByteArray(deserializeReq.getByteBuffer()));
+ }
+
+ @Test
+ public void testPipeTransferTabletBinaryReqFromLegacyV13Body() {
+ // Not do real test here since "serializeToWal" needs private inner class
of walBuffer
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TABLET_BINARY, ByteBuffer.wrap(new byte[]
{'a', 'b'}));
+ final PipeTransferTabletBinaryReq deserializeReq =
+ PipeTransferTabletBinaryReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(
+ new byte[] {'a', 'b'},
byteBufferToByteArray(deserializeReq.getByteBuffer()));
}
@Test
@@ -285,6 +395,30 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getPlanNode(), deserializeReq.getPlanNode());
}
+ @Test
+ public void testPipeTransferPlanNodeReqFromLegacyV13SchemaPlanBody() {
+ final CreateAlignedTimeSeriesNode node =
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ Collections.singletonList("s"),
+ Collections.singletonList(TSDataType.INT32),
+ Collections.singletonList(TSEncoding.PLAIN),
+ Collections.singletonList(CompressionType.UNCOMPRESSED),
+ null,
+ null,
+ null);
+ final TPipeTransferReq req =
+ legacyTransferReq(PipeRequestType.TRANSFER_PLAN_NODE,
node.serializeToByteBuffer());
+
+ final PipeTransferPlanNodeReq deserializeReq =
+ PipeTransferPlanNodeReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(node, deserializeReq.getPlanNode());
+ }
+
@Test
public void testPipeTransferTabletReq() {
try {
@@ -518,6 +652,72 @@ public class PipeDataNodeThriftRequestTest {
insertTabletStatements.get(1).getMeasurements()[0]);
}
+ @Test
+ public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws
IOException {
+ final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ tabletBuffers.add(serializeLegacyTabletRawBuffer(false));
+ tabletBuffers.add(serializeLegacyTabletRawBuffer(true));
+
+ final PipeTransferTabletBatchReq req =
+ PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(),
tabletBuffers);
+
+ final PipeTransferTabletBatchReq deserializedReq =
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(2, deserializedReq.getTabletReqs().size());
+ Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
+ Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned());
+
+
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement());
+ }
+
+ @Test
+ public void testPipeTransferTabletBatchReqFromLegacyV13Body() throws
IOException {
+ final InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false);
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TABLET_BATCH,
+ serializeLegacyTabletBatchBody(
+ Collections.singletonList(node.serializeToByteBuffer()),
+
Collections.singletonList(serializeLegacyTabletRawBuffer(false))));
+
+ final PipeTransferTabletBatchReq deserializedReq =
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializedReq.getType());
+ Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size());
+ Assert.assertEquals(1, deserializedReq.getTabletReqs().size());
+ Assert.assertEquals(node,
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
+
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+ }
+
+ @Test
+ public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws
IOException {
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
+ req.body = serializeLegacyTabletRawBuffer(true);
+
+ final PipeTransferTabletRawReq deserializedReq =
+ PipeTransferTabletRawReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializedReq.getType());
+ Assert.assertTrue(deserializedReq.getIsAligned());
+ assertLegacyTabletStatement(deserializedReq.constructStatement());
+ }
+
@Test
public void testPipeTransferTabletBatchReqV2() throws IOException {
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -733,6 +933,38 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertArrayEquals(req.getFilePiece(),
deserializeReq.getFilePiece());
}
+ @Test
+ public void testPipeTransferFilePieceReqsFromLegacyV13Bodies() throws
IOException {
+ final byte[] body = "legacyPiece".getBytes();
+
+ final PipeTransferTsFilePieceReq tsFilePieceReq =
+ PipeTransferTsFilePieceReq.fromTPipeTransferReq(
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TS_FILE_PIECE,
+ serializeLegacyFilePieceBody("1.tsfile", 1L, body)));
+ Assert.assertEquals("1.tsfile", tsFilePieceReq.getFileName());
+ Assert.assertEquals(1L, tsFilePieceReq.getStartWritingOffset());
+ Assert.assertArrayEquals(body, tsFilePieceReq.getFilePiece());
+
+ final PipeTransferTsFilePieceWithModReq tsFilePieceWithModReq =
+ PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD,
+ serializeLegacyFilePieceBody("1.tsfile.mod", 2L, body)));
+ Assert.assertEquals("1.tsfile.mod", tsFilePieceWithModReq.getFileName());
+ Assert.assertEquals(2L, tsFilePieceWithModReq.getStartWritingOffset());
+ Assert.assertArrayEquals(body, tsFilePieceWithModReq.getFilePiece());
+
+ final PipeTransferSchemaSnapshotPieceReq schemaSnapshotPieceReq =
+ PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE,
+ serializeLegacyFilePieceBody("schema.snapshot", 3L, body)));
+ Assert.assertEquals("schema.snapshot",
schemaSnapshotPieceReq.getFileName());
+ Assert.assertEquals(3L, schemaSnapshotPieceReq.getStartWritingOffset());
+ Assert.assertArrayEquals(body, schemaSnapshotPieceReq.getFilePiece());
+ }
+
@Test
public void testPipeTransferTsFileSealReq() throws IOException {
final String fileName = "1.tsfile";
@@ -749,6 +981,87 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
}
+ @Test
+ public void testPipeTransferTsFileSealReqFromLegacyV13Body() throws
IOException {
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TS_FILE_SEAL,
serializeLegacyFileSealV1Body("1.tsfile", 100L));
+
+ final PipeTransferTsFileSealReq deserializeReq =
+ PipeTransferTsFileSealReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals("1.tsfile", deserializeReq.getFileName());
+ Assert.assertEquals(100L, deserializeReq.getFileLength());
+ }
+
+ @Test
+ public void testPipeTransferTsFileSealWithModReq() throws IOException {
+ final String modFileName = "1.tsfile.mod";
+ final String tsFileName = "1.tsfile";
+
+ final PipeTransferTsFileSealWithModReq req =
+ PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFileName, 10, tsFileName, 100, "root.db");
+ final PipeTransferTsFileSealWithModReq deserializeReq =
+ PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(Arrays.asList(modFileName, tsFileName),
deserializeReq.getFileNames());
+ Assert.assertEquals(Arrays.asList(10L, 100L),
deserializeReq.getFileLengths());
+ Assert.assertEquals("root.db",
deserializeReq.getDatabaseNameByTsFileName());
+ }
+
+ @Test
+ public void
testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithoutDatabaseName()
+ throws IOException {
+ final String modFileName = "1.tsfile.mod";
+ final String tsFileName = "1.tsfile";
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD,
+ serializeLegacyFileSealV2Body(
+ Arrays.asList(modFileName, tsFileName),
+ Arrays.asList(10L, 100L),
+ Collections.emptyMap()));
+
+ final PipeTransferTsFileSealWithModReq deserializeReq =
+ PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(Arrays.asList(modFileName, tsFileName),
deserializeReq.getFileNames());
+ Assert.assertEquals(Arrays.asList(10L, 100L),
deserializeReq.getFileLengths());
+ Assert.assertTrue(deserializeReq.getParameters().isEmpty());
+ Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName());
+ }
+
+ @Test
+ public void
testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithNullDatabaseName()
+ throws IOException {
+ final String modFileName = "1.tsfile.mod";
+ final String tsFileName = "1.tsfile";
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put("DATABASE_NAME_" + tsFileName, null);
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD,
+ serializeLegacyFileSealV2Body(
+ Arrays.asList(modFileName, tsFileName), Arrays.asList(10L,
100L), parameters));
+
+ final PipeTransferTsFileSealWithModReq deserializeReq =
+ PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(Arrays.asList(modFileName, tsFileName),
deserializeReq.getFileNames());
+ Assert.assertEquals(Arrays.asList(10L, 100L),
deserializeReq.getFileLengths());
+ Assert.assertEquals(parameters, deserializeReq.getParameters());
+ Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName());
+ }
+
@Test
public void testPipeTransferSchemaSnapshotSealReq() throws IOException {
final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT;
@@ -784,6 +1097,36 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
}
+ @Test
+ public void testPipeTransferSchemaSnapshotSealReqFromLegacyV13Body() throws
IOException {
+ final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT;
+ final String tLogName = SchemaConstant.TAG_LOG;
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**");
+ parameters.put(ColumnHeaderConstant.DATABASE, "root.db");
+ parameters.put(ColumnHeaderConstant.TYPE, "19");
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL,
+ serializeLegacyFileSealV2Body(
+ Arrays.asList(mTreeSnapshotName, tLogName),
Arrays.asList(100L, 10L), parameters));
+ final PipeTransferSchemaSnapshotSealReq deserializeReq =
+ PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals(Arrays.asList(mTreeSnapshotName, tLogName),
deserializeReq.getFileNames());
+ Assert.assertEquals(Arrays.asList(100L, 10L),
deserializeReq.getFileLengths());
+ Assert.assertEquals(parameters, deserializeReq.getParameters());
+ Assert.assertTrue(
+ PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(
+ deserializeReq.getParameters()));
+ Assert.assertFalse(
+ PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(
+ deserializeReq.getParameters()));
+ }
+
@Test
public void testPipeTransferFilePieceResp() throws IOException {
final PipeTransferFilePieceResp resp =
@@ -795,6 +1138,83 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(resp.getEndWritingOffset(),
deserializeResp.getEndWritingOffset());
}
+ private static TPipeTransferReq legacyTransferReq(
+ final PipeRequestType requestType, final ByteBuffer body) {
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = requestType.getType();
+ req.body = body;
+ return req;
+ }
+
+ private static ByteBuffer serializeLegacyFileSealV2Body(
+ final List<String> fileNames,
+ final List<Long> fileLengths,
+ final Map<String, String> parameters)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(fileNames.size(), outputStream);
+ for (final String fileName : fileNames) {
+ ReadWriteIOUtils.write(fileName, outputStream);
+ }
+ ReadWriteIOUtils.write(fileLengths.size(), outputStream);
+ for (final Long fileLength : fileLengths) {
+ ReadWriteIOUtils.write(fileLength, outputStream);
+ }
+ ReadWriteIOUtils.write(parameters.size(), outputStream);
+ for (final Map.Entry<String, String> entry : parameters.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyHandshakeV1Body(final String
timestampPrecision)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(timestampPrecision, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyHandshakeV2Body(final Map<String,
String> params)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(params.size(), outputStream);
+ for (final Map.Entry<String, String> entry : params.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyFilePieceBody(
+ final String fileName, final long startWritingOffset, final byte[]
filePiece)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(startWritingOffset, outputStream);
+ ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyFileSealV1Body(
+ final String fileName, final long fileLength) throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(fileLength, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
private static Tablet createSingleValueTablet(final String deviceId, final
String measurement) {
final List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32));
@@ -814,4 +1234,89 @@ public class PipeDataNodeThriftRequestTest {
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
}
+
+ private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean
isAligned)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write("root.sg.d", outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+ writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
+ writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(2L, outputStream);
+ ReadWriteIOUtils.write(1L, outputStream);
+
+ ReadWriteIOUtils.write((byte) 0, outputStream);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+ ReadWriteIOUtils.write(1, outputStream);
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET),
outputStream);
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET),
outputStream);
+
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static ByteBuffer serializeLegacyTabletBatchBody(
+ final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer>
tabletBuffers)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(0, outputStream);
+
+ ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
+ for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
+ writeByteBuffer(outputStream, insertNodeBuffer);
+ }
+
+ ReadWriteIOUtils.write(tabletBuffers.size(), outputStream);
+ for (final ByteBuffer tabletBuffer : tabletBuffers) {
+ writeByteBuffer(outputStream, tabletBuffer);
+ }
+
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
+ private static void writeByteBuffer(
+ final DataOutputStream outputStream, final ByteBuffer byteBuffer) throws
IOException {
+ outputStream.write(byteBufferToByteArray(byteBuffer));
+ }
+
+ private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) {
+ final ByteBuffer duplicatedBuffer = byteBuffer.duplicate();
+ final byte[] bytes = new byte[duplicatedBuffer.remaining()];
+ duplicatedBuffer.get(bytes);
+ return bytes;
+ }
+
+ private static void writeLegacyMeasurementSchema(
+ final DataOutputStream outputStream, final String measurement, final
TSDataType dataType)
+ throws IOException {
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(measurement, outputStream);
+ ReadWriteIOUtils.write(dataType.serialize(), outputStream);
+ ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream);
+ ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(),
outputStream);
+ ReadWriteIOUtils.write(0, outputStream);
+ }
+
+ private static void assertLegacyTabletStatement(final InsertTabletStatement
statement) {
+ Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
+ Assert.assertArrayEquals(new String[] {"s1", "s2"},
statement.getMeasurements());
+ Assert.assertArrayEquals(
+ new TSDataType[] {TSDataType.INT32, TSDataType.TEXT},
statement.getDataTypes());
+ Assert.assertEquals(2, statement.getRowCount());
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
index cf73948c3f1..227d5871d8b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
@@ -54,6 +54,14 @@ public abstract class PipeTransferFileSealReqV2 extends
TPipeTransferReq {
return parameters;
}
+ public static boolean isTreeModelDataAllowedToBeCaptured(final Map<String,
String> parameters) {
+ return parameters.containsKey(TREE) || !parameters.containsKey(TABLE);
+ }
+
+ public static boolean isTableModelDataAllowedToBeCaptured(final Map<String,
String> parameters) {
+ return parameters.containsKey(TABLE);
+ }
+
protected abstract PipeRequestType getPlanType();
/////////////////////////////// Thrift ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
index 290ce397980..cc836cae945 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -22,14 +22,20 @@ package
org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public class PipeTransferSliceReqBuilderTest {
@@ -91,6 +97,32 @@ public class PipeTransferSliceReqBuilderTest {
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4),
bodySizeLimit));
}
+ @Test
+ public void testPipeTransferSliceReqFromLegacyV13Body() throws IOException {
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = PipeRequestType.TRANSFER_SLICE.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(7, outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(),
outputStream);
+ ReadWriteIOUtils.write(6, outputStream);
+ ReadWriteIOUtils.write(new Binary(new byte[] {2, 3, 4}), outputStream);
+ ReadWriteIOUtils.write(1, outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ final PipeTransferSliceReq sliceReq =
PipeTransferSliceReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(7, sliceReq.getOrderId());
+ Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_RAW.getType(),
sliceReq.getOriginReqType());
+ Assert.assertEquals(6, sliceReq.getOriginBodySize());
+ Assert.assertArrayEquals(new byte[] {2, 3, 4}, sliceReq.getSliceBody());
+ Assert.assertEquals(1, sliceReq.getSliceIndex());
+ Assert.assertEquals(2, sliceReq.getSliceCount());
+ }
+
private static TPipeTransferReq createReq(final byte version, final int
bodySize) {
final byte[] body = new byte[bodySize];
for (int i = 0; i < body.length; ++i) {
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
new file mode 100644
index 00000000000..c2e0ba949f9
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PipeRequestTypeTest {
+
+ @Test
+ public void testAllV13RequestTypesAreRecognized() {
+ assertV13RequestType((short) 0, PipeRequestType.HANDSHAKE_CONFIGNODE_V1);
+ assertV13RequestType((short) 1, PipeRequestType.HANDSHAKE_DATANODE_V1);
+ assertV13RequestType((short) 50, PipeRequestType.HANDSHAKE_CONFIGNODE_V2);
+ assertV13RequestType((short) 51, PipeRequestType.HANDSHAKE_DATANODE_V2);
+
+ assertV13RequestType((short) 2,
PipeRequestType.TRANSFER_TABLET_INSERT_NODE);
+ assertV13RequestType((short) 3, PipeRequestType.TRANSFER_TABLET_RAW);
+ assertV13RequestType((short) 4, PipeRequestType.TRANSFER_TS_FILE_PIECE);
+ assertV13RequestType((short) 5, PipeRequestType.TRANSFER_TS_FILE_SEAL);
+ assertV13RequestType((short) 6, PipeRequestType.TRANSFER_TABLET_BATCH);
+ assertV13RequestType((short) 7, PipeRequestType.TRANSFER_TABLET_BINARY);
+ assertV13RequestType((short) 8,
PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD);
+ assertV13RequestType((short) 9,
PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD);
+
+ // 1.3 named this request TRANSFER_SCHEMA_PLAN. 2.0 keeps the same wire
type.
+ assertV13RequestType((short) 100, PipeRequestType.TRANSFER_PLAN_NODE);
+ assertV13RequestType((short) 101,
PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE);
+ assertV13RequestType((short) 102,
PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL);
+
+ assertV13RequestType((short) 200, PipeRequestType.TRANSFER_CONFIG_PLAN);
+ assertV13RequestType((short) 201,
PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE);
+ assertV13RequestType((short) 202,
PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL);
+
+ assertV13RequestType((short) 300, PipeRequestType.TRANSFER_COMPRESSED);
+ assertV13RequestType((short) 400, PipeRequestType.TRANSFER_SLICE);
+ }
+
+ private static void assertV13RequestType(
+ final short type, final PipeRequestType expectedRequestType) {
+ Assert.assertTrue(PipeRequestType.isValidatedRequestType(type));
+ Assert.assertEquals(expectedRequestType, PipeRequestType.valueOf(type));
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
new file mode 100644
index 00000000000..ccdbc5086a5
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressor;
+import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class PipeTransferCompressedReqTest {
+
+ @Test
+ public void testPipeTransferCompressedReq() throws IOException {
+ final TPipeTransferReq originalReq = new TPipeTransferReq();
+ originalReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ originalReq.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType();
+ originalReq.body = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
+
+ final TPipeTransferReq compressedReq =
+ PipeTransferCompressedReq.toTPipeTransferReq(
+ originalReq,
+ Collections.singletonList(
+ PipeCompressorFactory.getCompressor(
+ PipeCompressor.PipeCompressionType.GZIP.getIndex())));
+ final TPipeTransferReq decompressedReq =
+ PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq);
+
+ Assert.assertEquals(IoTDBSinkRequestVersion.VERSION_1.getVersion(),
compressedReq.version);
+ Assert.assertEquals(PipeRequestType.TRANSFER_COMPRESSED.getType(),
compressedReq.type);
+ Assert.assertEquals(originalReq.version, decompressedReq.version);
+ Assert.assertEquals(originalReq.type, decompressedReq.type);
+ Assert.assertArrayEquals(originalReq.getBody(), decompressedReq.getBody());
+ }
+
+ @Test
+ public void testPipeTransferCompressedReqFromLegacyV13Body() throws
IOException {
+ final TPipeTransferReq originalReq = new TPipeTransferReq();
+ originalReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ originalReq.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType();
+ originalReq.body = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
+
+ final TPipeTransferReq compressedReq = new TPipeTransferReq();
+ compressedReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ compressedReq.type = PipeRequestType.TRANSFER_COMPRESSED.getType();
+ compressedReq.body =
+ serializeLegacyCompressedBody(
+ originalReq,
+ Collections.singletonList(
+ PipeCompressorFactory.getCompressor(
+ PipeCompressor.PipeCompressionType.GZIP.getIndex())));
+
+ final TPipeTransferReq decompressedReq =
+ PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq);
+
+ Assert.assertEquals(originalReq.version, decompressedReq.version);
+ Assert.assertEquals(originalReq.type, decompressedReq.type);
+ Assert.assertArrayEquals(originalReq.getBody(), decompressedReq.getBody());
+ }
+
+ private static ByteBuffer serializeLegacyCompressedBody(
+ final TPipeTransferReq originalReq, final List<PipeCompressor>
compressors)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ byte[] body =
+ BytesUtils.concatByteArrayList(
+ Arrays.asList(
+ new byte[] {originalReq.version},
+ BytesUtils.shortToBytes(originalReq.type),
+ originalReq.getBody()));
+
+ ReadWriteIOUtils.write((byte) compressors.size(), outputStream);
+ for (final PipeCompressor compressor : compressors) {
+ ReadWriteIOUtils.write(compressor.serialize(), outputStream);
+ ReadWriteIOUtils.write(body.length, outputStream);
+ body = compressor.compress(body);
+ }
+ outputStream.write(body);
+
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java
new file mode 100644
index 00000000000..4ac0e5cb7c3
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeTransferFileSealReqV2Test {
+
+ @Test
+ public void testLegacyV13SnapshotSealCapturesTreeOnly() {
+ final Map<String, String> parameters = new HashMap<>();
+
+
Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
+
Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
+ }
+
+ @Test
+ public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() {
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(PipeTransferFileSealReqV2.TREE, "");
+
+
Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
+
Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
+ }
+
+ @Test
+ public void testExplicitTableOnlySnapshotSealCapturesTableOnly() {
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(PipeTransferFileSealReqV2.TABLE, "");
+
+
Assert.assertFalse(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
+
Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
+ }
+
+ @Test
+ public void testExplicitTreeAndTableSnapshotSealCapturesBoth() {
+ final Map<String, String> parameters = new HashMap<>();
+ parameters.put(PipeTransferFileSealReqV2.TREE, "");
+ parameters.put(PipeTransferFileSealReqV2.TABLE, "");
+
+
Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters));
+
Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters));
+ }
+}