This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new feb61e8d83a Pipe: Fix InsertRowsNode is not supported in batch mode (#12517)
feb61e8d83a is described below
commit feb61e8d83a8e4ef3e995690de5b6b29185067e2
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 18:39:58 2024 +0800
Pipe: Fix InsertRowsNode is not supported in batch mode (#12517)
---
.../request/PipeTransferTabletBatchReq.java | 23 ++++++---
.../request/PipeTransferTabletBinaryReq.java | 57 +++++++---------------
.../request/PipeTransferTabletInsertNodeReq.java | 13 ++---
3 files changed, 39 insertions(+), 54 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 73e1d2b82c5..8090f650489 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -70,6 +70,9 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
+ } else if (statement instanceof InsertRowsStatement) {
+ insertRowStatementList.addAll(
+ ((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
@@ -87,11 +90,14 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {
insertTabletStatementList.add((InsertTabletStatement) statement);
+ } else if (statement instanceof InsertRowsStatement) {
+ insertRowStatementList.addAll(
+ ((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
- "unknown InsertBaseStatement %s constructed from PipeTransferTabletInsertNodeReq.",
- insertNodeReq));
+ "Unknown InsertBaseStatement %s constructed from PipeTransferTabletInsertNodeReq.",
+ statement));
}
}
@@ -111,9 +117,9 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletBatchReq toTPipeTransferReq(
- List<ByteBuffer> binaryBuffers,
- List<ByteBuffer> insertNodeBuffers,
- List<ByteBuffer> tabletBuffers)
+ final List<ByteBuffer> binaryBuffers,
+ final List<ByteBuffer> insertNodeBuffers,
+ final List<ByteBuffer> tabletBuffers)
throws IOException {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
@@ -147,7 +153,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
return batchReq;
}
- public static PipeTransferTabletBatchReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
+ final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
int size = ReadWriteIOUtils.readInt(transferReq.body);
@@ -200,14 +207,14 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
/////////////////////////////// Object ///////////////////////////////
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
+ final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
return binaryReqs.equals(that.binaryReqs)
&& insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index ae8da894347..5e9e0a39103 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -51,41 +51,17 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
public InsertBaseStatement constructStatement() {
final InsertNode insertNode = parseByteBuffer();
- if (insertNode instanceof InsertRowNode) {
- final InsertRowNode node = (InsertRowNode) insertNode;
-
- final InsertRowStatement statement = new InsertRowStatement();
- statement.setDevicePath(node.getDevicePath());
- statement.setTime(node.getTime());
- statement.setMeasurements(node.getMeasurements());
- statement.setDataTypes(node.getDataTypes());
- statement.setValues(node.getValues());
- statement.setNeedInferType(node.isNeedInferType());
- statement.setAligned(node.isAligned());
- statement.setMeasurementSchemas(node.getMeasurementSchemas());
- return statement;
+ if (!(insertNode instanceof InsertRowNode
+ || insertNode instanceof InsertTabletNode
+ || insertNode instanceof InsertRowsNode)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unknown InsertNode type %s when constructing statement from insert node.",
+ insertNode));
}
- if (insertNode instanceof InsertTabletNode) {
- final InsertTabletNode node = (InsertTabletNode) insertNode;
-
- final InsertTabletStatement statement = new InsertTabletStatement();
- statement.setDevicePath(node.getDevicePath());
- statement.setMeasurements(node.getMeasurements());
- statement.setTimes(node.getTimes());
- statement.setColumns(node.getColumns());
- statement.setBitMaps(node.getBitMaps());
- statement.setRowCount(node.getRowCount());
- statement.setDataTypes(node.getDataTypes());
- statement.setAligned(node.isAligned());
- statement.setMeasurementSchemas(node.getMeasurementSchemas());
- return statement;
- }
-
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertNode type %s when constructing statement from insert node.",
- insertNode));
+ return (InsertBaseStatement)
+ IoTDBDataNodeReceiver.PLAN_TO_STATEMENT_VISITOR.process(insertNode, null);
}
private InsertNode parseByteBuffer() {
@@ -95,7 +71,7 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferTabletBinaryReq toTPipeTransferReq(ByteBuffer byteBuffer) {
+ public static PipeTransferTabletBinaryReq toTPipeTransferReq(final ByteBuffer byteBuffer) {
final PipeTransferTabletBinaryReq req = new PipeTransferTabletBinaryReq();
req.byteBuffer = byteBuffer;
@@ -106,7 +82,8 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
return req;
}
- public static PipeTransferTabletBinaryReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ public static PipeTransferTabletBinaryReq fromTPipeTransferReq(
+ final TPipeTransferReq transferReq) {
final PipeTransferTabletBinaryReq binaryReq = new PipeTransferTabletBinaryReq();
binaryReq.byteBuffer = transferReq.body;
@@ -119,7 +96,7 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTPipeTransferBytes(ByteBuffer byteBuffer) throws IOException {
+ public static byte[] toTPipeTransferBytes(final ByteBuffer byteBuffer) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
@@ -131,14 +108,14 @@ public class PipeTransferTabletBinaryReq extends TPipeTransferReq {
/////////////////////////////// Object ///////////////////////////////
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
+ final PipeTransferTabletBinaryReq that = (PipeTransferTabletBinaryReq) obj;
return byteBuffer.equals(that.byteBuffer)
&& version == that.version
&& type == that.type
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 7fad79e843e..c45417ba99d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -66,7 +66,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
/////////////////////////////// WriteBack & Batch ///////////////////////////////
- public static PipeTransferTabletInsertNodeReq toTPipeTransferRawReq(InsertNode insertNode) {
+ public static PipeTransferTabletInsertNodeReq toTPipeTransferRawReq(final InsertNode insertNode) {
final PipeTransferTabletInsertNodeReq req = new PipeTransferTabletInsertNodeReq();
req.insertNode = insertNode;
@@ -76,7 +76,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
+ public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(final InsertNode insertNode) {
final PipeTransferTabletInsertNodeReq req = new PipeTransferTabletInsertNodeReq();
req.insertNode = insertNode;
@@ -88,7 +88,8 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
return req;
}
- public static PipeTransferTabletInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ public static PipeTransferTabletInsertNodeReq fromTPipeTransferReq(
+ final TPipeTransferReq transferReq) {
final PipeTransferTabletInsertNodeReq insertNodeReq = new PipeTransferTabletInsertNodeReq();
insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body);
@@ -101,7 +102,7 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
}
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTPipeTransferBytes(InsertNode insertNode) throws IOException {
+ public static byte[] toTPipeTransferBytes(final InsertNode insertNode) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
@@ -114,14 +115,14 @@ public class PipeTransferTabletInsertNodeReq extends TPipeTransferReq {
/////////////////////////////// Object ///////////////////////////////
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferTabletInsertNodeReq that = (PipeTransferTabletInsertNodeReq) obj;
+ final PipeTransferTabletInsertNodeReq that = (PipeTransferTabletInsertNodeReq) obj;
return Objects.equals(insertNode, that.insertNode)
&& version == that.version
&& type == that.type