This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eaa5bcb012e Fix pipe permission retry and table parser progress
(#17844)
eaa5bcb012e is described below
commit eaa5bcb012e48525424dcba0140b293747e7f564
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 4 18:23:43 2026 +0800
Fix pipe permission retry and table parser progress (#17844)
---
.../table/TsFileInsertionEventTableParser.java | 197 +++++++++++----------
.../sink/protocol/writeback/WriteBackSink.java | 21 ++-
.../pipe/event/TsFileInsertionEventParserTest.java | 88 +++++++++
3 files changed, 213 insertions(+), 93 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index edaaa3aa06d..8ecdcc0cec5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -148,39 +148,24 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
new Iterator<TabletInsertionEvent>() {
private TsFileInsertionEventTableParserTabletIterator
tabletIterator;
+ private PipeRawTabletInsertionEvent nextEvent;
+ private Tablet bufferedTablet;
+ private boolean iterationClosed = false;
@Override
public boolean hasNext() {
try {
- if (tabletIterator == null) {
- tabletIterator =
- new TsFileInsertionEventTableParserTabletIterator(
- tsFileSequenceReader,
- entry ->
- (Objects.isNull(tablePattern)
- ||
tablePattern.matchesTable(entry.getKey()))
- && hasTablePrivilege(entry.getKey()),
- allocatedMemoryBlockForTablet,
- allocatedMemoryBlockForBatchData,
- allocatedMemoryBlockForChunk,
- allocatedMemoryBlockForChunkMeta,
- allocatedMemoryBlockForTableSchemas,
- currentModifications,
- startTime,
- endTime);
+ if (nextEvent != null) {
+ return true;
}
- final boolean hasNext = tabletIterator.hasNext();
- if (hasNext && !parseStartTimeRecorded) {
- // Record start time on first hasNext() that returns true
- recordParseStartTime();
- } else if (!hasNext && parseStartTimeRecorded &&
!parseEndTimeRecorded) {
- // Record end time on last hasNext() that returns false
- recordParseEndTime();
- close();
- } else if (!hasNext) {
- close();
+
+ final Tablet tablet = pollNextNonEmptyTablet();
+ if (tablet == null) {
+ return false;
}
- return hasNext;
+
+ nextEvent = buildTabletInsertionEvent(tablet,
!prepareNextNonEmptyTablet());
+ return true;
} catch (Exception e) {
close();
throw new PipeException(
@@ -211,74 +196,108 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
return false;
}
+ private Tablet pollNextNonEmptyTablet() throws Exception {
+ if (!prepareNextNonEmptyTablet()) {
+ return null;
+ }
+
+ final Tablet tablet = bufferedTablet;
+ bufferedTablet = null;
+ return tablet;
+ }
+
+ private boolean prepareNextNonEmptyTablet() throws Exception {
+ if (bufferedTablet != null) {
+ return true;
+ }
+ if (iterationClosed) {
+ return false;
+ }
+
+ if (tabletIterator == null) {
+ tabletIterator =
+ new TsFileInsertionEventTableParserTabletIterator(
+ tsFileSequenceReader,
+ entry ->
+ (Objects.isNull(tablePattern)
+ ||
tablePattern.matchesTable(entry.getKey()))
+ && hasTablePrivilege(entry.getKey()),
+ allocatedMemoryBlockForTablet,
+ allocatedMemoryBlockForBatchData,
+ allocatedMemoryBlockForChunk,
+ allocatedMemoryBlockForChunkMeta,
+ allocatedMemoryBlockForTableSchemas,
+ currentModifications,
+ startTime,
+ endTime);
+ }
+
+ while (tabletIterator.hasNext()) {
+ if (!parseStartTimeRecorded) {
+ recordParseStartTime();
+ }
+
+ final Tablet tablet = tabletIterator.next();
+ recordTabletMetrics(tablet);
+ if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) {
+ bufferedTablet = tablet;
+ return true;
+ }
+ }
+
+ closeIteration();
+ return false;
+ }
+
+ private void closeIteration() {
+ if (iterationClosed) {
+ return;
+ }
+
+ if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+ recordParseEndTime();
+ }
+ close();
+ iterationClosed = true;
+ }
+
+ private PipeRawTabletInsertionEvent buildTabletInsertionEvent(
+ final Tablet tablet, final boolean needToReport) {
+ return sourceEvent == null
+ ? new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+ null,
+ null,
+ null,
+ tablet,
+ true,
+ null,
+ 0,
+ pipeTaskMeta,
+ sourceEvent,
+ needToReport)
+ : new PipeRawTabletInsertionEvent(
+ Boolean.TRUE,
+ sourceEvent.getSourceDatabaseNameFromDataRegion(),
+ sourceEvent.getRawTableModelDataBase(),
+ sourceEvent.getRawTreeModelDataBase(),
+ tablet,
+ true,
+ sourceEvent.getPipeName(),
+ sourceEvent.getCreationTime(),
+ pipeTaskMeta,
+ sourceEvent,
+ needToReport);
+ }
+
@Override
public TabletInsertionEvent next() {
if (!hasNext()) {
- close();
throw new NoSuchElementException();
}
- final Tablet tablet = tabletIterator.next();
- // Record tablet metrics
- recordTabletMetrics(tablet);
-
- final TabletInsertionEvent next;
- if (!hasNext()) {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- null,
- null,
- null,
- tablet,
- true,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- true)
- : new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
-
sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- true,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- true);
- close();
- } else {
- next =
- sourceEvent == null
- ? new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
- null,
- null,
- null,
- tablet,
- true,
- null,
- 0,
- pipeTaskMeta,
- sourceEvent,
- false)
- : new PipeRawTabletInsertionEvent(
- Boolean.TRUE,
-
sourceEvent.getSourceDatabaseNameFromDataRegion(),
- sourceEvent.getRawTableModelDataBase(),
- sourceEvent.getRawTreeModelDataBase(),
- tablet,
- true,
- sourceEvent.getPipeName(),
- sourceEvent.getCreationTime(),
- pipeTaskMeta,
- sourceEvent,
- false);
- }
+ final TabletInsertionEvent next = nextEvent;
+ nextEvent = null;
return next;
}
};
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 581792475c0..9bec114ede1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.queryengine.common.SqlDialect;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -284,7 +285,8 @@ public class WriteBackSink implements PipeConnector {
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() ==
TSStatusCode.NO_PERMISSION.getStatusCode())) {
- throw new PipeException(
+ throwWriteBackExceptionIfNecessary(
+ status,
String.format(
"Write back PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
pipeInsertNodeTabletInsertionEvent, status));
@@ -328,7 +330,8 @@ public class WriteBackSink implements PipeConnector {
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() ==
TSStatusCode.NO_PERMISSION.getStatusCode())) {
- throw new PipeException(
+ throwWriteBackExceptionIfNecessary(
+ status,
String.format(
"Write back PipeRawTabletInsertionEvent %s error, result status
%s",
pipeRawTabletInsertionEvent, status));
@@ -373,13 +376,23 @@ public class WriteBackSink implements PipeConnector {
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() ==
TSStatusCode.NO_PERMISSION.getStatusCode())) {
- throw new PipeException(
+ throwWriteBackExceptionIfNecessary(
+ status,
String.format(
"Write back PipeStatementInsertionEvent %s error, result status
%s",
pipeStatementInsertionEvent, status));
}
}
+ private static void throwWriteBackExceptionIfNecessary(
+ final TSStatus status, final String exceptionMessage) {
+ if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
+ throw new
PipeRuntimeSinkNonReportTimeConfigurableException(exceptionMessage,
Long.MAX_VALUE);
+ }
+
+ throw new PipeException(exceptionMessage);
+ }
+
@Override
public void close() throws Exception {
if (session != null) {
@@ -410,7 +423,7 @@ public class WriteBackSink implements PipeConnector {
.status;
} catch (final AccessDeniedException e) {
if (!skipIfNoPrivileges) {
- throw e;
+ throw new
PipeRuntimeSinkNonReportTimeConfigurableException(e.getMessage(),
Long.MAX_VALUE);
}
LOGGER.debug(
DataNodePipeMessages.EXECUTE_STATEMENT_TO_DATABASE_SKIP_BECAUSE_NO,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 84569bf586c..50109b935c3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -525,6 +525,77 @@ public class TsFileInsertionEventParserTest {
}
}
+ @Test
+ public void testTableParserWithTablePatternReportsLastNonEmptyTablet()
throws Exception {
+ final int originalPipeDataStructureTabletRowSize =
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+
+ try {
+ alignedTsFile = new File("table-parser-table-pattern.tsfile");
+ if (alignedTsFile.exists()) {
+ Assert.assertTrue(alignedTsFile.delete());
+ }
+
+ final List<IMeasurementSchema> schemaList =
+ Arrays.asList(
+ new MeasurementSchema("tag0", TSDataType.STRING),
+ new MeasurementSchema("s0", TSDataType.INT64));
+ final List<String> columnNameList = Arrays.asList("tag0", "s0");
+ final List<TSDataType> dataTypeList = Arrays.asList(TSDataType.STRING,
TSDataType.INT64);
+ final List<ColumnCategory> columnCategoryList =
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD);
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerTableSchema(new TableSchema("test", schemaList,
columnCategoryList));
+ writer.registerTableSchema(new TableSchema("test1", schemaList,
columnCategoryList));
+ writer.writeTable(
+ generateSimpleTableTablet(
+ "test", columnNameList, dataTypeList, columnCategoryList,
"ignored", 0, 2));
+ writer.writeTable(
+ generateSimpleTableTablet(
+ "test1", columnNameList, dataTypeList, columnCategoryList,
"matched", 3, 4));
+ writer.writeTable(
+ generateSimpleTableTablet(
+ "test1", columnNameList, dataTypeList, columnCategoryList,
"unmatched", 2, 10));
+ }
+
+ try (final TsFileInsertionEventTableParser parser =
+ new TsFileInsertionEventTableParser(
+ alignedTsFile,
+ new TablePattern(true, null, "test1"),
+ 3,
+ 5,
+ null,
+ null,
+ null,
+ false)) {
+ final Iterator<TabletInsertionEvent> iterator =
parser.toTabletInsertionEvents().iterator();
+ int rowCount = 0;
+ PipeRawTabletInsertionEvent lastEvent = null;
+ while (iterator.hasNext()) {
+ final PipeRawTabletInsertionEvent event =
(PipeRawTabletInsertionEvent) iterator.next();
+ final Tablet tablet = event.convertToTablet();
+ Assert.assertEquals("test1", tablet.getTableName());
+
Assert.assertFalse(PipeRawTabletInsertionEvent.isTabletEmpty(tablet));
+ rowCount += tablet.getRowSize();
+ if (lastEvent != null) {
+ Assert.assertFalse(lastEvent.isNeedToReport());
+ }
+ lastEvent = event;
+ }
+
+ Assert.assertEquals(2, rowCount);
+ Assert.assertNotNull(lastEvent);
+ Assert.assertTrue(lastEvent.isNeedToReport());
+ }
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+ }
+ }
+
@Test
public void manualTestScanParserSplitPerformance() throws Exception {
Assume.assumeTrue(
@@ -1343,6 +1414,23 @@ public class TsFileInsertionEventParserTest {
}
}
+ private Tablet generateSimpleTableTablet(
+ final String tableName,
+ final List<String> columnNameList,
+ final List<TSDataType> dataTypeList,
+ final List<ColumnCategory> columnCategoryList,
+ final String tagValue,
+ final long... timestamps) {
+ final Tablet tablet =
+ new Tablet(tableName, columnNameList, dataTypeList,
columnCategoryList, timestamps.length);
+ for (int rowIndex = 0; rowIndex < timestamps.length; ++rowIndex) {
+ tablet.addTimestamp(rowIndex, timestamps[rowIndex]);
+ tablet.addValue(rowIndex, 0, tagValue);
+ tablet.addValue(rowIndex, 1, (long) rowIndex);
+ }
+ return tablet;
+ }
+
private void generateLargeTableTsFile(
final File tsFile,
final int tableCount,