This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch api-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2c8649f3f8b09664f19036925177533b7a032287 Author: Caideyipi <[email protected]> AuthorDate: Mon Dec 22 10:33:58 2025 +0800 Pipe: Implemented the processTabletWithCollect api with tablet collector (#16930) --- .../iotdb/pipe/api/collector/DataCollector.java | 37 +++++++++++ .../iotdb/pipe/api/collector/TabletCollector.java | 44 +++++++++++++ .../event/dml/insertion/TabletInsertionEvent.java | 11 ++++ .../db/pipe/event/common/row/PipeRowCollector.java | 20 ++---- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 12 ++++ .../common/tablet/PipeRawTabletEventConverter.java | 72 ++++++++++++++++++++++ .../common/tablet/PipeRawTabletInsertionEvent.java | 7 +++ .../event/common/tablet/PipeTabletCollector.java | 53 ++++++++++++++++ .../tablet/TabletInsertionDataContainer.java | 4 ++ .../event/TsFileInsertionDataContainerTest.java | 36 +++++++---- 10 files changed, 269 insertions(+), 27 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/DataCollector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/DataCollector.java new file mode 100644 index 00000000000..36b403b81af --- /dev/null +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/DataCollector.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.api.collector; + +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + +import java.util.List; + +/** Transform data to {@link TabletInsertionEvent}. */ +public interface DataCollector { + + /** + * Transform data to {@link TabletInsertionEvent}. + * + * @param shouldReport Whether to report progress for generated events + * @see Row + */ + List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport); +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/TabletCollector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/TabletCollector.java new file mode 100644 index 00000000000..68a71d46afc --- /dev/null +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/TabletCollector.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.pipe.api.collector; + +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + +import org.apache.tsfile.write.record.Tablet; + +import java.io.IOException; +import java.util.function.BiConsumer; + +/** + * Used to collect tablets generated by {@link + * TabletInsertionEvent#processTabletWithCollect(BiConsumer)}. + */ +public interface TabletCollector { + + /** + * Collects a tablet. + * + * @param tablet Tablet to be collected + * @throws IOException if any I/O errors occur + * @see Row + */ + void collectTablet(Tablet tablet) throws IOException; +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 6e1575a464f..c7025b471b9 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.pipe.api.event.dml.insertion; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.collector.TabletCollector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.tsfile.write.record.Tablet; @@ -45,4 +46,14 @@ public interface TabletInsertionEvent extends Event { * contains the results collected by the {@link RowCollector} */ Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer); + + /** + * The consumer processes the Tablet directly and collects the results by {@link + * org.apache.iotdb.pipe.api.collector.TabletCollector}. 
+ * + * @return {@code Iterable<TabletInsertionEvent>} a list of new {@link TabletInsertionEvent} + * contains the results collected by the {@link TabletCollector} + */ + Iterable<TabletInsertionEvent> processTabletWithCollect( + BiConsumer<Tablet, TabletCollector> consumer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 4cb7d972dac..42560f23a5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.row; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.pipe.api.access.Row; @@ -36,17 +37,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class PipeRowCollector implements RowCollector { - - private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>(); +public class PipeRowCollector extends PipeRawTabletEventConverter implements RowCollector { private Tablet tablet = null; - private boolean isAligned = false; - private final PipeTaskMeta pipeTaskMeta; // Used to report progress - private final EnrichedEvent sourceEvent; // Used to report progress public PipeRowCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { - this.pipeTaskMeta = pipeTaskMeta; - this.sourceEvent = sourceEvent; + super(pipeTaskMeta, sourceEvent); } @Override @@ -113,14 +108,9 @@ public class 
PipeRowCollector implements RowCollector { this.tablet = null; } + @Override public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) { collectTabletInsertionEvent(); - - final int eventListSize = tabletInsertionEventList.size(); - if (eventListSize > 0 && shouldReport) { // The last event should report progress - ((PipeRawTabletInsertionEvent) tabletInsertionEventList.get(eventListSize - 1)) - .markAsNeedToReport(); - } - return tabletInsertionEventList; + return super.convertToTabletInsertionEvents(shouldReport); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 9bdeb11a6a7..391875b674c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.collector.TabletCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -309,6 +310,17 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent .collect(Collectors.toList()); } + @Override + public Iterable<TabletInsertionEvent> processTabletWithCollect( + BiConsumer<Tablet, TabletCollector> consumer) { + return initEventParsers().stream() + .map( + tabletInsertionEventParser -> + tabletInsertionEventParser.processTabletWithCollect(consumer)) + 
.flatMap(Collection::stream) + .collect(Collectors.toList()); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned(final int i) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java new file mode 100644 index 00000000000..4f387e71883 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; +import org.apache.iotdb.pipe.api.collector.DataCollector; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + +import java.util.ArrayList; +import java.util.List; + +public abstract class PipeRawTabletEventConverter implements DataCollector { + protected final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>(); + protected boolean isAligned = false; + protected final PipeTaskMeta pipeTaskMeta; // Used to report progress + protected final EnrichedEvent sourceEvent; // Used to report progress + protected final String sourceEventDataBaseName; + protected final Boolean isTableModel; + + public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { + this.pipeTaskMeta = pipeTaskMeta; + this.sourceEvent = sourceEvent; + if (sourceEvent instanceof PipeInsertionEvent) { + sourceEventDataBaseName = + ((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion(); + isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent(); + } else { + sourceEventDataBaseName = null; + isTableModel = null; + } + } + + public PipeRawTabletEventConverter( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel) { + this.pipeTaskMeta = pipeTaskMeta; + this.sourceEvent = sourceEvent; + this.sourceEventDataBaseName = sourceEventDataBase; + this.isTableModel = isTableModel; + } + + @Override + public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) { + final int eventListSize = tabletInsertionEventList.size(); + if (eventListSize > 0 && shouldReport) { // The last event should report progress + ((PipeRawTabletInsertionEvent) 
tabletInsertionEventList.get(eventListSize - 1)) + .markAsNeedToReport(); + } + return tabletInsertionEventList; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 2443283c1c6..514128980bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.collector.TabletCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.utils.RamUsageEstimator; @@ -291,6 +292,12 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent return dataContainer.processTablet(consumer); } + @Override + public Iterable<TabletInsertionEvent> processTabletWithCollect( + BiConsumer<Tablet, TabletCollector> consumer) { + return initEventParser().processTabletWithCollect(consumer); + } + /////////////////////////// convertToTablet /////////////////////////// public boolean isAligned() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java new file mode 100644 index 00000000000..3efd02bd90d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; +import org.apache.iotdb.pipe.api.collector.TabletCollector; + +import org.apache.tsfile.write.record.Tablet; + +public class PipeTabletCollector extends PipeRawTabletEventConverter implements TabletCollector { + + public PipeTabletCollector(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { + super(pipeTaskMeta, sourceEvent); + } + + @Override + public void collectTablet(final Tablet tablet) { + final PipeInsertionEvent pipeInsertionEvent = + sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null; + tabletInsertionEventList.add( + new PipeRawTabletInsertionEvent( + isTableModel, + sourceEventDataBaseName, + pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(), + pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(), + tablet, + isAligned, + sourceEvent == null ? null : sourceEvent.getPipeName(), + sourceEvent == null ? 
0 : sourceEvent.getCreationTime(), + pipeTaskMeta, + sourceEvent, + false)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index aa1ee8e6325..8143ab81d1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.collector.TabletCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.enums.TSDataType; @@ -706,4 +707,7 @@ public class TabletInsertionDataContainer { return tablet; } + + public abstract List<TabletInsertionEvent> processTabletWithCollect( + final BiConsumer<Tablet, TabletCollector> consumer); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index a91a8b97685..709b1e8f78a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -26,6 +26,9 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -593,18 +596,15 @@ public class TsFileInsertionDataContainerTest { }) .forEach( tabletInsertionEvent2 -> - tabletInsertionEvent2.processTablet( - (tablet, rowCollector) -> - new PipeRawTabletInsertionEvent(tablet, false) - .processRowByRow( - (row, collector) -> { - try { - rowCollector.collectRow(row); - count3.addAndGet(getNonNullSize(row)); - } catch (final IOException e) { - throw new RuntimeException(e); - } - }))))); + tabletInsertionEvent2.processTabletWithCollect( + (tablet, collector) -> { + try { + collector.collectTablet(tablet); + count3.addAndGet(getNonNullSize(tablet)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + })))); Assert.assertEquals(expectedCount, count1.get()); Assert.assertEquals(expectedCount, count2.get()); @@ -624,4 +624,16 @@ public class TsFileInsertionDataContainerTest { } return count; } + + private int getNonNullSize(final Tablet tablet) { + int count = 0; + for (int i = 0; i < tablet.getRowSize(); ++i) { + for (int j = 0; j < tablet.getSchemas().size(); ++j) { + if (!tablet.isNull(i, j)) { + ++count; + } + } + } + return count; + } }
