This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b9f59fc9e8 [Pipe] Fix omitted legacy tablet compatibility follow-ups (#17929)
3b9f59fc9e8 is described below
commit 3b9f59fc9e8c7f95af18d639569b32e73fdfec03
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:37:27 2026 +0800
[Pipe] Fix omitted legacy tablet compatibility follow-ups (#17929)
* Fix 1.3.7 binaryBuffers
* Update PipeDataNodeThriftRequestTest.java
* Hotfix
* spotless
---
.../request/PipeTransferTabletBatchReq.java | 98 ++++++++++++++--
.../request/PipeTransferTabletRawReq.java | 13 ++-
.../pipe/sink/util/TabletStatementConverter.java | 87 +++++++++++---
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 128 ++++++++++++++++++++-
4 files changed, 294 insertions(+), 32 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index ede3370f5b0..340232c8b62 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,6 +45,7 @@ import java.util.Objects;
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
+ private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new
ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs
= new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new
ArrayList<>();
@@ -60,6 +61,26 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
+ for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
+ final InsertBaseStatement statement = binaryReq.constructStatement();
+ if (statement.isEmpty()) {
+ continue;
+ }
+ if (statement instanceof InsertRowStatement) {
+ insertRowStatementList.add((InsertRowStatement) statement);
+ } else if (statement instanceof InsertTabletStatement) {
+ insertTabletStatementList.add((InsertTabletStatement) statement);
+ } else if (statement instanceof InsertRowsStatement) {
+ insertRowStatementList.addAll(
+ ((InsertRowsStatement) statement).getInsertRowStatementList());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReq.",
+ statement));
+ }
+ }
+
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs)
{
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
@@ -132,19 +153,52 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
- // Binary size, for rolling upgrade
- ReadWriteIOUtils.readInt(transferReq.body);
- int size = ReadWriteIOUtils.readInt(transferReq.body);
+ // Legacy 1.3.x batch bodies may carry WAL binary requests before insert
nodes and tablets.
+ int size = readNonNegativeSize(transferReq.body, "binary request count");
for (int i = 0; i < size; ++i) {
- batchReq.insertNodeReqs.add(
- PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
- (InsertNode) PlanFragment.deserializeHelper(transferReq.body,
null)));
+ final int length = readNonNegativeSize(transferReq.body, "binary request
body length");
+ if (length > transferReq.body.remaining()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid binary request body length %s, remaining body length
%s.",
+ length, transferReq.body.remaining()));
+ }
+ final byte[] body = new byte[length];
+ transferReq.body.get(body);
+ batchReq.binaryReqs.add(
+
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
}
- size = ReadWriteIOUtils.readInt(transferReq.body);
+ size = readNonNegativeSize(transferReq.body, "insert node count");
for (int i = 0; i < size; ++i) {
- batchReq.tabletReqs.add(
- PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body,
tabletStringInternPool));
+ final int startPosition = transferReq.body.position();
+ try {
+ batchReq.insertNodeReqs.add(
+ PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
+ (InsertNode) PlanFragment.deserializeHelper(transferReq.body,
null)));
+ } catch (final RuntimeException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to deserialize insert node %s/%s in tablet batch at
body position %s with remaining body length %s.",
+ i + 1, size, startPosition, transferReq.body.remaining()),
+ e);
+ }
+ }
+
+ size = readNonNegativeSize(transferReq.body, "raw tablet count");
+ for (int i = 0; i < size; ++i) {
+ final int startPosition = transferReq.body.position();
+ try {
+ batchReq.tabletReqs.add(
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(
+ transferReq.body, tabletStringInternPool));
+ } catch (final RuntimeException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to deserialize raw tablet %s/%s in tablet batch at
body position %s with remaining body length %s.",
+ i + 1, size, startPosition, transferReq.body.remaining()),
+ e);
+ }
}
batchReq.version = transferReq.version;
@@ -153,8 +207,29 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
return batchReq;
}
+ private static int readNonNegativeSize(final ByteBuffer buffer, final String
fieldName) {
+ if (buffer.remaining() < Integer.BYTES) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Insufficient bytes to read %s in tablet batch, remaining body
length %s.",
+ fieldName, buffer.remaining()));
+ }
+
+ final int size = ReadWriteIOUtils.readInt(buffer);
+ if (size < 0) {
+ throw new IllegalArgumentException(
+ String.format("Invalid negative %s %s in tablet batch.", fieldName,
size));
+ }
+ return size;
+ }
+
/////////////////////////////// TestOnly ///////////////////////////////
+ @TestOnly
+ public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
+ return binaryReqs;
+ }
+
@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
@@ -176,7 +251,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
- return insertNodeReqs.equals(that.insertNodeReqs)
+ return binaryReqs.equals(that.binaryReqs)
+ && insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
@@ -185,6 +261,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
@Override
public int hashCode() {
- return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+ return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type,
body);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 63db24a37ba..e80b10c95e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -188,8 +188,17 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
buffer.position(startPosition);
}
- tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
- isAligned = ReadWriteIOUtils.readBool(buffer);
+ try {
+ tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
+ isAligned = ReadWriteIOUtils.readBool(buffer);
+ } catch (final RuntimeException e) {
+ buffer.position(startPosition);
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to deserialize raw tablet request at body position %s
with remaining body length %s.",
+ startPosition, buffer.remaining()),
+ e);
+ }
}
private static void ensureStatementDeserializedFromCurrentTabletFormat(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index 405859dd982..5afe02658d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -32,7 +32,6 @@ import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -116,12 +115,19 @@ public class TabletStatementConverter {
intern(ReadWriteIOUtils.readString(byteBuffer),
tabletStringInternPool);
final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
+ if (rowSize < 0) {
+ throw new IllegalArgumentException(
+ String.format("Invalid row size %s in tablet format
deserialization.", rowSize));
+ }
// deserialize schemas
final int schemaSize =
- BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
- ? ReadWriteIOUtils.readInt(byteBuffer)
- : 0;
+ readBooleanByte(byteBuffer, "schema existence") ?
ReadWriteIOUtils.readInt(byteBuffer) : 0;
+ if (schemaSize < 0) {
+ throw new IllegalArgumentException(
+ String.format("Invalid schema size %s in tablet format
deserialization.", schemaSize));
+ }
+ ensureRemaining(byteBuffer, schemaSize, "measurement schema existence
flags");
final String[] measurement = new String[schemaSize];
final TsTableColumnCategory[] columnCategories = new
TsTableColumnCategory[schemaSize];
final TSDataType[] dataTypes = new TSDataType[schemaSize];
@@ -148,15 +154,26 @@ public class TabletStatementConverter {
// Deserialize and calculate memory in the same loop
for (int i = 0; i < schemaSize; i++) {
- final boolean hasSchema =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean hasSchema = readBooleanByte(byteBuffer, "measurement
schema existence");
if (hasSchema) {
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer,
tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
if (readColumnCategory) {
+ if (!byteBuffer.hasRemaining()) {
+ throw new IllegalArgumentException(
+ "Missing column category in current tablet format
deserialization.");
+ }
+ final byte columnCategory = byteBuffer.get();
+ if (columnCategory < 0 || columnCategory >=
ColumnCategory.values().length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid column category %s in current tablet format
deserialization.",
+ columnCategory));
+ }
columnCategories[i] =
TsTableColumnCategory.fromTsFileColumnCategory(
- ColumnCategory.values()[byteBuffer.get()]);
+ ColumnCategory.values()[columnCategory]);
}
// Calculate memory for each measurement string
@@ -178,6 +195,15 @@ public class TabletStatementConverter {
memorySize += measurementMemorySize;
memorySize += dataTypesMemorySize;
+ final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp
column existence");
+ if (rowSize > 0 && !isTimesNotNull) {
+ throw new IllegalArgumentException(
+ "Missing timestamps in tablet format deserialization with non-empty
rows.");
+ }
+ if (isTimesNotNull) {
+ ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
+ }
+
// deserialize times and calculate memory during deserialization
final long[] times = new long[rowSize];
// Calculate memory: array header + long size * rowSize
@@ -185,7 +211,6 @@ public class TabletStatementConverter {
org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
- final boolean isTimesNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isTimesNotNull) {
for (int i = 0; i < rowSize; i++) {
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
@@ -199,7 +224,7 @@ public class TabletStatementConverter {
final BitMap[] bitMaps;
final long bitMapsMemorySize;
- final boolean isBitMapsNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap
column existence");
if (isBitMapsNotNull) {
// Use the method that returns both BitMap array and memory size
final Pair<BitMap[], Long> bitMapsAndMemory =
@@ -218,7 +243,11 @@ public class TabletStatementConverter {
final Object[] values;
final long valuesMemorySize;
- final boolean isValuesNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column
existence");
+ if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
+ throw new IllegalArgumentException(
+ "Missing values in tablet format deserialization with non-empty
rows.");
+ }
if (isValuesNotNull) {
// Use the method that returns both values array and memory size
final Pair<Object[], Long> valuesAndMemory =
@@ -236,7 +265,7 @@ public class TabletStatementConverter {
// Add values memory to total
memorySize += valuesMemorySize;
- final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
+ final boolean isAligned = readBooleanByte(byteBuffer, "alignment");
statement.setMeasurements(measurement);
statement.setTimes(times);
@@ -321,6 +350,30 @@ public class TabletStatementConverter {
}
}
+ private static boolean readBooleanByte(final ByteBuffer buffer, final String
fieldName) {
+ if (!buffer.hasRemaining()) {
+ throw new IllegalArgumentException(
+ String.format("Missing %s flag in tablet format deserialization.",
fieldName));
+ }
+
+ final byte value = ReadWriteIOUtils.readByte(buffer);
+ if (value != 0 && value != 1) {
+ throw new IllegalArgumentException(
+ String.format("Invalid %s flag %s in tablet format
deserialization.", fieldName, value));
+ }
+ return value == 1;
+ }
+
+ private static void ensureRemaining(
+ final ByteBuffer buffer, final long expectedSize, final String
fieldName) {
+ if (expectedSize > buffer.remaining()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Insufficient bytes for %s in tablet format deserialization,
expected %s, remaining %s.",
+ fieldName, expectedSize, buffer.remaining()));
+ }
+ }
+
/**
* Read measurement name and data type from buffer, skipping other
measurement schema fields
* (encoding, compression, and tags/attributes) that are not needed for
InsertTabletStatement.
@@ -364,9 +417,13 @@ public class TabletStatementConverter {
boolean hasMarkedBitMap = false;
for (int i = 0; i < columns; i++) {
- final boolean hasBitMap =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap
existence");
if (hasBitMap) {
final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ if (size < 0) {
+ throw new IllegalArgumentException(
+ String.format("Invalid bitmap size %s in tablet format
deserialization.", size));
+ }
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
final byte[] byteArray = valueBinary.getValues();
final BitMap bitMap = new BitMap(size, byteArray);
@@ -416,8 +473,7 @@ public class TabletStatementConverter {
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);
for (int i = 0; i < columns; i++) {
- final boolean isValueColumnsNotNull =
- BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value
column existence");
if (types[i] == null) {
continue;
}
@@ -427,7 +483,7 @@ public class TabletStatementConverter {
final boolean[] boolValues = new boolean[rowSize];
if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
- boolValues[index] =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
}
}
values[i] = boolValues;
@@ -503,8 +559,7 @@ public class TabletStatementConverter {
if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
- final boolean isNotNull =
- BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+ final boolean isNotNull = readBooleanByte(byteBuffer, "binary
value existence");
if (isNotNull) {
binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
// Calculate memory for each Binary object during
deserialization
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 88f7353f519..7641070800f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -702,6 +702,43 @@ public class PipeDataNodeThriftRequestTest {
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
}
+ @Test
+ public void testPipeTransferTabletBatchReqFromLegacyV13BodyWithBinaryReqs()
throws IOException {
+ final InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ false,
+ new String[] {"s"},
+ new TSDataType[] {TSDataType.INT32},
+ 1,
+ new Object[] {1},
+ false);
+ final ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] {'a', 'b'});
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TABLET_BATCH,
+ serializeLegacyTabletBatchBody(
+ Collections.singletonList(binaryBuffer),
+ Collections.singletonList(node.serializeToByteBuffer()),
+
Collections.singletonList(serializeLegacyTabletRawBuffer(false))));
+
+ final PipeTransferTabletBatchReq deserializedReq =
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializedReq.getType());
+ Assert.assertEquals(1, deserializedReq.getBinaryReqs().size());
+ Assert.assertArrayEquals(
+ new byte[] {'a', 'b'},
+
byteBufferToByteArray(deserializedReq.getBinaryReqs().get(0).getByteBuffer()));
+ Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size());
+ Assert.assertEquals(1, deserializedReq.getTabletReqs().size());
+ Assert.assertEquals(node,
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
+
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+ }
+
@Test
public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws
IOException {
final TPipeTransferReq req = new TPipeTransferReq();
@@ -718,6 +755,51 @@ public class PipeDataNodeThriftRequestTest {
assertLegacyTabletStatement(deserializedReq.constructStatement());
}
+ @Test
+ public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat()
throws IOException {
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
+ req.body = serializeSingleColumnLegacyTabletRawBuffer(false);
+
+ final PipeTransferTabletRawReq deserializedReq =
+ PipeTransferTabletRawReq.fromTPipeTransferReq(req);
+
+ Assert.assertFalse(deserializedReq.getIsAligned());
+ final InsertTabletStatement statement =
deserializedReq.constructStatement();
+ Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
+ Assert.assertArrayEquals(new String[] {"s1"}, statement.getMeasurements());
+ Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32},
statement.getDataTypes());
+ Assert.assertEquals(2, statement.getRowCount());
+ Assert.assertArrayEquals(new long[] {1700000000000L, 1700000000001L},
statement.getTimes());
+ Assert.assertArrayEquals(new int[] {2, 1}, (int[])
statement.getColumns()[0]);
+ }
+
+ @Test
+ public void testPipeTransferTabletBatchReqRejectsTruncatedRawTablet() throws
IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(0, outputStream);
+ ReadWriteIOUtils.write(0, outputStream);
+ ReadWriteIOUtils.write(1, outputStream);
+ outputStream.write(new byte[] {1, 0, 0, 0, 0, 0});
+
+ final TPipeTransferReq req =
+ legacyTransferReq(
+ PipeRequestType.TRANSFER_TABLET_BATCH,
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+
+ try {
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (final IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("Failed to deserialize raw
tablet"));
+ Assert.assertTrue(
+ e.getCause().getMessage().contains("Failed to deserialize raw
tablet request"));
+ }
+ }
+ }
+
@Test
public void testPipeTransferTabletBatchReqV2() throws IOException {
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -1248,8 +1330,8 @@ public class PipeDataNodeThriftRequestTest {
writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);
ReadWriteIOUtils.write((byte) 1, outputStream);
- ReadWriteIOUtils.write(2L, outputStream);
- ReadWriteIOUtils.write(1L, outputStream);
+ ReadWriteIOUtils.write(1700000000000L, outputStream);
+ ReadWriteIOUtils.write(1700000000001L, outputStream);
ReadWriteIOUtils.write((byte) 0, outputStream);
@@ -1268,12 +1350,52 @@ public class PipeDataNodeThriftRequestTest {
}
}
+ private static ByteBuffer serializeSingleColumnLegacyTabletRawBuffer(final
boolean isAligned)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write("root.sg.d", outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(1, outputStream);
+ writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(1700000000000L, outputStream);
+ ReadWriteIOUtils.write(1700000000001L, outputStream);
+
+ ReadWriteIOUtils.write((byte) 0, outputStream);
+
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write((byte) 1, outputStream);
+ ReadWriteIOUtils.write(2, outputStream);
+ ReadWriteIOUtils.write(1, outputStream);
+
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
private static ByteBuffer serializeLegacyTabletBatchBody(
final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer>
tabletBuffers)
throws IOException {
+ return serializeLegacyTabletBatchBody(
+ Collections.emptyList(), insertNodeBuffers, tabletBuffers);
+ }
+
+ private static ByteBuffer serializeLegacyTabletBatchBody(
+ final List<ByteBuffer> binaryBuffers,
+ final List<ByteBuffer> insertNodeBuffers,
+ final List<ByteBuffer> tabletBuffers)
+ throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(0, outputStream);
+ ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
+ for (final ByteBuffer binaryBuffer : binaryBuffers) {
+ ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
+ writeByteBuffer(outputStream, binaryBuffer);
+ }
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {