This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b981972bd7 Pipe: Support InsertRows in
PipeInsertNodeTabletInsertionEvent (#12286)
1b981972bd7 is described below
commit 1b981972bd7b95259e75a5dcb9fb74c740d51fc4
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 26 14:31:44 2024 +0800
Pipe: Support InsertRows in PipeInsertNodeTabletInsertionEvent (#12286)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../response/pipe/task/PipeTableResp.java | 8 +-
.../iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../receiver/IoTDBConfigNodeReceiverAgent.java | 6 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 2 +-
.../request/PipeTransferTabletInsertNodeReq.java | 46 ++------
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 11 +-
.../connector/protocol/opcua/OpcUaConnector.java | 4 +-
.../protocol/websocket/WebSocketConnector.java | 17 ++-
.../websocket/WebSocketConnectorServer.java | 5 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 4 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 121 +++++++++++++++------
.../tablet/TabletInsertionDataContainer.java | 15 ++-
.../realtime/PipeRealtimeDataRegionExtractor.java | 6 +-
.../processor/aggregate/AggregateProcessor.java | 2 +-
.../twostage/plugin/TwoStageCountProcessor.java | 3 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 13 ++-
.../visitor/PipePlanToStatementVisitor.java | 105 +++++++++++++-----
.../pipe/task/connection/PipeEventCollector.java | 3 +-
.../planner/plan/node/write/InsertRowsNode.java | 9 +-
.../broker/SubscriptionPrefetchingQueue.java | 28 ++---
.../commons/pipe/task/meta/PipeStaticMeta.java | 29 ++---
21 files changed, 274 insertions(+), 165 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 31aabb6f031..9385f43f9a8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -51,13 +51,13 @@ public class PipeTableResp implements DataSet {
return allPipeMeta;
}
- public PipeTableResp filter(Boolean whereClause, String pipeName) {
+ public PipeTableResp filter(final Boolean whereClause, final String
pipeName) {
if (whereClause == null || !whereClause) {
if (pipeName == null) {
return this;
} else {
final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
- for (PipeMeta pipeMeta : allPipeMeta) {
+ for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
filteredPipeMeta.add(pipeMeta);
break;
@@ -70,7 +70,7 @@ public class PipeTableResp implements DataSet {
return this;
} else {
String sortedConnectorParametersString = null;
- for (PipeMeta pipeMeta : allPipeMeta) {
+ for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
sortedConnectorParametersString =
pipeMeta.getStaticMeta().getConnectorParameters().toString();
@@ -79,7 +79,7 @@ public class PipeTableResp implements DataSet {
}
final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
- for (PipeMeta pipeMeta : allPipeMeta) {
+ for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta
.getStaticMeta()
.getConnectorParameters()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index ed4bf94df68..58cdac06271 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1153,7 +1153,7 @@ public class ProcedureManager {
final Procedure<ConfigNodeProcedureEnv> finishedProcedure =
executor.getResultOrProcedure(procedureId);
if (!finishedProcedure.isFinished()) {
- // the procedure is still executing
+ // The procedure is still executing
statusList.add(
RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
PROCEDURE_TIMEOUT_MESSAGE));
isSucceed = false;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
index 3b7b5557dca..fce5367398f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
@@ -41,17 +41,17 @@ public class IoTDBConfigNodeReceiverAgent extends
IoTDBReceiverAgent {
}
@Override
- protected IoTDBReceiver getReceiverWithSpecifiedClient(String key) {
+ protected IoTDBReceiver getReceiverWithSpecifiedClient(final String key) {
return clientKey2ReceiverMap.get(key);
}
@Override
- protected void setReceiverWithSpecifiedClient(String key, IoTDBReceiver
receiver) {
+ protected void setReceiverWithSpecifiedClient(final String key, final
IoTDBReceiver receiver) {
clientKey2ReceiverMap.put(key, receiver);
}
@Override
- protected void removeReceiverWithSpecifiedClient(String key) {
+ protected void removeReceiverWithSpecifiedClient(final String key) {
clientKey2ReceiverMap.remove(key);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 3fa1d11226b..936ad32e01d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -111,7 +111,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
final Map<Integer, PipeTaskMeta> currentConsensusGroupId2PipeTaskMeta =
currentPipeRuntimeMeta.getConsensusGroupId2TaskMetaMap();
- // deep copy reused attributes
+ // Deep copy reused attributes
updatedPipeStaticMeta =
new PipeStaticMeta(
alterPipeRequest.getPipeName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 88fe4d738db..7fad79e843e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -21,13 +21,13 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.utils.BytesUtils;
@@ -51,41 +51,17 @@ public class PipeTransferTabletInsertNodeReq extends
TPipeTransferReq {
}
public InsertBaseStatement constructStatement() {
- if (insertNode instanceof InsertRowNode) {
- final InsertRowNode node = (InsertRowNode) insertNode;
-
- final InsertRowStatement statement = new InsertRowStatement();
- statement.setDevicePath(node.getDevicePath());
- statement.setTime(node.getTime());
- statement.setMeasurements(node.getMeasurements());
- statement.setDataTypes(node.getDataTypes());
- statement.setValues(node.getValues());
- statement.setNeedInferType(node.isNeedInferType());
- statement.setAligned(node.isAligned());
- statement.setMeasurementSchemas(node.getMeasurementSchemas());
- return statement;
+ if (!(insertNode instanceof InsertRowNode
+ || insertNode instanceof InsertTabletNode
+ || insertNode instanceof InsertRowsNode)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unknown InsertNode type %s when constructing statement from
insert node.",
+ insertNode));
}
- if (insertNode instanceof InsertTabletNode) {
- final InsertTabletNode node = (InsertTabletNode) insertNode;
-
- final InsertTabletStatement statement = new InsertTabletStatement();
- statement.setDevicePath(node.getDevicePath());
- statement.setMeasurements(node.getMeasurements());
- statement.setTimes(node.getTimes());
- statement.setColumns(node.getColumns());
- statement.setBitMaps(node.getBitMaps());
- statement.setRowCount(node.getRowCount());
- statement.setDataTypes(node.getDataTypes());
- statement.setAligned(node.isAligned());
- statement.setMeasurementSchemas(node.getMeasurementSchemas());
- return statement;
- }
-
- throw new UnsupportedOperationException(
- String.format(
- "unknown InsertNode type %s when constructing statement from
insert node.",
- insertNode));
+ return (InsertBaseStatement)
+ IoTDBDataNodeReceiver.PLAN_TO_STATEMENT_VISITOR.process(insertNode,
null);
}
/////////////////////////////// WriteBack & Batch
///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index dd12caf55a1..355d354c385 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -325,11 +325,12 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private void doTransfer(final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeInsertionEvent)
throws IoTDBConnectionException, StatementExecutionException {
- final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
- if (pipeInsertNodeInsertionEvent.isAligned()) {
- sessionPool.insertAlignedTablet(tablet);
- } else {
- sessionPool.insertTablet(tablet);
+ for (final Tablet tablet :
pipeInsertNodeInsertionEvent.convertToTablets()) {
+ if (pipeInsertNodeInsertionEvent.isAligned()) {
+ sessionPool.insertAlignedTablet(tablet);
+ } else {
+ sessionPool.insertTablet(tablet);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 5efb8000236..fa1c372db28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -186,7 +186,9 @@ public class OpcUaConnector implements PipeConnector {
OpcUaConnector.class.getName())) {
return;
}
- transferTablet(server,
pipeInsertNodeTabletInsertionEvent.convertToTablet());
+ for (final Tablet tablet :
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
+ transferTablet(server, tablet);
+ }
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
OpcUaConnector.class.getName(), false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 049ab3d2e05..e1e5a5dd44c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.pipe.connector.protocol.websocket;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
@@ -93,7 +94,7 @@ public class WebSocketConnector implements PipeConnector {
}
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+ public void transfer(final TabletInsertionEvent tabletInsertionEvent) {
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
LOGGER.warn(
@@ -103,6 +104,20 @@ public class WebSocketConnector implements PipeConnector {
return;
}
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ final PipeInsertNodeTabletInsertionEvent event =
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ for (final PipeRawTabletInsertionEvent rawTabletInsertionEvent :
+ event.toRawTabletInsertionEvents()) {
+ // Skip report if any tablet events is added
+ event.skipReportOnCommit();
+ // Transfer raw tablet insertion event to make sure one event binds
+ // to only one tablet in the server
+ transfer(rawTabletInsertionEvent);
+ }
+ return;
+ }
+
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebSocketConnector.class.getName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index 441bc0450df..2db12e0a10e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.connector.protocol.websocket;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -368,9 +367,7 @@ public class WebSocketConnectorServer extends
WebSocketServer {
try {
ByteBuffer tabletBuffer;
- if (event instanceof PipeInsertNodeTabletInsertionEvent) {
- tabletBuffer = ((PipeInsertNodeTabletInsertionEvent)
event).convertToTablet().serialize();
- } else if (event instanceof PipeRawTabletInsertionEvent) {
+ if (event instanceof PipeRawTabletInsertionEvent) {
tabletBuffer = ((PipeRawTabletInsertionEvent)
event).convertToTablet().serialize();
} else {
throw new NotImplementedException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index bf2d1659fe4..1b747652e1c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -112,11 +112,11 @@ public class PipeRowCollector implements RowCollector {
this.tablet = null;
}
- public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() {
+ public List<TabletInsertionEvent> convertToTabletInsertionEvents(final
boolean shouldReport) {
collectTabletInsertionEvent();
final int eventListSize = tabletInsertionEventList.size();
- if (eventListSize > 0) { // The last event should report progress
+ if (eventListSize > 0 && shouldReport) { // The last event should report
progress
((PipeRawTabletInsertionEvent)
tabletInsertionEventList.get(eventListSize - 1))
.markAsNeedToReport();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 798cb1da706..dad53f2b69d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -41,8 +42,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
implements TabletInsertionEvent {
@@ -54,7 +59,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
private final boolean isAligned;
private final boolean isGeneratedByPipe;
- private TabletInsertionDataContainer dataContainer;
+ private List<TabletInsertionDataContainer> dataContainers;
private ProgressIndex progressIndex;
@@ -129,8 +134,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
try {
PipeResourceManager.wal().unpin(walEntryHandler);
- // Release the container's memory.
- dataContainer = null;
+ // Release the containers' memory.
+ if (dataContainers != null) {
+ dataContainers.clear();
+ dataContainers = null;
+ }
return true;
} catch (Exception e) {
LOGGER.warn(
@@ -190,6 +198,14 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
// We assume that `timestamps` is ordered.
return startTime <= timestamps[timestamps.length - 1] && timestamps[0]
<= endTime;
+ } else if (insertNode instanceof InsertRowsNode) {
+ for (final InsertRowNode node : ((InsertRowsNode)
insertNode).getInsertRowNodeList()) {
+ final long timestamp = node.getTime();
+ if (startTime <= timestamp && timestamp <= endTime) {
+ return true;
+ }
+ }
+ return false;
} else {
throw new UnSupportedDataTypeException(
String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
@@ -209,28 +225,18 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
- try {
- if (dataContainer == null) {
- dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
- }
- return dataContainer.processRowByRow(consumer);
- } catch (Exception e) {
- throw new PipeException("Process row by row error.", e);
- }
+ return initDataContainers().stream()
+ .map(tabletInsertionDataContainer ->
tabletInsertionDataContainer.processRowByRow(consumer))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
@Override
public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
- try {
- if (dataContainer == null) {
- dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
- }
- return dataContainer.processTablet(consumer);
- } catch (Exception e) {
- throw new PipeException("Process tablet error.", e);
- }
+ return initDataContainers().stream()
+ .map(tabletInsertionDataContainer ->
tabletInsertionDataContainer.processTablet(consumer))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
/////////////////////////// convertToTablet ///////////////////////////
@@ -239,21 +245,55 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
return isAligned;
}
- public Tablet convertToTablet() {
+ public List<Tablet> convertToTablets() {
+ return initDataContainers().stream()
+ .map(TabletInsertionDataContainer::convertToTablet)
+ .collect(Collectors.toList());
+ }
+
+ /////////////////////////// dataContainer ///////////////////////////
+
+ private List<TabletInsertionDataContainer> initDataContainers() {
try {
- if (dataContainer == null) {
- dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
+ if (dataContainers != null) {
+ return dataContainers;
+ }
+
+ dataContainers = new ArrayList<>();
+ final InsertNode node = getInsertNode();
+ switch (node.getType()) {
+ case INSERT_ROW:
+ case INSERT_TABLET:
+ dataContainers.add(
+ new TabletInsertionDataContainer(pipeTaskMeta, this, node,
pipePattern));
+ break;
+ case INSERT_ROWS:
+ for (final InsertRowNode insertRowNode : ((InsertRowsNode)
node).getInsertRowNodeList()) {
+ dataContainers.add(
+ new TabletInsertionDataContainer(pipeTaskMeta, this,
insertRowNode, pipePattern));
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported node type " +
node.getType());
+ }
+
+ final int size = dataContainers.size();
+ if (size > 0) {
+ dataContainers.get(size - 1).markAsNeedToReport();
}
- return dataContainer.convertToTablet();
+
+ return dataContainers;
} catch (Exception e) {
- throw new PipeException("Convert to tablet error.", e);
+ throw new PipeException("Initialize data container error.", e);
}
}
public long count() {
- final Tablet covertedTablet = convertToTablet();
- return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+ long count = 0;
+ for (final Tablet covertedTablet : convertToTablets()) {
+ count += (long) covertedTablet.rowSize *
covertedTablet.getSchemas().size();
+ }
+ return count;
}
/////////////////////////// parsePatternOrTime ///////////////////////////
@@ -266,9 +306,22 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
&& (Objects.isNull(node) ||
!pipePattern.coversDevice(node.getDevicePath().getFullPath()));
}
- public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
- return new PipeRawTabletInsertionEvent(
- convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
+ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
+ final List<PipeRawTabletInsertionEvent> events =
+ convertToTablets().stream()
+ .map(
+ tablet ->
+ new PipeRawTabletInsertionEvent(
+ tablet, isAligned, pipeName, pipeTaskMeta, this,
false))
+ .filter(event -> !event.hasNoNeedParsingAndIsEmpty())
+ .collect(Collectors.toList());
+
+ final int size = events.size();
+ if (size > 0) {
+ events.get(size - 1).markAsNeedToReport();
+ }
+
+ return events;
}
/////////////////////////// Object ///////////////////////////
@@ -276,8 +329,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public String toString() {
return String.format(
- "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s,
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainer=%s}",
- walEntryHandler, progressIndex, isAligned, isGeneratedByPipe,
dataContainer)
+ "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s,
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainers=%s}",
+ walEntryHandler, progressIndex, isAligned, isGeneratedByPipe,
dataContainers)
+ " - "
+ super.toString();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 750a77d209e..e95bc5373b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -75,6 +75,9 @@ public class TabletInsertionDataContainer {
private Tablet tablet;
+ // Whether the container shall report progress
+ private boolean shouldReport = false;
+
private static final Integer CACHED_FULL_ROW_INDEX_LIST_ROW_COUNT_UPPER = 16;
private static final Map<Integer, List<Integer>> cachedFullRowIndexList =
new HashMap<>();
@@ -124,6 +127,10 @@ public class TabletInsertionDataContainer {
return isAligned;
}
+ public void markAsNeedToReport() {
+ shouldReport = true;
+ }
+
//////////////////////////// parse ////////////////////////////
private void parse(InsertRowNode insertRowNode, PipePattern pattern) {
@@ -551,7 +558,7 @@ public class TabletInsertionDataContainer {
//////////////////////////// process ////////////////////////////
- public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
+ public List<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
if (valueColumns.length == 0 || timestampColumn.length == 0) {
return Collections.emptyList();
}
@@ -571,13 +578,13 @@ public class TabletInsertionDataContainer {
columnNameStringList),
rowCollector);
}
- return rowCollector.convertToTabletInsertionEvents();
+ return rowCollector.convertToTabletInsertionEvents(shouldReport);
}
- public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
+ public List<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta,
sourceEvent);
consumer.accept(convertToTablet(), rowCollector);
- return rowCollector.convertToTabletInsertionEvents();
+ return rowCollector.convertToTabletInsertionEvents(shouldReport);
}
//////////////////////////// convertToTablet ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c939a9002c7..2c37ca06fb2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -290,7 +290,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
}
if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
+ // This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
LOGGER.error(
"extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
@@ -301,7 +301,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// Do not report exception since the PipeHeartbeatEvent doesn't affect
// the correction of pipe progress.
- // ignore this event.
+ // Ignore this event.
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(),
false);
}
}
@@ -344,7 +344,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
if
(event.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName()))
{
return event.getEvent();
} else {
- // if the event's reference count can not be increased, it means the
data represented by
+ // If the event's reference count can not be increased, it means the
data represented by
// this event is not reliable anymore. the data has been lost. we simply
discard this
// event and report the exception to PipeRuntimeAgent.
final String errorMessage =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 4d19195e0f4..dde1fab5a28 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -534,7 +534,7 @@ public class AggregateProcessor implements PipeProcessor {
exception.set(e);
}
rowCollector
- .convertToTabletInsertionEvents()
+ .convertToTabletInsertionEvents(false)
.forEach(
tabletEvent -> {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index cbc14d976dc..2017051866f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -211,6 +211,8 @@ public class TwoStageCountProcessor implements
PipeProcessor {
collectGlobalCountIfNecessary(eventCollector);
commitLocalProgressIndexIfNecessary();
triggerCombineIfNecessary();
+ eventCollector.collect(event);
+ return;
}
if (event instanceof PipeWatermarkEvent) {
@@ -218,6 +220,7 @@ public class TwoStageCountProcessor implements
PipeProcessor {
new Pair<>(
new long[] {((PipeWatermarkEvent) event).getWatermark(),
localCount.get()},
localCommitProgressIndex.get()));
+ // TODO: Collect watermark events. We ignore it because they may cause
OOM in collector.
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index d8b4c12c733..c9626c3ba81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -105,8 +105,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private static final String[] RECEIVER_FILE_BASE_DIRS =
IOTDB_CONFIG.getPipeReceiverFileDirs();
private static FolderManager folderManager = null;
- private final PipeStatementTSStatusVisitor statusVisitor = new
PipeStatementTSStatusVisitor();
- private final PipeStatementExceptionVisitor exceptionVisitor =
+ public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR =
+ new PipePlanToStatementVisitor();
+ private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
+ new PipeStatementTSStatusVisitor();
+ private static final PipeStatementExceptionVisitor
STATEMENT_EXCEPTION_VISITOR =
new PipeStatementExceptionVisitor();
private final PipeStatementToBatchVisitor batchVisitor = new
PipeStatementToBatchVisitor();
@@ -329,7 +332,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
.alterLogicalViewByPipe((AlterLogicalViewNode)
req.getPlanNode()))
: new TPipeTransferResp(
executeStatementAndClassifyExceptions(
- new PipePlanToStatementVisitor().process(req.getPlanNode(),
null)));
+ PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
}
private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq
req) {
@@ -361,7 +364,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
receiverId.get(),
statement,
result);
- return statement.accept(statusVisitor, result);
+ return statement.accept(STATEMENT_STATUS_VISITOR, result);
}
} catch (final Exception e) {
LOGGER.warn(
@@ -369,7 +372,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
receiverId.get(),
statement,
e);
- return statement.accept(exceptionVisitor, e);
+ return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index 44b3d67d0c0..32e3e6947ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -34,8 +34,14 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -54,11 +60,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
public class PipePlanToStatementVisitor extends PlanVisitor<Statement, Void> {
@Override
- public Statement visitPlan(PlanNode node, Void context) {
+ public Statement visitPlan(final PlanNode node, final Void context) {
throw new UnsupportedOperationException(
String.format(
"PipePlanToStatementVisitor does not support visiting general
plan, PlanNode: %s",
@@ -66,8 +73,48 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
}
@Override
- public CreateTimeSeriesStatement visitCreateTimeSeries(CreateTimeSeriesNode
node, Void context) {
- CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
+ public InsertRowStatement visitInsertRow(final InsertRowNode node, final
Void context) {
+ final InsertRowStatement statement = new InsertRowStatement();
+ statement.setDevicePath(node.getDevicePath());
+ statement.setTime(node.getTime());
+ statement.setMeasurements(node.getMeasurements());
+ statement.setDataTypes(node.getDataTypes());
+ statement.setValues(node.getValues());
+ statement.setNeedInferType(node.isNeedInferType());
+ statement.setAligned(node.isAligned());
+ statement.setMeasurementSchemas(node.getMeasurementSchemas());
+ return statement;
+ }
+
+ @Override
+ public InsertTabletStatement visitInsertTablet(final InsertTabletNode node,
final Void context) {
+ final InsertTabletStatement statement = new InsertTabletStatement();
+ statement.setDevicePath(node.getDevicePath());
+ statement.setMeasurements(node.getMeasurements());
+ statement.setTimes(node.getTimes());
+ statement.setColumns(node.getColumns());
+ statement.setBitMaps(node.getBitMaps());
+ statement.setRowCount(node.getRowCount());
+ statement.setDataTypes(node.getDataTypes());
+ statement.setAligned(node.isAligned());
+ statement.setMeasurementSchemas(node.getMeasurementSchemas());
+ return statement;
+ }
+
+ @Override
+ public InsertRowsStatement visitInsertRows(final InsertRowsNode node, final
Void context) {
+ final InsertRowsStatement statement = new InsertRowsStatement();
+ statement.setInsertRowStatementList(
+ node.getInsertRowNodeList().stream()
+ .map(insertRowNode -> visitInsertRow(insertRowNode, context))
+ .collect(Collectors.toList()));
+ return statement;
+ }
+
+ @Override
+ public CreateTimeSeriesStatement visitCreateTimeSeries(
+ final CreateTimeSeriesNode node, final Void context) {
+ final CreateTimeSeriesStatement statement = new
CreateTimeSeriesStatement();
statement.setPath(node.getPath());
statement.setDataType(node.getDataType());
statement.setEncoding(node.getEncoding());
@@ -81,8 +128,8 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
@Override
public CreateAlignedTimeSeriesStatement visitCreateAlignedTimeSeries(
- CreateAlignedTimeSeriesNode node, Void context) {
- CreateAlignedTimeSeriesStatement statement = new
CreateAlignedTimeSeriesStatement();
+ final CreateAlignedTimeSeriesNode node, final Void context) {
+ final CreateAlignedTimeSeriesStatement statement = new
CreateAlignedTimeSeriesStatement();
statement.setDataTypes(node.getDataTypes());
statement.setCompressors(node.getCompressors());
statement.setEncodings(node.getEncodings());
@@ -96,17 +143,17 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
@Override
public CreateMultiTimeSeriesStatement visitCreateMultiTimeSeries(
- CreateMultiTimeSeriesNode node, Void context) {
- CreateMultiTimeSeriesStatement statement = new
CreateMultiTimeSeriesStatement();
-
- List<PartialPath> paths = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- List<Map<String, String>> propsList = new ArrayList<>();
- List<String> aliasList = new ArrayList<>();
- List<Map<String, String>> tagsList = new ArrayList<>();
- List<Map<String, String>> attributesList = new ArrayList<>();
+ final CreateMultiTimeSeriesNode node, final Void context) {
+ final CreateMultiTimeSeriesStatement statement = new
CreateMultiTimeSeriesStatement();
+
+ final List<PartialPath> paths = new ArrayList<>();
+ final List<TSDataType> dataTypes = new ArrayList<>();
+ final List<TSEncoding> encodings = new ArrayList<>();
+ final List<CompressionType> compressors = new ArrayList<>();
+ final List<Map<String, String>> propsList = new ArrayList<>();
+ final List<String> aliasList = new ArrayList<>();
+ final List<Map<String, String>> tagsList = new ArrayList<>();
+ final List<Map<String, String>> attributesList = new ArrayList<>();
for (Map.Entry<PartialPath, MeasurementGroup> path2Group :
node.getMeasurementGroupMap().entrySet()) {
@@ -146,8 +193,9 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
}
@Override
- public AlterTimeSeriesStatement visitAlterTimeSeries(AlterTimeSeriesNode
node, Void context) {
- AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
+ public AlterTimeSeriesStatement visitAlterTimeSeries(
+ final AlterTimeSeriesNode node, final Void context) {
+ final AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
statement.setAlterMap(node.getAlterMap());
statement.setAlterType(node.getAlterType());
statement.setAttributesMap(node.getAttributesMap());
@@ -159,7 +207,7 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
@Override
public InternalCreateTimeSeriesStatement visitInternalCreateTimeSeries(
- InternalCreateTimeSeriesNode node, Void context) {
+ final InternalCreateTimeSeriesNode node, final Void context) {
return new InternalCreateTimeSeriesStatement(
node.getDevicePath(),
node.getMeasurementGroup().getMeasurements(),
@@ -170,36 +218,37 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
}
@Override
- public ActivateTemplateStatement visitActivateTemplate(ActivateTemplateNode
node, Void context) {
- ActivateTemplateStatement statement = new ActivateTemplateStatement();
+ public ActivateTemplateStatement visitActivateTemplate(
+ final ActivateTemplateNode node, final Void context) {
+ final ActivateTemplateStatement statement = new
ActivateTemplateStatement();
statement.setPath(node.getActivatePath());
return statement;
}
@Override
public BatchActivateTemplateStatement visitInternalBatchActivateTemplate(
- InternalBatchActivateTemplateNode node, Void context) {
+ final InternalBatchActivateTemplateNode node, final Void context) {
return new BatchActivateTemplateStatement(
new ArrayList<>(node.getTemplateActivationMap().keySet()));
}
@Override
public InternalCreateMultiTimeSeriesStatement
visitInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesNode node, Void context) {
+ final InternalCreateMultiTimeSeriesNode node, final Void context) {
return new InternalCreateMultiTimeSeriesStatement(node.getDeviceMap());
}
@Override
public BatchActivateTemplateStatement visitBatchActivateTemplate(
- BatchActivateTemplateNode node, Void context) {
+ final BatchActivateTemplateNode node, final Void context) {
return new BatchActivateTemplateStatement(
new ArrayList<>(node.getTemplateActivationMap().keySet()));
}
@Override
public CreateLogicalViewStatement visitCreateLogicalView(
- CreateLogicalViewNode node, Void context) {
- CreateLogicalViewStatement statement = new CreateLogicalViewStatement();
+ final CreateLogicalViewNode node, final Void context) {
+ final CreateLogicalViewStatement statement = new
CreateLogicalViewStatement();
statement.setTargetFullPaths(node.getViewPathList());
statement.setViewExpressions(new
ArrayList<>(node.getViewPathToSourceExpressionMap().values()));
return statement;
@@ -208,8 +257,8 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
// We do not support AlterLogicalViewNode parsing and use direct rpc instead
@Override
- public DeleteDataStatement visitDeleteData(DeleteDataNode node, Void
context) {
- DeleteDataStatement statement = new DeleteDataStatement();
+ public DeleteDataStatement visitDeleteData(DeleteDataNode node, final Void
context) {
+ final DeleteDataStatement statement = new DeleteDataStatement();
statement.setDeleteEndTime(node.getDeleteEndTime());
statement.setDeleteStartTime(node.getDeleteStartTime());
statement.setPathList(node.getPathList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 730cc5bb661..c7e34b53aa6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -83,8 +83,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent
sourceEvent) {
if (sourceEvent.shouldParseTimeOrPattern()) {
- final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
- if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+ for (PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
collectEvent(parsedEvent);
}
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 7a9cbf8190b..a828d36f951 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -58,7 +59,7 @@ public class InsertRowsNode extends InsertNode {
*/
private List<Integer> insertRowNodeIndexList;
- /** the InsertRowsNode list */
+ /** The {@link InsertRowNode} list */
private List<InsertRowNode> insertRowNodeList;
public InsertRowsNode(PlanNodeId id) {
@@ -74,7 +75,7 @@ public class InsertRowsNode extends InsertNode {
this.insertRowNodeList = insertRowNodeList;
}
- /** record the result of insert rows */
+ /** Record the result of insert rows */
private Map<Integer, TSStatus> results = new HashMap<>();
public List<Integer> getInsertRowNodeIndexList() {
@@ -227,14 +228,14 @@ public class InsertRowsNode extends InsertNode {
List<TEndPoint> redirectInfo = new ArrayList<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
- // data region for insert row node
+ // Data region for insert row node
TRegionReplicaSet dataRegionReplicaSet =
analysis
.getDataPartitionInfo()
.getDataRegionReplicaSetForWriting(
insertRowNode.devicePath.getFullPath(),
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
- // collect redirectInfo
+ // Collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
if (tmpNode != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e4bb921042b..9a75e464475 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -144,23 +145,23 @@ public class SubscriptionPrefetchingQueue {
}
if (event instanceof TabletInsertionEvent) {
- Tablet tablet = convertToTablet((TabletInsertionEvent) event);
- if (Objects.isNull(tablet)) {
+ final List<Tablet> tabletList =
convertToTablets((TabletInsertionEvent) event);
+ if (Objects.isNull(tabletList) || tabletList.isEmpty()) {
continue;
}
- tablets.add(tablet);
+ tablets.addAll(tabletList);
enrichedEvents.add((EnrichedEvent) event);
if (tablets.size() >= limit) {
break;
}
} else if (event instanceof PipeTsFileInsertionEvent) {
- for (TabletInsertionEvent tabletInsertionEvent :
+ for (final TabletInsertionEvent tabletInsertionEvent :
((PipeTsFileInsertionEvent) event).toTabletInsertionEvents()) {
- Tablet tablet = convertToTablet(tabletInsertionEvent);
- if (Objects.isNull(tablet)) {
+ final List<Tablet> tabletList =
convertToTablets(tabletInsertionEvent);
+ if (Objects.isNull(tabletList) || tabletList.isEmpty()) {
continue;
}
- tablets.add(tablet);
+ tablets.addAll(tabletList);
}
enrichedEvents.add((EnrichedEvent) event);
if (tablets.size() >= limit) {
@@ -218,23 +219,24 @@ public class SubscriptionPrefetchingQueue {
}
}
- /////////////////////////////// utility ///////////////////////////////
+ /////////////////////////////// Utility ///////////////////////////////
- private Tablet convertToTablet(TabletInsertionEvent tabletInsertionEvent) {
+ private List<Tablet> convertToTablets(TabletInsertionEvent
tabletInsertionEvent) {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- return ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).convertToTablet();
+ return ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).convertToTablets();
} else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
- return ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).convertToTablet();
+ return Collections.singletonList(
+ ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).convertToTablet());
}
LOGGER.warn(
"Subscription: Only support convert PipeInsertNodeTabletInsertionEvent
or PipeRawTabletInsertionEvent to tablet. Ignore {}.",
tabletInsertionEvent);
- return null;
+ return Collections.emptyList();
}
private String generateSubscriptionCommitId() {
- // subscription commit id format:
{DataNodeId}#{RebootTimes}#{TopicName}_{BrokerId}#{Id}
+ // Subscription commit id format:
{DataNodeId}#{RebootTimes}#{TopicName}_{BrokerId}#{Id}
// Recording data node ID and reboot times to address potential stale
commit IDs caused by
// leader transfers or restarts.
return IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 731d7e9e3db..718ff85527d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -47,11 +47,11 @@ public class PipeStaticMeta {
}
public PipeStaticMeta(
- String pipeName,
- long creationTime,
- Map<String, String> extractorAttributes,
- Map<String, String> processorAttributes,
- Map<String, String> connectorAttributes) {
+ final String pipeName,
+ final long creationTime,
+ final Map<String, String> extractorAttributes,
+ final Map<String, String> processorAttributes,
+ final Map<String, String> connectorAttributes) {
this.pipeName = pipeName;
this.creationTime = creationTime;
extractorParameters = new PipeParameters(extractorAttributes);
@@ -90,28 +90,28 @@ public class PipeStaticMeta {
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
- public void serialize(OutputStream outputStream) throws IOException {
+ public void serialize(final OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pipeName, outputStream);
ReadWriteIOUtils.write(creationTime, outputStream);
ReadWriteIOUtils.write(extractorParameters.getAttribute().size(),
outputStream);
- for (Map.Entry<String, String> entry :
extractorParameters.getAttribute().entrySet()) {
+ for (final Map.Entry<String, String> entry :
extractorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
ReadWriteIOUtils.write(processorParameters.getAttribute().size(),
outputStream);
- for (Map.Entry<String, String> entry :
processorParameters.getAttribute().entrySet()) {
+ for (final Map.Entry<String, String> entry :
processorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
ReadWriteIOUtils.write(connectorParameters.getAttribute().size(),
outputStream);
- for (Map.Entry<String, String> entry :
connectorParameters.getAttribute().entrySet()) {
+ for (final Map.Entry<String, String> entry :
connectorParameters.getAttribute().entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
- public static PipeStaticMeta deserialize(InputStream inputStream) throws
IOException {
+ public static PipeStaticMeta deserialize(final InputStream inputStream)
throws IOException {
final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
@@ -143,7 +143,7 @@ public class PipeStaticMeta {
return pipeStaticMeta;
}
- public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
+ public static PipeStaticMeta deserialize(final ByteBuffer byteBuffer) {
final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
@@ -176,14 +176,14 @@ public class PipeStaticMeta {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeStaticMeta that = (PipeStaticMeta) obj;
+ final PipeStaticMeta that = (PipeStaticMeta) obj;
return pipeName.equals(that.pipeName)
&& creationTime == that.creationTime
&& extractorParameters.equals(that.extractorParameters)
@@ -218,7 +218,8 @@ public class PipeStaticMeta {
public static final String SYSTEM_PIPE_PREFIX = "__";
public static final String SUBSCRIPTION_PIPE_PREFIX = SYSTEM_PIPE_PREFIX +
"subscription.";
- public static String generateSubscriptionPipeName(String topicName, String
consumerGroupId) {
+ public static String generateSubscriptionPipeName(
+ final String topicName, final String consumerGroupId) {
return SUBSCRIPTION_PIPE_PREFIX + topicName + "_" + consumerGroupId;
}
}