This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d7eb219da9baf2b96d0628fad22f398f675d178 Author: Caideyipi <[email protected]> AuthorDate: Thu Jun 4 18:23:43 2026 +0800 Fix pipe permission retry and table parser progress (#17844) --- .../table/TsFileInsertionEventTableParser.java | 197 +++++++++++---------- .../sink/protocol/writeback/WriteBackSink.java | 21 ++- .../pipe/event/TsFileInsertionEventParserTest.java | 88 +++++++++ 3 files changed, 213 insertions(+), 93 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index edaaa3aa06d..8ecdcc0cec5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -148,39 +148,24 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser new Iterator<TabletInsertionEvent>() { private TsFileInsertionEventTableParserTabletIterator tabletIterator; + private PipeRawTabletInsertionEvent nextEvent; + private Tablet bufferedTablet; + private boolean iterationClosed = false; @Override public boolean hasNext() { try { - if (tabletIterator == null) { - tabletIterator = - new TsFileInsertionEventTableParserTabletIterator( - tsFileSequenceReader, - entry -> - (Objects.isNull(tablePattern) - || tablePattern.matchesTable(entry.getKey())) - && hasTablePrivilege(entry.getKey()), - allocatedMemoryBlockForTablet, - allocatedMemoryBlockForBatchData, - allocatedMemoryBlockForChunk, - allocatedMemoryBlockForChunkMeta, - allocatedMemoryBlockForTableSchemas, - currentModifications, - startTime, - endTime); + if (nextEvent != null) { + return true; } - final boolean hasNext = tabletIterator.hasNext(); - if (hasNext && !parseStartTimeRecorded) { - // Record start time on first hasNext() that returns true - recordParseStartTime(); - } else if (!hasNext && parseStartTimeRecorded && !parseEndTimeRecorded) { - // Record end time on last hasNext() that returns false - recordParseEndTime(); - close(); - } else if (!hasNext) { - close(); + + final Tablet tablet = pollNextNonEmptyTablet(); + if (tablet == null) { + return false; } - return hasNext; + + nextEvent = buildTabletInsertionEvent(tablet, !prepareNextNonEmptyTablet()); + return true; } catch (Exception e) { close(); throw new PipeException( @@ -211,74 +196,108 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser return false; } + private Tablet pollNextNonEmptyTablet() throws Exception { + if (!prepareNextNonEmptyTablet()) { + return null; + } + + final Tablet tablet = bufferedTablet; + bufferedTablet = null; + return tablet; + } + + private boolean prepareNextNonEmptyTablet() throws Exception { + if (bufferedTablet != null) { + return true; + } + if (iterationClosed) { + return false; + } + + if (tabletIterator == null) { + tabletIterator = + new TsFileInsertionEventTableParserTabletIterator( + tsFileSequenceReader, + entry -> + (Objects.isNull(tablePattern) + || tablePattern.matchesTable(entry.getKey())) + && hasTablePrivilege(entry.getKey()), + allocatedMemoryBlockForTablet, + allocatedMemoryBlockForBatchData, + allocatedMemoryBlockForChunk, + allocatedMemoryBlockForChunkMeta, + allocatedMemoryBlockForTableSchemas, + currentModifications, + startTime, + endTime); + } + + while (tabletIterator.hasNext()) { + if (!parseStartTimeRecorded) { + recordParseStartTime(); + } + + final Tablet tablet = tabletIterator.next(); + recordTabletMetrics(tablet); + if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) { + bufferedTablet = tablet; + return true; + } + } + + closeIteration(); + return false; + } + + private void closeIteration() { + if (iterationClosed) { + return; + } + + if (parseStartTimeRecorded && !parseEndTimeRecorded) { + recordParseEndTime(); + } + close(); + iterationClosed = true; + } + + private PipeRawTabletInsertionEvent buildTabletInsertionEvent( + final Tablet tablet, final boolean needToReport) { + return sourceEvent == null + ? new PipeRawTabletInsertionEvent( + Boolean.TRUE, + null, + null, + null, + tablet, + true, + null, + 0, + pipeTaskMeta, + sourceEvent, + needToReport) + : new PipeRawTabletInsertionEvent( + Boolean.TRUE, + sourceEvent.getSourceDatabaseNameFromDataRegion(), + sourceEvent.getRawTableModelDataBase(), + sourceEvent.getRawTreeModelDataBase(), + tablet, + true, + sourceEvent.getPipeName(), + sourceEvent.getCreationTime(), + pipeTaskMeta, + sourceEvent, + needToReport); + } + @Override public TabletInsertionEvent next() { if (!hasNext()) { - close(); throw new NoSuchElementException(); } - final Tablet tablet = tabletIterator.next(); - // Record tablet metrics - recordTabletMetrics(tablet); - - final TabletInsertionEvent next; - if (!hasNext()) { - next = - sourceEvent == null - ? new PipeRawTabletInsertionEvent( - Boolean.TRUE, - null, - null, - null, - tablet, - true, - null, - 0, - pipeTaskMeta, - sourceEvent, - true) - : new PipeRawTabletInsertionEvent( - Boolean.TRUE, - sourceEvent.getSourceDatabaseNameFromDataRegion(), - sourceEvent.getRawTableModelDataBase(), - sourceEvent.getRawTreeModelDataBase(), - tablet, - true, - sourceEvent.getPipeName(), - sourceEvent.getCreationTime(), - pipeTaskMeta, - sourceEvent, - true); - close(); - } else { - next = - sourceEvent == null - ? new PipeRawTabletInsertionEvent( - Boolean.TRUE, - null, - null, - null, - tablet, - true, - null, - 0, - pipeTaskMeta, - sourceEvent, - false) - : new PipeRawTabletInsertionEvent( - Boolean.TRUE, - sourceEvent.getSourceDatabaseNameFromDataRegion(), - sourceEvent.getRawTableModelDataBase(), - sourceEvent.getRawTreeModelDataBase(), - tablet, - true, - sourceEvent.getPipeName(), - sourceEvent.getCreationTime(), - pipeTaskMeta, - sourceEvent, - false); - } + final TabletInsertionEvent next = nextEvent; + nextEvent = null; return next; } }; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 581792475c0..9bec114ede1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.commons.utils.StatusUtils; @@ -284,7 +285,8 @@ public class WriteBackSink implements PipeConnector { && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && !(skipIfNoPrivileges && status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) { - throw new PipeException( + throwWriteBackExceptionIfNecessary( + status, String.format( "Write back PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, status)); @@ -328,7 +330,8 @@ public class WriteBackSink implements PipeConnector { && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && !(skipIfNoPrivileges && status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) { - throw new PipeException( + throwWriteBackExceptionIfNecessary( + status, String.format( "Write back PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, status)); @@ -373,13 +376,23 @@ public class WriteBackSink implements PipeConnector { && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && !(skipIfNoPrivileges && status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) { - throw new PipeException( + throwWriteBackExceptionIfNecessary( + status, String.format( "Write back PipeStatementInsertionEvent %s error, result status %s", pipeStatementInsertionEvent, status)); } } + private static void throwWriteBackExceptionIfNecessary( + final TSStatus status, final String exceptionMessage) { + if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) { + throw new PipeRuntimeSinkNonReportTimeConfigurableException(exceptionMessage, Long.MAX_VALUE); + } + + throw new PipeException(exceptionMessage); + } + @Override public void close() throws Exception { if (session != null) { @@ -410,7 +423,7 @@ public class WriteBackSink implements PipeConnector { .status; } catch (final AccessDeniedException e) { if (!skipIfNoPrivileges) { - throw e; + throw new PipeRuntimeSinkNonReportTimeConfigurableException(e.getMessage(), Long.MAX_VALUE); } LOGGER.debug( DataNodePipeMessages.EXECUTE_STATEMENT_TO_DATABASE_SKIP_BECAUSE_NO, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 84569bf586c..50109b935c3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -525,6 +525,77 @@ public class TsFileInsertionEventParserTest { } } + @Test + public void testTableParserWithTablePatternReportsLastNonEmptyTablet() throws Exception { + final int originalPipeDataStructureTabletRowSize = + PipeConfig.getInstance().getPipeDataStructureTabletRowSize(); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2); + + try { + alignedTsFile = new File("table-parser-table-pattern.tsfile"); + if (alignedTsFile.exists()) { + Assert.assertTrue(alignedTsFile.delete()); + } + + final List<IMeasurementSchema> schemaList = + Arrays.asList( + new MeasurementSchema("tag0", TSDataType.STRING), + new MeasurementSchema("s0", TSDataType.INT64)); + final List<String> columnNameList = Arrays.asList("tag0", "s0"); + final List<TSDataType> dataTypeList = Arrays.asList(TSDataType.STRING, TSDataType.INT64); + final List<ColumnCategory> columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD); + + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerTableSchema(new TableSchema("test", schemaList, columnCategoryList)); + writer.registerTableSchema(new TableSchema("test1", schemaList, columnCategoryList)); + writer.writeTable( + generateSimpleTableTablet( + "test", columnNameList, dataTypeList, columnCategoryList, "ignored", 0, 2)); + writer.writeTable( + generateSimpleTableTablet( + "test1", columnNameList, dataTypeList, columnCategoryList, "matched", 3, 4)); + writer.writeTable( + generateSimpleTableTablet( + "test1", columnNameList, dataTypeList, columnCategoryList, "unmatched", 2, 10)); + } + + try (final TsFileInsertionEventTableParser parser = + new TsFileInsertionEventTableParser( + alignedTsFile, + new TablePattern(true, null, "test1"), + 3, + 5, + null, + null, + null, + false)) { + final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator(); + int rowCount = 0; + PipeRawTabletInsertionEvent lastEvent = null; + while (iterator.hasNext()) { + final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) iterator.next(); + final Tablet tablet = event.convertToTablet(); + Assert.assertEquals("test1", tablet.getTableName()); + Assert.assertFalse(PipeRawTabletInsertionEvent.isTabletEmpty(tablet)); + rowCount += tablet.getRowSize(); + if (lastEvent != null) { + Assert.assertFalse(lastEvent.isNeedToReport()); + } + lastEvent = event; + } + + Assert.assertEquals(2, rowCount); + Assert.assertNotNull(lastEvent); + Assert.assertTrue(lastEvent.isNeedToReport()); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + } + } + @Test public void manualTestScanParserSplitPerformance() throws Exception { Assume.assumeTrue( @@ -1343,6 +1414,23 @@ public class TsFileInsertionEventParserTest { } } + private Tablet generateSimpleTableTablet( + final String tableName, + final List<String> columnNameList, + final List<TSDataType> dataTypeList, + final List<ColumnCategory> columnCategoryList, + final String tagValue, + final long... timestamps) { + final Tablet tablet = + new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList, timestamps.length); + for (int rowIndex = 0; rowIndex < timestamps.length; ++rowIndex) { + tablet.addTimestamp(rowIndex, timestamps[rowIndex]); + tablet.addValue(rowIndex, 0, tagValue); + tablet.addValue(rowIndex, 1, (long) rowIndex); + } + return tablet; + } + private void generateLargeTableTsFile( final File tsFile, final int tableCount,
