This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e95a45a9a3bb4b66c21ebaf7964617f052cf7f23 Author: 罗振羽 <[email protected]> AuthorDate: Wed May 6 19:52:23 2026 +0800 [TIMECHODB] fix(pipe): transfer exclusive mod in tsfile sinks and hard-link local files (cherry picked from commit 212c49f7debd1d2b610be518a794fbc8e09f8855) --- .../IoTDBPipeTableModelObjectImportExportIT.java | 682 +++++++++++++++++++++ .../manual/basic/IoTDBPipeTsFileSinkObjectIT.java | 77 ++- .../common/tsfile/PipeTsFileInsertionEvent.java | 87 ++- .../common/tsfile/TsFileObjectPathIterator.java | 19 +- .../tsfile/parser/TsFileInsertionEventParser.java | 3 + .../parser/TsFileInsertionEventParserProvider.java | 13 +- .../table/TsFileInsertionEventTableParser.java | 99 ++- ...ileInsertionEventTableParserTabletIterator.java | 18 +- .../tsfile/parser/util/ModsOperationUtil.java | 52 +- .../resource/tsfile/PipeTsFileResourceManager.java | 47 ++ .../db/pipe/sink/protocol/tsfile/FileTransfer.java | 33 +- .../sink/protocol/tsfile/LocalFileTransfer.java | 80 ++- .../sink/protocol/tsfile/PipeTsFileLocalSink.java | 9 +- .../load/util/LoadObjectFileUtil.java | 50 +- .../tsfile/parser/util/ModsOperationUtilTest.java | 47 ++ .../plugin/sink/tsfile/PipeTsFileRemoteSink.java | 9 +- .../plugin/sink/tsfile/RemoteFileTransfer.java | 3 +- .../plugin/sink/tsfile/ScpRemoteFileTransfer.java | 28 +- 18 files changed, 1262 insertions(+), 94 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java new file mode 100644 index 00000000000..5485c0ff1fb --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTableModelObjectImportExportIT.java @@ -0,0 +1,682 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic; + +import org.apache.iotdb.db.it.utils.StandardObjectTableModelTsFileGenerator; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic; +import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTableManualBasic.class}) +public class IoTDBPipeTableModelObjectImportExportIT extends AbstractPipeTableModelDualManualIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBPipeTableModelObjectImportExportIT.class); + + private static final String TEST_DATABASE = "db1"; + + private static final String OBJECT_TABLE_NAME = "factory_metrics"; + + private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5; + + private static final int SOURCE_SINK_INSERT_ROWS = 100; + + private static final int SOURCE_SINK_DELETE_FROM_FIRST_OFFSET = 20; + private static final int SOURCE_SINK_DELETE_LEN = 25; + + private static final long EXPECTED_ROWS_AFTER_DELETE = + SOURCE_SINK_INSERT_ROWS - SOURCE_SINK_DELETE_LEN; + + private static final long SENDER_DELETE_LO = 1L + SOURCE_SINK_DELETE_FROM_FIRST_OFFSET; + + private static final long SENDER_DELETE_HI = SENDER_DELETE_LO + SOURCE_SINK_DELETE_LEN - 1; + + private static final long HOUR_MS = 3600 * 1000L; + private static final long DAY_MS = 24L * HOUR_MS; + private static final long WEEK_MS = 7L * DAY_MS; + + private static final long OBJECT_BASE_TIME = 1600000000000L; + + private String targetDir; + private String sourceTsDir; + + private enum TableSourceScope { + ALL, + DATABASE_ONLY, + DATABASE_AND_TABLE + } + + @Override + @Before + public void setUp() { + super.setUp(); + + try { + targetDir = + Files.createTempDirectory("pipe_table_tsfile_source_sink_it").toAbsolutePath().toString(); + sourceTsDir = + Files.createTempDirectory("pipe_table_tsfile_source_sink_it_src") + .toAbsolutePath() + .toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to create temp directory", e); + } + } + + @After + public void cleanUpTempDirectories() { + if (targetDir != null) { + deleteRecursivelyQuietly(Paths.get(targetDir)); + } + if (sourceTsDir != null) { + deleteRecursivelyQuietly(Paths.get(sourceTsDir)); + } + } + + private static byte[] buildExpectedObjectPayload(final long timestamp) { + String payload = "Industrial_Grade_Payload_Verification_For_Timestamp_" + timestamp; + return payload.getBytes(); + } + + @Test + public void testHistoricalExportWithDeletesForAllScope() throws Exception { + verifyHistoricalObjectExportWithDeletes( + "p_scope_all_hist_mods", + TableSourceScope.ALL, + false, + true, + false, + EXPECTED_ROWS_AFTER_DELETE); + } + + @Test + public void testHistoricalExportWithDeletesForDatabaseScope() throws Exception { + verifyHistoricalObjectExportWithDeletes( + "p_scope_db_hist_mods", + TableSourceScope.DATABASE_ONLY, + false, + true, + false, + EXPECTED_ROWS_AFTER_DELETE); + } + + @Test + public void testHistoricalExportWithDeletesForDatabaseAndTableScope() throws Exception { + verifyHistoricalObjectExportWithDeletes( + "p_scope_dbt_hist_mods", + TableSourceScope.DATABASE_AND_TABLE, + false, + true, + false, + EXPECTED_ROWS_AFTER_DELETE); + } + + @Test + public void testHistoricalExportWithoutDeletesForDatabaseAndTableScope() throws Exception { + verifyHistoricalObjectExportWithoutDeletes( + "p_scope_dbt_hist_nomods", TableSourceScope.DATABASE_AND_TABLE, true, false); + } + + @Test + public void testHistoricalExportWithoutDeletesForAllScope() throws Exception { + verifyHistoricalObjectExportWithoutDeletes( + "p_scope_all_hist_nomods", TableSourceScope.ALL, true, false); + } + + @Test + public void testHistoricalExportWithoutDeletesForDatabaseScope() throws Exception { + verifyHistoricalObjectExportWithoutDeletes( + "p_scope_db_hist_nomods", TableSourceScope.DATABASE_ONLY, true, false); + } + + @Test + public void testObjectRoundTripFromMultiWeekGeneratedTsFile() throws Exception { + final File tsFile = new File(sourceTsDir, "five_devices_multi_week.tsfile"); + final List<List<Long>> expectedTimesPerDevice = new ArrayList<>(); + + try (StandardObjectTableModelTsFileGenerator generator = + new StandardObjectTableModelTsFileGenerator(tsFile)) { + for (int i = 0; i < OBJECT_MULTI_WEEK_DEVICE_COUNT; i++) { + final String deviceId = String.format("device_%02d", i + 1); + final long weekStart = OBJECT_BASE_TIME + (long) i * WEEK_MS; + final long weekEnd = weekStart + 6 * DAY_MS; + generator.writeDeviceData(OBJECT_TABLE_NAME, deviceId, weekStart, weekEnd, DAY_MS); + expectedTimesPerDevice.add(buildExpectedTimestamps(weekStart, weekEnd, DAY_MS)); + } + } + + try (ITableSession sender = senderEnv.getTableSessionConnection()) { + sender.executeNonQueryStatement( + String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE)); + useDatabase(sender, TEST_DATABASE); + sender.executeNonQueryStatement( + String.format( + "LOAD '%s' WITH ('database-name'='%s')", tsFile.getAbsolutePath(), TEST_DATABASE)); + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE p_src_sink_multi_week WITH SOURCE (" + + "'source.capture.table'='true', " + + "'source.database-name'='%s', " + + "'source.table-name'='%s', " + + "'source.inclusion'='data.insert', " + + "'source.history.enable'='true', " + + "'source.realtime.enable'='false' " + + ") WITH SINK (" + + "%s" + + ")", + TEST_DATABASE, OBJECT_TABLE_NAME, buildTsFileLocalSinkClause())); + + waitForExportedTsFile(new File(targetDir), 60_000); + sender.executeNonQueryStatement("DROP PIPE p_src_sink_multi_week"); + sender.executeNonQueryStatement(String.format("DROP DATABASE %s", TEST_DATABASE)); + } + + final List<File> exportedTsFiles = new ArrayList<>(); + collectTsFilesRecursively(new File(targetDir), exportedTsFiles); + Assert.assertFalse(exportedTsFiles.isEmpty()); + + try (ITableSession receiver = receiverEnv.getTableSessionConnection()) { + receiver.executeNonQueryStatement( + String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE)); + useDatabase(receiver, TEST_DATABASE); + for (File f : exportedTsFiles) { + receiver.executeNonQueryStatement( + String.format( + "LOAD '%s' WITH ('database-name'='%s')", f.getAbsolutePath(), TEST_DATABASE)); + } + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + + for (int i = 0; i < OBJECT_MULTI_WEEK_DEVICE_COUNT; i++) { + final String deviceId = String.format("device_%02d", i + 1); + assertGeneratedObjectPayloadsByDevice( + receiver, OBJECT_TABLE_NAME, deviceId, expectedTimesPerDevice.get(i)); + } + } + } + + private void verifyHistoricalObjectExportWithDeletes( + final String pipeName, + final TableSourceScope scope, + final boolean realtimeFirst, + final boolean historyEnable, + final boolean realtimeEnable, + final long expectedRowsAfterLoad) + throws Exception { + try (ITableSession sender = senderEnv.getTableSessionConnection()) { + createSenderObjectTable(sender); + + if (realtimeFirst) { + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)", + pipeName, + buildObjectSourceClause(scope, true, historyEnable, realtimeEnable, true), + buildTsFileLocalSinkClause())); + Thread.sleep(2000); + } + + insertObjectRowsWithContiguousTimestamps(sender, "t1", 1L, SOURCE_SINK_INSERT_ROWS); + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + final long delLo = 1L + SOURCE_SINK_DELETE_FROM_FIRST_OFFSET; + final long delHi = delLo + SOURCE_SINK_DELETE_LEN - 1; + sender.executeNonQueryStatement( + String.format("DELETE FROM t1 WHERE time >= %d AND time <= %d", delLo, delHi)); + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + + if (!realtimeFirst) { + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)", + pipeName, + buildObjectSourceClause(scope, true, historyEnable, realtimeEnable, true), + buildTsFileLocalSinkClause())); + } + + waitForExportedTsFilesWithMods(new File(targetDir), 120_000); + sender.executeNonQueryStatement("DROP PIPE IF EXISTS " + pipeName); + sender.executeNonQueryStatement("DROP TABLE IF EXISTS t1"); + } + + try (ITableSession receiver = receiverEnv.getTableSessionConnection()) { + createReceiverDatabaseForLoad(receiver); + loadExportedTsFiles(receiver, new File(targetDir)); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + Assert.assertEquals(expectedRowsAfterLoad, queryTableRowCount(receiver, "t1")); + assertNoRowsExistInDeletedTimeRange(receiver, SENDER_DELETE_LO, SENDER_DELETE_HI); + assertObjectColumnMatchesExpectedPayload( + receiver, buildExpectedTimestampsAfterSenderDelete(1L, SOURCE_SINK_INSERT_ROWS)); + } + } + + private void verifyHistoricalObjectExportWithoutDeletes( + final String pipeName, + final TableSourceScope scope, + final boolean historyEnable, + final boolean realtimeEnable) + throws Exception { + try (ITableSession sender = senderEnv.getTableSessionConnection()) { + createSenderObjectTable(sender); + + if (realtimeEnable && !historyEnable) { + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)", + pipeName, + buildObjectSourceClause(scope, false, historyEnable, realtimeEnable, true), + buildTsFileLocalSinkClause())); + Thread.sleep(2000); + } + + insertObjectRowsWithContiguousTimestamps(sender, "t1", 1L, SOURCE_SINK_INSERT_ROWS); + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + + if (!realtimeEnable || historyEnable) { + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE %s WITH SOURCE (%s) WITH SINK (%s)", + pipeName, + buildObjectSourceClause(scope, false, historyEnable, realtimeEnable, true), + buildTsFileLocalSinkClause())); + } + + waitForExportedTsFile(new File(targetDir), 120_000); + sender.executeNonQueryStatement("DROP PIPE IF EXISTS " + pipeName); + sender.executeNonQueryStatement("DROP TABLE IF EXISTS t1"); + } + + try (ITableSession receiver = receiverEnv.getTableSessionConnection()) { + createReceiverDatabaseForLoad(receiver); + loadExportedTsFiles(receiver, new File(targetDir)); + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + Assert.assertEquals(SOURCE_SINK_INSERT_ROWS, queryTableRowCount(receiver, "t1")); + assertObjectColumnMatchesExpectedPayload( + receiver, buildContiguousTimestamps(1L, SOURCE_SINK_INSERT_ROWS)); + } + } + + private static List<Long> buildContiguousTimestamps( + final long startInclusive, final int rowCount) { + final List<Long> out = new ArrayList<>(rowCount); + for (int i = 0; i < rowCount; i++) { + out.add(startInclusive + i); + } + return out; + } + + private static List<Long> buildExpectedTimestampsAfterSenderDelete( + final long firstTimeInclusive, final int rowCount) { + final List<Long> out = new ArrayList<>(); + final long end = firstTimeInclusive + rowCount - 1; + for (long t = firstTimeInclusive; t <= end; t++) { + if (t < SENDER_DELETE_LO || t > SENDER_DELETE_HI) { + out.add(t); + } + } + return out; + } + + private static void assertNoRowsExistInDeletedTimeRange( + final ITableSession session, final long loInclusive, final long hiInclusive) + throws Exception { + useDatabase(session, TEST_DATABASE); + final String sql = + String.format( + "SELECT COUNT(*) FROM t1 WHERE time >= %d AND time <= %d", loInclusive, hiInclusive); + try (SessionDataSet ds = session.executeQueryStatement(sql)) { + final SessionDataSet.DataIterator it = ds.iterator(); + Assert.assertTrue(it.next()); + Assert.assertEquals( + "Rows in sender-deleted time window must not exist on receiver after LOAD", + 0L, + it.getLong(1)); + } + } + + private static void assertObjectColumnMatchesExpectedPayload( + final ITableSession session, final List<Long> expectedTimesAsc) throws Exception { + useDatabase(session, TEST_DATABASE); + try (SessionDataSet ds = + session.executeQueryStatement("SELECT time, READ_OBJECT(file) FROM t1 ORDER BY time ASC")) { + final SessionDataSet.DataIterator it = ds.iterator(); + int idx = 0; + while (it.next()) { + Assert.assertTrue("More rows than expected", idx < expectedTimesAsc.size()); + final long expectedTs = expectedTimesAsc.get(idx); + Assert.assertEquals("time column", expectedTs, it.getLong(1)); + final byte[] actual = it.getBlob(2).getValues(); + Assert.assertArrayEquals( + "OBJECT column (file) mismatch at time " + expectedTs, + buildExpectedObjectPayload(expectedTs), + actual); + idx++; + } + Assert.assertEquals(expectedTimesAsc.size(), idx); + } + } + + private void createSenderObjectTable(final ITableSession sender) throws Exception { + sender.executeNonQueryStatement( + String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE)); + useDatabase(sender, TEST_DATABASE); + sender.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS t1 (id STRING TAG, file OBJECT FIELD)"); + } + + private void createReceiverDatabaseForLoad(final ITableSession receiver) throws Exception { + receiver.executeNonQueryStatement( + String.format("CREATE DATABASE IF NOT EXISTS %s", TEST_DATABASE)); + useDatabase(receiver, TEST_DATABASE); + } + + private static void useDatabase(final ITableSession session, final String database) + throws Exception { + session.executeNonQueryStatement(String.format("USE %s", database)); + } + + private String buildObjectSourceClause( + final TableSourceScope scope, + final boolean modsEnable, + final boolean historyEnable, + final boolean realtimeEnable, + final boolean captureTable) { + final StringBuilder sb = new StringBuilder(); + sb.append("'source.capture.table'='").append(captureTable).append("'"); + switch (scope) { + case DATABASE_AND_TABLE: + sb.append(", 'source.database-name'='") + .append(TEST_DATABASE) + .append("', 'source.table-name'='t1'"); + break; + case DATABASE_ONLY: + sb.append(", 'source.database-name'='").append(TEST_DATABASE).append("'"); + break; + case ALL: + default: + break; + } + sb.append(", 'source.inclusion'='data.insert"); + if (modsEnable) { + sb.append(",data.delete"); + } + sb.append("'"); + if (modsEnable) { + sb.append(", 'source.mods.enable'='true'"); + } + sb.append(", 'source.history.enable'='").append(historyEnable).append("'"); + sb.append(", 'source.realtime.enable'='").append(realtimeEnable).append("'"); + return sb.toString(); + } + + private String buildTsFileLocalSinkClause() { + return String.format( + "'sink'='tsfile-local-sink', 'sink.local.target-path'='%s', " + + "'sink.batch.max-delay-seconds'='1', 'sink.batch.size-bytes'='10485760'", + targetDir); + } + + private static List<Long> buildExpectedTimestamps( + final long startTime, final long endTime, final long interval) { + final List<Long> times = new ArrayList<>(); + for (long t = startTime; t <= endTime; t += interval) { + times.add(t); + } + return times; + } + + private static void waitForExportedTsFile(final File root, final long timeoutMs) + throws Exception { + waitForStableExportedTsFiles( + root, timeoutMs, "Timeout waiting for exported .tsfile under " + root.getAbsolutePath()); + } + + private static void assertGeneratedObjectPayloadsByDevice( + final ITableSession session, + final String tableName, + final String deviceId, + final List<Long> expectedTimes) + throws Exception { + useDatabase(session, TEST_DATABASE); + final String query = + String.format( + "SELECT time, READ_OBJECT(sensor_obj) FROM %s WHERE id='%s' ORDER BY time ASC", + tableName, deviceId); + + try (SessionDataSet dataSet = session.executeQueryStatement(query)) { + final SessionDataSet.DataIterator iterator = dataSet.iterator(); + int count = 0; + while (iterator.next()) { + Assert.assertTrue("More rows than expected for " + deviceId, count < expectedTimes.size()); + final long actualTime = iterator.getLong(1); + final Binary binary = iterator.getBlob(2); + final byte[] actualBytes = binary.getValues(); + final long expectedTime = expectedTimes.get(count); + Assert.assertEquals("Time mismatch at index " + count, expectedTime, actualTime); + final byte[] expectedBytes = + String.format("AutoGenerated|Table=%s|ID=%s|Time=%d", tableName, deviceId, expectedTime) + .getBytes(StandardCharsets.UTF_8); + Assert.assertArrayEquals( + "Object byte content mismatch at time " + actualTime, expectedBytes, actualBytes); + count++; + } + Assert.assertEquals("Total row count mismatch for " + deviceId, expectedTimes.size(), count); + } + } + + private void insertObjectRowsWithContiguousTimestamps( + final ITableSession session, final String tableName, final long startTs, final int rowCount) + throws Exception { + final long endTs = startTs + rowCount - 1; + insertObjectRowsInTimeRange(session, tableName, startTs, endTs); + } + + private void insertObjectRowsInTimeRange( + final ITableSession session, final String tableName, final long startTs, final long endTs) + throws Exception { + List<String> columnNames = Arrays.asList("id", "file"); + List<TSDataType> dataTypes = Arrays.asList(TSDataType.STRING, TSDataType.OBJECT); + List<ColumnCategory> columnCategories = Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD); + + Tablet tablet = new Tablet(tableName, columnNames, dataTypes, columnCategories, 100); + + for (long ts = startTs; ts <= endTs; ts++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, ts); + tablet.addValue(rowIndex, 0, "device1"); + + byte[] dynamicObjectBytes = buildExpectedObjectPayload(ts); + tablet.addValue(rowIndex, 1, true, 0, dynamicObjectBytes); + + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + session.insert(tablet); + tablet.reset(); + } + } + + if (tablet.getRowSize() > 0) { + session.insert(tablet); + } + } + + private static void waitForExportedTsFilesWithMods(final File root, final long timeoutMs) + throws Exception { + waitForStableExportedTsFiles( + root, timeoutMs, "Timeout waiting for exported .tsfile under " + root.getAbsolutePath()); + } + + private static void waitForStableExportedTsFiles( + final File root, final long timeoutMs, final String timeoutMessage) throws Exception { + final long deadline = System.currentTimeMillis() + timeoutMs; + ExportedTsFileSnapshot previousReadySnapshot = null; + while (System.currentTimeMillis() < deadline) { + final ExportedTsFileSnapshot currentSnapshot = ExportedTsFileSnapshot.capture(root); + if (currentSnapshot.isReady()) { + if (currentSnapshot.equals(previousReadySnapshot)) { + return; + } + previousReadySnapshot = currentSnapshot; + } else { + previousReadySnapshot = null; + } + Thread.sleep(1000); + } + Assert.fail(timeoutMessage); + } + + private static final class ExportedTsFileSnapshot { + + private final List<String> fingerprints; + private final boolean ready; + + private ExportedTsFileSnapshot(final List<String> fingerprints, final boolean ready) { + this.fingerprints = fingerprints; + this.ready = ready; + } + + private static ExportedTsFileSnapshot capture(final File root) { + final List<File> tsfiles = new ArrayList<>(); + collectTsFilesRecursively(root, tsfiles); + if (tsfiles.isEmpty()) { + return new ExportedTsFileSnapshot(new ArrayList<>(), false); + } + + tsfiles.sort(Comparator.comparing(File::getAbsolutePath)); + final List<String> fingerprints = new ArrayList<>(); + for (final File tsFile : tsfiles) { + fingerprints.add(buildFileFingerprint(tsFile)); + } + return new ExportedTsFileSnapshot(fingerprints, true); + } + + private boolean isReady() { + return ready; + } + + private static String buildFileFingerprint(final File file) { + return file.getAbsolutePath() + ":" + file.length() + ":" + file.lastModified(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof ExportedTsFileSnapshot)) { + return false; + } + final ExportedTsFileSnapshot that = (ExportedTsFileSnapshot) obj; + return ready == that.ready && fingerprints.equals(that.fingerprints); + } + + @Override + public int hashCode() { + int result = fingerprints.hashCode(); + result = 31 * result + Boolean.hashCode(ready); + return result; + } + } + + private static void loadExportedTsFiles(final ITableSession session, final File root) + throws Exception { + useDatabase(session, TEST_DATABASE); + final List<File> tsfiles = new ArrayList<>(); + collectTsFilesRecursively(root, tsfiles); + Assert.assertFalse(tsfiles.isEmpty()); + tsfiles.sort(Comparator.comparing(File::getAbsolutePath)); + for (File f : tsfiles) { + session.executeNonQueryStatement( + String.format( + "LOAD '%s' WITH ('database-name'='%s')", f.getAbsolutePath(), TEST_DATABASE)); + } + } + + private static long queryTableRowCount(final ITableSession session, final String table) + throws Exception { + useDatabase(session, TEST_DATABASE); + try (SessionDataSet ds = session.executeQueryStatement("SELECT COUNT(*) FROM " + table)) { + final SessionDataSet.DataIterator it = ds.iterator(); + Assert.assertTrue(it.next()); + return it.getLong(1); + } + } + + private static void collectTsFilesRecursively(File dir, List<File> tsfiles) { + File[] files = dir.listFiles(); + if (files == null) { + return; + } + for (File f : files) { + if (f.isDirectory()) { + collectTsFilesRecursively(f, tsfiles); + } else if (f.getName().endsWith(".tsfile")) { + tsfiles.add(f); + } + } + } + + private static void deleteRecursivelyQuietly(Path dirPath) { + if (!Files.exists(dirPath)) { + return; + } + try { + Files.walk(dirPath) + .sorted(Comparator.reverseOrder()) + .forEach( + path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + LOGGER.warn("Failed to delete path {}", path, e); + } + }); + } catch (IOException e) { + LOGGER.warn("Failed to cleanup temp directory {}", dirPath, e); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java index 0fa98a8a9c4..471f242f75c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java @@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic; import org.apache.iotdb.db.it.utils.StandardObjectTableModelTsFileGenerator; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.db.utils.ObjectTypeUtils; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.framework.IoTDBTestRunner; @@ -66,6 +67,7 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua private static final String OBJECT_TABLE_NAME = "factory_metrics"; private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5; + private static final long HOUR_MS = 3600 * 1000L; private static final long DAY_MS = 24L * HOUR_MS; private static final long WEEK_MS = 7L * DAY_MS; @@ -118,21 +120,21 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua session.executeNonQueryStatement( "CREATE TABLE IF NOT EXISTS t1 (id STRING TAG, file OBJECT FIELD)"); - insertObjectData(session, "t1", 1, 250); + insertObjectData(session, "t1", 1, 900); TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); session.executeNonQueryStatement( String.format( "CREATE PIPE p1 " - + "WITH EXTRACTOR (" - + "'extractor.capture.table'='true', " - + "'extractor.database-name'='db1', " - + "'extractor.table-name'='t1', " - + "'extractor.inclusion'='data.insert', " - + "'extractor.history.enable'='true', " - + "'extractor.realtime.enable'='true' " + + "WITH SOURCE (" + + "'source.capture.table'='true', " + + "'source.database-name'='db1', " + + "'source.table-name'='t1', " + + "'source.inclusion'='data.insert', " + + "'source.history.enable'='true', " + + "'source.realtime.enable'='true' " + ") " - + "WITH CONNECTOR (" + + "WITH SINK (" + "'sink'='tsfile-local-sink', " + "'sink.local.target-path'='%s', " + "'sink.batch.max-delay-seconds'='1', " @@ -140,12 +142,12 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua + ")", targetDir)); - waitForAndVerifyExportedObjects(250, 1, 250, 0, 251, 500); + waitForAndVerifyExportedObjects(900, 1, 900, 0, 901, 2400); - insertObjectData(session, "t1", 251, 500); + insertObjectData(session, "t1", 901, 2400); TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); - waitForAndVerifyExportedObjects(250, 1, 250, 250, 251, 500); + waitForAndVerifyExportedObjects(900, 1, 900, 1500, 901, 2400); session.executeNonQueryStatement("DROP PIPE p1"); } @@ -191,7 +193,7 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua + "'source.history.enable'='true', " + "'source.realtime.enable'='true' " + ") " - + "WITH CONNECTOR (" + + "WITH SINK (" + "'sink'='tsfile-local-sink', " + "'sink.local.target-path'='%s', " + "'sink.batch.max-delay-seconds'='1', " @@ -396,6 +398,55 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua "Realtime Object count mismatch after Pipe sync", expectedRealtimeCount, realtimeFound); } + private static void waitForExportedTsFilesWithMods(final File root, final long timeoutMs) + throws Exception { + final long deadline = System.currentTimeMillis() + timeoutMs; + while (System.currentTimeMillis() < deadline) { + final List<File> tsfiles = new ArrayList<>(); + findTsFiles(root, tsfiles); + if (!tsfiles.isEmpty()) { + boolean allMods = true; + for (File tf : tsfiles) { + final File mod = new File(tf.getParent(), tf.getName() + ModificationFile.FILE_SUFFIX); + if (!mod.isFile()) { + allMods = false; + break; + } + } + if (allMods) { + Thread.sleep(2000); + return; + } + } + Thread.sleep(800); + } + Assert.fail( + "Timeout waiting for .tsfile and companion " + + ModificationFile.FILE_SUFFIX + + " under " + + root.getAbsolutePath()); + } + + private static void loadAllTsFilesUnderDir(final ITableSession session, final File root) + throws Exception { + final List<File> tsfiles = new ArrayList<>(); + findTsFiles(root, tsfiles); + Assert.assertFalse(tsfiles.isEmpty()); + tsfiles.sort(Comparator.comparing(File::getAbsolutePath)); + for (File f : tsfiles) { + session.executeNonQueryStatement(String.format("LOAD '%s'", f.getAbsolutePath())); + } + } + + private static long queryRowCount(final ITableSession session, final String table) + throws Exception { + try (SessionDataSet ds = session.executeQueryStatement("SELECT COUNT(*) FROM " + table)) { + final SessionDataSet.DataIterator it = ds.iterator(); + Assert.assertTrue(it.next()); + return it.getLong(1); + } + } + private static void findTsFiles(File dir, List<File> tsfiles) { File[] files = dir.listFiles(); if (files == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 803269377a3..260b3716d3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -48,6 +48,7 @@ import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -368,22 +369,44 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { extractTime = System.nanoTime(); + final List<ModEntry> linkedObjectColumnModEntries = new ArrayList<>(); try { if (Objects.nonNull(pipeName)) { - final Iterator<String> pathIterator = objectPathIterator(); + // In the real tsfile transfer path, the tsfile event is expected to observe a sealed + // resource here. The UNCLOSED case is mainly introduced by unit tests that manually mock + // an unsealed resource to verify realtime behavior before close. + final boolean shouldLinkObjectFiles = + !Objects.equals(hasObjectData, Boolean.FALSE) && resource.isClosed(); + final Iterator<String> pathIterator = + shouldLinkObjectFiles + ? new TsFileObjectPathIterator(this, linkedObjectColumnModEntries) + : Collections.emptyIterator(); final int linked = PipeDataNodeResourceManager.object().linkObjectFiles(resource, pathIterator, pipeName); - hasObjectData = linked > 0; if (linked > 0) { + hasObjectData = true; PipeDataNodeResourceManager.object().increaseReference(resource, pipeName); + } else if (shouldLinkObjectFiles) { + hasObjectData = false; } - PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); + if (resource.isClosed()) { + PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); + } } tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); if (isWithMod) { - modFile = - PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName); + if (modFile != null) { + modFile = + PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName); + } + } else { + if (pipeName != null && !linkedObjectColumnModEntries.isEmpty()) { + modFile = + PipeDataNodeResourceManager.tsfile() + .writeModEntriesForPipeTsFile(linkedObjectColumnModEntries, tsFile, pipeName); + isWithMod = true; + } } return true; } catch (final Exception e) { @@ -408,7 +431,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent PipeDataNodeResourceManager.object().decreaseReference(resource, pipeName); } PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName); - if (isWithMod) { + if (isWithMod && modFile != null) { PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName); } close(); @@ -869,7 +892,11 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent */ private TsFileInsertionEventParser initEventParser(final boolean objectPathsOnly) { try { - eventParser.compareAndSet(null, createParserProvider().provide(isWithMod, objectPathsOnly)); + final boolean collectObjectColumnModEntries = objectPathsOnly && !isWithMod; + eventParser.compareAndSet( + null, + createParserProvider() + .provide(isWithMod, objectPathsOnly, collectObjectColumnModEntries)); return eventParser.get(); } catch (final Exception e) { close(); @@ -924,13 +951,33 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent /** Release the resource of {@link TsFileInsertionEventParser}. */ @Override public void close() { - eventParser.getAndUpdate( - parser -> { - if (Objects.nonNull(parser)) { - parser.close(); - } - return null; - }); + closeParser(); + } + + public void closeParser() { + closeParserInternal(null); + } + + public List<ModEntry> drainGeneratedObjectColumnModEntriesAndCloseParser() { + final List<ModEntry> modEntries = new ArrayList<>(); + closeParserInternal(modEntries); + return modEntries; + } + + public void drainGeneratedObjectColumnModEntriesAndCloseParserTo( + final List<ModEntry> modEntries) { + closeParserInternal(modEntries); + } + + private void closeParserInternal(final List<ModEntry> modEntries) { + final TsFileInsertionEventParser parser = eventParser.getAndSet(null); + if (Objects.isNull(parser)) { + return; + } + if (modEntries != null) { + parser.drainGeneratedObjectColumnModEntriesTo(modEntries); + } + parser.close(); } /////////////////////////// Object /////////////////////////// @@ -975,7 +1022,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent this.eventParser); } - private static class PipeTsFileInsertionEventResource extends PipeEventResource { + private class PipeTsFileInsertionEventResource extends PipeEventResource { private final File tsFile; private final boolean isWithMod; @@ -1015,17 +1062,11 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent PipeDataNodeResourceManager.object().decreaseReference(resource, pipeName); } PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName); - if (isWithMod) { + if (isWithMod && modFile != null) { PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName); } - eventParser.getAndUpdate( - parser -> { - if (Objects.nonNull(parser)) { - parser.close(); - } - return null; - }); + PipeTsFileInsertionEvent.this.close(); } catch (final Exception e) { LOGGER.warn("Decrease reference count for TsFile {} error.", tsFile.getPath(), e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java index 139641c0a04..c7a78243149 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileObjectPathIterator.java @@ -20,15 +20,18 @@ package org.apache.iotdb.db.pipe.event.common.tsfile; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -38,6 +41,7 @@ public final class TsFileObjectPathIterator implements Iterator<String> { private final Iterator<TabletInsertionEvent> tabletEventIterator; private final PipeTsFileInsertionEvent tsfileEvent; + @Nullable private final List<ModEntry> modEntriesSink; private Iterator<String> currentPathIterator; @@ -45,11 +49,18 @@ public final class TsFileObjectPathIterator implements Iterator<String> { private String nextPath = null; private boolean isClosed = false; - public TsFileObjectPathIterator(@Nonnull PipeTsFileInsertionEvent tsFileEvent) { + public TsFileObjectPathIterator(@Nonnull final PipeTsFileInsertionEvent tsFileEvent) { + this(tsFileEvent, null); + } + + TsFileObjectPathIterator( + @Nonnull final PipeTsFileInsertionEvent tsFileEvent, + @Nullable final List<ModEntry> modEntriesSink) { Objects.requireNonNull(tsFileEvent, "tsFileEvent cannot be null"); this.tabletEventIterator = tsFileEvent.toTabletInsertionEvents(true).iterator(); this.tsfileEvent = tsFileEvent; + this.modEntriesSink = modEntriesSink; this.currentPathIterator = Collections.emptyIterator(); } @@ -139,7 +150,11 @@ public final class TsFileObjectPathIterator implements Iterator<String> { hasCachedNext = false; currentPathIterator = Collections.emptyIterator(); if (tsfileEvent != null) { - tsfileEvent.close(); + if (modEntriesSink != null) { + tsfileEvent.drainGeneratedObjectColumnModEntriesAndCloseParserTo(modEntriesSink); + } else { + tsfileEvent.closeParser(); + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index f64e3d9dd21..117513e726c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.List; public abstract class TsFileInsertionEventParser implements AutoCloseable { @@ -155,6 +156,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { */ public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents(); + public void drainGeneratedObjectColumnModEntriesTo(final List<ModEntry> modEntries) {} + /** * Record parse start time when hasNext() is called for the first time and returns true. Should be * called in Iterator.hasNext() when it's the first call. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index 8a09c744932..83b305f0d29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -87,7 +87,7 @@ public class TsFileInsertionEventParserProvider { */ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOException, IllegalPathException { - return this.provide(isWithMod, false); + return this.provide(isWithMod, false, false); } /** @@ -101,6 +101,14 @@ public class TsFileInsertionEventParserProvider { */ public TsFileInsertionEventParser provide(final boolean isWithMod, final boolean objectPathsOnly) throws IOException, IllegalPathException { + return provide(isWithMod, objectPathsOnly, false); + } + + public TsFileInsertionEventParser provide( + final boolean isWithMod, + final boolean objectPathsOnly, + final boolean collectObjectColumnModEntries) + throws IOException, IllegalPathException { if (pipeName != null) { PipeTsFileToTabletsMetrics.getInstance() .markTsFileToTabletInvocation(pipeName + "_" + creationTime); @@ -118,7 +126,8 @@ public class TsFileInsertionEventParserProvider { entity, sourceEvent, isWithMod, - objectPathsOnly); + objectPathsOnly, + collectObjectColumnModEntries); } // Use scan container to save memory diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 362af4d3fbe..cd21ab297f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -33,6 +33,9 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUti import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -42,12 +45,25 @@ import org.apache.tsfile.write.record.Tablet; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Set; public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser { + private final boolean collectObjectColumnModEntries; + + private final List<ModEntry> originalModEntries; + + private final Map<String, Set<String>> tableObjectMeasurements = new HashMap<>(); + private final long startTime; private final long endTime; private final TablePattern tablePattern; @@ -69,7 +85,8 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser final IAuditEntity entity, final PipeInsertionEvent sourceEvent, final boolean isWithMod, - final boolean objectPathsOnly) + final boolean objectPathsOnly, + final boolean collectObjectColumnModEntries) throws IOException { super( tsFile, @@ -87,12 +104,22 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser objectPathsOnly, isWithMod); + this.collectObjectColumnModEntries = collectObjectColumnModEntries; this.isWithMod = isWithMod; try { - currentModifications = - isWithMod - ? ModsOperationUtil.loadModificationsFromTsFile(tsFile) - : PatternTreeMapFactory.getModsPatternTreeMap(); + final boolean loadModificationsFromTsFile = + this.isWithMod || this.collectObjectColumnModEntries; + if (this.collectObjectColumnModEntries) { + originalModEntries = ModsOperationUtil.readAllModificationsFromTsFile(tsFile); + currentModifications = + ModsOperationUtil.buildModificationsPatternTreeMap(originalModEntries); + } else { + originalModEntries = Collections.emptyList(); + currentModifications = + loadModificationsFromTsFile + ? ModsOperationUtil.loadModificationsFromTsFile(tsFile) + : PatternTreeMapFactory.getModsPatternTreeMap(); + } allocatedMemoryBlockForModifications = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); @@ -150,7 +177,60 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser entity, sourceEvent, isWithMod, - objectPathsOnly); + objectPathsOnly, + false); + } + + private void recordTableObjectMeasurements( + final String tableName, final List<String> objectMeasurements) { + if (tableName == null + || tableName.isEmpty() + || objectMeasurements == null + || objectMeasurements.isEmpty()) { + return; + } + tableObjectMeasurements + .computeIfAbsent(tableName, ignored -> new LinkedHashSet<>()) + .addAll(objectMeasurements); + } + + private List<ModEntry> generateObjectColumnModEntries(List<ModEntry> generatedEntries) { + if (!collectObjectColumnModEntries + || originalModEntries.isEmpty() + || tableObjectMeasurements.isEmpty()) { + return Collections.emptyList(); + } + + for (final ModEntry modEntry : originalModEntries) { + if (!(modEntry instanceof TableDeletionEntry)) { + continue; + } + + final TableDeletionEntry tableDeletionEntry = (TableDeletionEntry) modEntry; + final Set<String> objectMeasurements = + tableObjectMeasurements.get(tableDeletionEntry.getTableName()); + if (objectMeasurements == null || objectMeasurements.isEmpty()) { + continue; + } + + ModEntry entry = + ModsOperationUtil.buildObjectColumnDeletionEntries( + tableDeletionEntry, objectMeasurements); + if (entry != null) { + generatedEntries.add(entry); + } + } + + return generatedEntries.isEmpty() + ? Collections.emptyList() + : ModificationUtils.sortAndMerge(generatedEntries); + } + + @Override + public void drainGeneratedObjectColumnModEntriesTo(final List<ModEntry> modEntries) { + if (modEntries != null) { + modEntries.addAll(generateObjectColumnModEntries(new ArrayList<>())); + } } @Override @@ -181,7 +261,12 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser currentModifications, startTime, endTime, - objectPathsOnly); + objectPathsOnly, + collectObjectColumnModEntries, + collectObjectColumnModEntries && objectPathsOnly + ? TsFileInsertionEventTableParser.this + ::recordTableObjectMeasurements + : null); } final boolean hasNext = tabletIterator.hasNext(); if (hasNext && !parseStartTimeRecorded) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index c458f85851c..97c4ca74414 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -59,6 +59,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -82,6 +83,10 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T private final boolean objectPathsOnly; + private final boolean collectObjectColumnModEntries; + + private final BiConsumer<String, List<String>> tableObjectMeasurementsSink; + // mods entry private final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications; @@ -104,6 +109,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T private List<ColumnCategory> columnTypes; private List<String> measurementList; private List<TSDataType> dataTypeList; + private List<String> objectMeasurementList; private int deviceIdSize; private List<ModsOperationUtil.ModsInfo> modsInfoList; @@ -124,12 +130,16 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications, final long startTime, final long endTime, - final boolean objectPathsOnly) + final boolean objectPathsOnly, + final boolean collectObjectColumnModEntries, + final BiConsumer<String, List<String>> tableObjectMeasurementsSink) throws IOException { this.startTime = startTime; this.endTime = endTime; this.modifications = modifications; + this.collectObjectColumnModEntries = collectObjectColumnModEntries; + this.tableObjectMeasurementsSink = tableObjectMeasurementsSink; this.reader = tsFileSequenceReader; this.metadataQuerier = new MetadataQuerierByFileImpl(reader); @@ -278,6 +288,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T dataTypeList = new ArrayList<>(); columnTypes = new ArrayList<>(); measurementList = new ArrayList<>(); + objectMeasurementList = new ArrayList<>(); for (int i = 0; i < columnSchemaSize; i++) { final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i); @@ -290,9 +301,14 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T columnTypes.add(ColumnCategory.TAG); measurementList.add(measurementName); dataTypeList.add(schema.getType()); + } else if (schema.getType() == TSDataType.OBJECT) { + objectMeasurementList.add(measurementName); } } } + if (collectObjectColumnModEntries && tableObjectMeasurementsSink != null) { + tableObjectMeasurementsSink.accept(tableName, objectMeasurementList); + } deviceIdSize = dataTypeList.size(); state = State.INIT_CHUNK_METADATA; break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java index 66fed43fead..b8ac32db1bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java @@ -20,19 +20,23 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.TimeRange; import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -53,20 +57,58 @@ public class ModsOperationUtil { */ public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> loadModificationsFromTsFile(File tsFile) { - PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); + return buildModificationsPatternTreeMap(readAllModificationsFromTsFile(tsFile)); + } + public static List<ModEntry> readAllModificationsFromTsFile(final File tsFile) { try { - ModificationFile.readAllModifications(tsFile, true) - .forEach( - modification -> modifications.append(modification.keyOfPatternTree(), modification)); + return ModificationFile.readAllModifications(tsFile, true); } catch (Exception e) { throw new PipeException("Failed to load modifications from TsFile: " + tsFile.getPath(), e); } + } + public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> + buildModificationsPatternTreeMap(final List<ModEntry> modEntries) { + final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + for (final ModEntry modEntry : modEntries) { + modifications.append(modEntry.keyOfPatternTree(), modEntry); + } return modifications; } + public static ModEntry buildObjectColumnDeletionEntries( + final TableDeletionEntry tableDeletionEntry, final Set<String> objectMeasurementNames) { + if (tableDeletionEntry == null + || objectMeasurementNames == null + || objectMeasurementNames.isEmpty()) { + return null; + } + + final DeletionPredicate predicate = tableDeletionEntry.getPredicate(); + final List<String> targetMeasurements; + if (predicate.getMeasurementNames().isEmpty()) { + targetMeasurements = new ArrayList<>(objectMeasurementNames); + } else { + targetMeasurements = + predicate.getMeasurementNames().stream() + .filter(objectMeasurementNames::contains) + .collect(Collectors.toList()); + } + + if (targetMeasurements.isEmpty()) { + return null; + } + + return new TableDeletionEntry( + new DeletionPredicate( + predicate.getTableName(), predicate.getIdPredicate(), targetMeasurements), + new TimeRange( + tableDeletionEntry.getTimeRange().getMin(), + tableDeletionEntry.getTimeRange().getMax())); + } + /** * Check if data in the specified time range is completely deleted by mods Different logic for * tree model and table model diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index c84504fa52b..644beb1ac40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.enums.TSDataType; @@ -35,6 +37,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -134,6 +137,50 @@ public class PipeTsFileResourceManager { return resultFile; } + /** + * Resolves the pipe-directory TsFile path for {@code tsFile}, derives the companion mods file + * ({@code tsFileName + ModificationFileV1.FILE_SUFFIX}, i.e. {@code .mods}), creates parent + * directories if needed, writes {@code entries} with {@link ModificationFile}, and returns that + * mods file. + * + * @param entries modification entries to append (may be empty) + * @param tsFile original or pipe-hardlinked TsFile; used only to compute the mods path under pipe + * dir + * @param pipeName pipe name for {@link #getHardlinkOrCopiedFileInPipeDir(File, String)} + * @return the written mods file under the pipe TsFile directory + * @throws IOException if the directory cannot be created or write fails + */ + public File writeModEntriesForPipeTsFile( + final List<ModEntry> entries, final File tsFile, final @Nullable String pipeName) + throws IOException { + final List<ModEntry> toWrite = entries != null ? entries : Collections.emptyList(); + final File pipeTsFile = getHardlinkOrCopiedFileInPipeDir(tsFile, pipeName); + final File parent = pipeTsFile.getParentFile(); + if (parent != null && !parent.exists() && !parent.mkdirs()) { + throw new IOException("Failed to create directory: " + parent.getPath()); + } + final File modsFile; + + try (ModificationFile modificationFile = + new ModificationFile(ModificationFile.getExclusiveMods(pipeTsFile), false)) { + modificationFile.write(toWrite); + modsFile = modificationFile.getFile(); + } + + segmentLock.lock(modsFile); + try { + if (Objects.nonNull(pipeName)) { + hardlinkOrCopiedFileToPipeTsFileResourceMap + .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .put(modsFile.getPath(), new PipeTsFileResource(modsFile)); + } + } finally { + segmentLock.unlock(modsFile); + } + increasePublicReference(modsFile, pipeName, false); + return modsFile; + } + private boolean increaseReferenceIfExists( final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException { segmentLock.lock(file); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java index ab264f17b16..aba24a513cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java @@ -23,34 +23,31 @@ import java.io.File; import java.io.IOException; /** - * FileTransfer - File transfer interface + * Abstraction for transferring exported TsFiles and their companion artifacts to a sink target. * - * <p>Abstractions for different file transfer methods (e.g. local copy, SCP). Implementations must: - * - * <ul> - * <li>{@link #handshake()} - Test connection / reachability of the transfer target - * <li>{@link #transferFile(File, File, String)} - Transfer a single file (e.g. TSFile) to the - * target directory with the given file name - * </ul> + * <p>Implementations may write to a local directory, a remote host, or any other file-oriented + * destination, but they must preserve the exported TsFile name and keep companion files aligned + * with it. */ public interface FileTransfer extends AutoCloseable { /** - * Handshake / test connection. Verifies that the transfer target is reachable (e.g. local dir - * exists, remote host is reachable). + * Verifies that the transfer target is reachable and ready to accept files. * - * @throws IOException when connection test fails + * @throws IOException when the target cannot be prepared */ void handshake() throws IOException; /** - * Transfer a single file (e.g. TSFile) to the target. + * Transfers one exported TsFile together with its optional companion files. * - * @param tsfile local source file (e.g. TSFile) - * @param objectPath target parent directory as File; if null, use implementation's default base - * path - * @param targetFileName target file name at destination - * @throws IOException when transfer fails + * @param tsfile local TsFile to transfer + * @param modFile local companion mod file; may be null when no mod file exists + * @param objectPath local directory containing exported Object files; may be null when the TsFile + * has no Object data + * @param targetFileName target file name used for the transferred TsFile + * @throws IOException when any part of the transfer fails */ - void transferFile(File tsfile, File objectPath, String targetFileName) throws IOException; + void transferFile(File tsfile, File modFile, File objectPath, String targetFileName) + throws IOException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java index 46610a1d9e5..a703dd9c432 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java @@ -1,8 +1,11 @@ package org.apache.iotdb.db.pipe.sink.protocol.tsfile; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.common.constant.TsFileConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +42,7 @@ public class LocalFileTransfer implements FileTransfer { } @Override - public void transferFile(File tsFile, File objectSourceDir, String targetName) + public void transferFile(File tsFile, File modFile, File objectSourceDir, String targetName) throws IOException { if (tsFile == null || !tsFile.exists()) { throw new PipeException("Source TsFile is missing"); @@ -49,7 +52,9 @@ public class LocalFileTransfer implements FileTransfer { copyObjectDirectory(objectSourceDir, targetName); } - copyTsFile(tsFile, targetName); + final String finalTsName = finalTsFileName(targetName); + exportModIfPresent(modFile, finalTsName); + exportTsFile(tsFile, finalTsName); } private void copyObjectDirectory(File sourceDir, String dirName) throws IOException { @@ -82,16 +87,75 @@ public class LocalFileTransfer implements FileTransfer { } } - private void copyTsFile(File tsFile, String targetName) throws IOException { - String finalTsName = targetName.endsWith(".tsfile") ? targetName : targetName + ".tsfile"; - File dest = new File(targetBaseDir, finalTsName); + private static String finalTsFileName(final String targetName) { + return targetName.endsWith(TsFileConstant.TSFILE_SUFFIX) + ? targetName + : targetName + TsFileConstant.TSFILE_SUFFIX; + } + + private void exportTsFile(final File tsFile, final String finalTsName) throws IOException { + final File dest = new File(targetBaseDir, finalTsName); + transferByHardLinkOrCopy(tsFile, dest, "TsFile"); + } + + private void exportModIfPresent(final File modFile, final String finalTsName) throws IOException { + if (modFile == null || !modFile.exists()) { + return; + } + final String modDestName = finalTsName + ModificationFile.FILE_SUFFIX; + final File modDest = new File(targetBaseDir, modDestName); + transferByHardLinkOrCopy(modFile, modDest, "TsFile mod"); + } + + void transferByHardLinkOrCopy(final File source, final File dest, final String fileType) + throws IOException { + if (source.getCanonicalFile().equals(dest.getCanonicalFile())) { + LOGGER.info( + "Skip exporting {} because source and target are the same file: {}", + fileType, + dest.getAbsolutePath()); + return; + } + + final Path targetDirectory = dest.getParentFile().toPath(); + if (!Files.exists(targetDirectory)) { + Files.createDirectories(targetDirectory); + } + + if (isSameFileStore(source.toPath(), targetDirectory)) { + try { + createHardLink(source, dest); + LOGGER.info("Successfully hard-linked {} to {}", fileType, dest.getAbsolutePath()); + return; + } catch (final IOException e) { + LOGGER.warn( + "Failed to hard-link {} to {}, fallback to copy.", fileType, dest.getAbsolutePath(), e); + } + } else { + LOGGER.info( + "Detected cross-filesystem export for {}, fallback to copy: {}", + fileType, + dest.getAbsolutePath()); + } + + copyFile(source, dest); + LOGGER.info("Successfully copied {} to {}", fileType, dest.getAbsolutePath()); + } + + boolean isSameFileStore(final Path sourcePath, final Path targetDirectory) throws IOException { + return Files.getFileStore(sourcePath).equals(Files.getFileStore(targetDirectory)); + } + + void createHardLink(final File source, final File dest) throws IOException { + FileUtils.createHardLink(source, dest); + } - Files.copy(tsFile.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); - LOGGER.info("Successfully transferred TsFile to {}", dest.getAbsolutePath()); + private void copyFile(final File source, final File dest) throws IOException { + Files.copy(source.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); } @Override public void close() { - // No external resource is maintained by local transfer. + // No-op because local transfer does not keep any open connection or file handle. } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java index 943d0efd147..bfb543cd978 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java @@ -173,8 +173,13 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { final File tsFile = event.getTsFile(); if (tsFile != null && tsFile.exists()) { + final File modFile = + event.isWithMod() && event.getModFile() != null && event.getModFile().exists() + ? event.getModFile() + : null; fileTransfer.transferFile( tsFile, + modFile, PipeObjectPathUtil.resolveLinkedObjectDirectory( event.getTsFileResource(), event.getPipeName()), TsFileNameGenerator.targetNameForEvent(event)); @@ -183,7 +188,7 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { final File tsFile = tsFileInsertionEvent.getTsFile(); if (tsFile != null && tsFile.exists()) { fileTransfer.transferFile( - tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); + tsFile, null, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); } } } @@ -263,7 +268,7 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { final File objectDir = tsFileAndObjectDir.getRight(); if (tsFile != null && tsFile.exists()) { fileTransfer.transferFile( - tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); + tsFile, null, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); } } eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileLocalSink.class.getName(), true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java index a70416d5400..f8f268813a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadObjectFileUtil.java @@ -19,11 +19,17 @@ package org.apache.iotdb.db.storageengine.load.util; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.load.ObjectFileCorruptedException; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil.ModsInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.tsfile.common.conf.TSFileDescriptor; @@ -54,6 +60,7 @@ import java.nio.ByteBuffer; import java.nio.file.FileStore; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +96,8 @@ public class LoadObjectFileUtil { final File targetDir = new File(objectTempBaseDir, targetDirName); final File searchRoot = node.getObjectFileSearchRoot() != null ? node.getObjectFileSearchRoot() : targetDir; + final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = + loadModificationsFromTsFile(tsFile); if (!targetDir.exists() && !targetDir.mkdirs() && !targetDir.exists()) { throw new LoadFileException( @@ -102,7 +111,10 @@ public class LoadObjectFileUtil { while (iterator.hasNext()) { final Map<IDeviceID, List<TimeseriesMetadata>> deviceMetadataMap = iterator.next(); - for (final List<TimeseriesMetadata> metadataList : deviceMetadataMap.values()) { + for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : + deviceMetadataMap.entrySet()) { + final IDeviceID deviceID = entry.getKey(); + final List<TimeseriesMetadata> metadataList = entry.getValue(); final List<Map<Integer, long[]>> alignedTimeBatchesByChunkIndex = collectAlignedTimeBatchesByChunkIndex(reader, metadataList); for (final TimeseriesMetadata tsMetadata : metadataList) { @@ -110,6 +122,8 @@ public class LoadObjectFileUtil { continue; } + final ModsInfo modsInfo = + buildMeasurementModsInfo(deviceID, tsMetadata.getMeasurementId(), modifications); int chunkIndex = 0; for (final IChunkMetadata chunkMetadata : tsMetadata.getChunkMetadataList()) { scanObjectChunkAndProcessFiles( @@ -119,7 +133,8 @@ public class LoadObjectFileUtil { searchRoot, targetDir, alignedTimeBatchesByChunkIndex, - chunkIndex++); + chunkIndex++, + modsInfo); } } } @@ -140,7 +155,8 @@ public class LoadObjectFileUtil { final File searchRoot, final File targetDir, final List<Map<Integer, long[]>> alignedTimeBatchesByChunkIndex, - final int chunkIndex) + final int chunkIndex, + final ModsInfo modsInfo) throws IOException, LoadFileException { reader.position(chunkMetadata.getOffsetOfChunkHeader()); final byte marker = reader.readMarker(); @@ -170,7 +186,7 @@ public class LoadObjectFileUtil { .nextValueBatch(times); final int len = Math.min(values.length, times.length); for (int i = 0; i < len; i++) { - if (values[i] == null) { + if (values[i] == null || ModsOperationUtil.isDelete(times[i], modsInfo)) { continue; } final Binary binary = values[i].getBinary(); @@ -194,7 +210,7 @@ public class LoadObjectFileUtil { .getAllSatisfiedPageData(); while (batchData.hasCurrent()) { final Binary binary = batchData.getBinary(); - if (binary != null) { + if (binary != null && !ModsOperationUtil.isDelete(batchData.currentTime(), modsInfo)) { final Pair<Long, String> lengthAndPath = ObjectTypeUtils.parseObjectBinaryToSizeStringPathPair(binary); final long expectedLength = lengthAndPath.getLeft(); @@ -227,6 +243,30 @@ public class LoadObjectFileUtil { return result; } + private static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> + loadModificationsFromTsFile(final File tsFile) throws IOException { + final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + for (final ModEntry modification : ModificationFile.readAllModifications(tsFile, true)) { + modifications.append(modification.keyOfPatternTree(), modification); + } + return modifications; + } + + private static ModsInfo buildMeasurementModsInfo( + final IDeviceID deviceID, + final String measurement, + final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications) { + if (modifications == null || modifications.isEmpty()) { + return null; + } + + final List<ModsInfo> modsInfos = + ModsOperationUtil.initializeMeasurementMods( + deviceID, Collections.singletonList(measurement), modifications); + return modsInfos.isEmpty() ? null : modsInfos.get(0); + } + private static Map<Integer, long[]> collectAlignedTimeBatchForChunk( final TsFileSequenceReader reader, final IChunkMetadata chunkMetadata) throws IOException { final Map<Integer, long[]> timeBatchByPage = new HashMap<>(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java index 7eb09bbab41..762453ea51d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtilTest.java @@ -19,14 +19,20 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util; +import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.tsfile.read.common.TimeRange; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -383,6 +389,47 @@ public class ModsOperationUtilTest { assertEquals(2, binarySearchMods(mods, 25, 2)); } + @Test + public void testBuildObjectColumnDeletionEntriesForFullTableDeletion() { + final TableDeletionEntry tableDeletionEntry = + new TableDeletionEntry(new DeletionPredicate("table1", new NOP()), new TimeRange(0, 100)); + final Set<String> objectMeasurements = + new LinkedHashSet<>(Arrays.asList("obj_col_1", "obj_col_2")); + + final TableDeletionEntry generatedEntry = + (TableDeletionEntry) + ModsOperationUtil.buildObjectColumnDeletionEntries( + tableDeletionEntry, objectMeasurements); + + assertEquals( + Arrays.asList("obj_col_1", "obj_col_2"), + generatedEntry.getPredicate().getMeasurementNames()); + assertEquals(0, generatedEntry.getTimeRange().getMin()); + assertEquals(100, generatedEntry.getTimeRange().getMax()); + } + + @Test + public void testBuildObjectColumnDeletionEntriesForSpecifiedMeasurements() { + final TableDeletionEntry tableDeletionEntry = + new TableDeletionEntry( + new DeletionPredicate( + "table1", new NOP(), Arrays.asList("s1", "obj_col_2", "obj_col_1")), + new TimeRange(0, 100)); + final Set<String> objectMeasurements = + new LinkedHashSet<>(Arrays.asList("obj_col_1", "obj_col_2", "obj_col_3")); + + final TableDeletionEntry generatedEntry = + (TableDeletionEntry) + ModsOperationUtil.buildObjectColumnDeletionEntries( + tableDeletionEntry, objectMeasurements); + + assertEquals( + Arrays.asList("obj_col_2", "obj_col_1"), + generatedEntry.getPredicate().getMeasurementNames()); + assertEquals(0, generatedEntry.getTimeRange().getMin()); + assertEquals(100, generatedEntry.getTimeRange().getMax()); + } + // Helper method to access the private binarySearchMods method for testing private int binarySearchMods(List<ModEntry> mods, long time, int startIndex) { // Use reflection to access the private method diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java index de4f7fb89a2..4ee5131b3e2 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java @@ -154,8 +154,13 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable final TsFileResource tsFileResource = event.getTsFileResource(); final File tsFile = event.getTsFile(); if (tsFile != null && tsFile.exists()) { + final File modFile = + event.isWithMod() && event.getModFile() != null && event.getModFile().exists() + ? event.getModFile() + : null; remoteFileTransfer.transferFile( tsFile, + modFile, tsFileResource == null ? null : PipeObjectPathUtil.resolveLinkedObjectDirectory( @@ -166,7 +171,7 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable final File tsFile = tsFileInsertionEvent.getTsFile(); if (tsFile != null && tsFile.exists()) { remoteFileTransfer.transferFile( - tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); + tsFile, null, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); } } } @@ -274,7 +279,7 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable final File objectDir = tsFileAndObjectDir.getRight(); if (tsFile != null && tsFile.exists()) { remoteFileTransfer.transferFile( - tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); + tsFile, null, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); } } eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileRemoteSink.class.getName(), true); diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java index 9c440f971ac..604a3946503 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/RemoteFileTransfer.java @@ -25,7 +25,8 @@ import java.io.IOException; interface RemoteFileTransfer { void handshake() throws IOException; - void transferFile(File tsFile, File objectSourceDir, String targetName) throws IOException; + void transferFile(File tsFile, File modFile, File objectSourceDir, String targetName) + throws IOException; void close() throws IOException; } diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java index c47545902cf..113cdaae122 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.pipe.plugin.sink.tsfile; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import com.google.common.util.concurrent.RateLimiter; @@ -104,11 +105,13 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } @Override - public void transferFile(File tsFile, File objectSourceDir, String targetName) + public void transferFile(File tsFile, File modFile, File objectSourceDir, String targetName) throws IOException { try { syncObjectDirectory(objectSourceDir, targetName); - syncTsFile(tsFile, targetName); + final String finalTsName = computeFinalTsName(targetName); + syncModFile(modFile, finalTsName); + syncTsFile(tsFile, finalTsName); } catch (final Exception e) { invalidateSession(); throw new IOException("Scp transfer failed: " + targetName, e); @@ -155,15 +158,30 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } } - private void syncTsFile(File tsFile, String targetName) throws IOException { - final String finalTsName = - targetName.endsWith(TSFILE_EXTENSION) ? targetName : targetName + TSFILE_EXTENSION; + private static String computeFinalTsName(final String targetName) { + return targetName.endsWith(TSFILE_EXTENSION) ? targetName : targetName + TSFILE_EXTENSION; + } + + private void syncTsFile(final File tsFile, final String finalTsName) throws IOException { acquireTransferBytes(tsFile.length()); ScpClientCreator.instance() .createScpClient(getSession()) .upload(tsFile.toPath(), remoteBaseDir + UNIX_SEPARATOR + finalTsName); } + private void syncModFile(final File modFile, final String finalTsName) throws IOException { + if (modFile == null || !modFile.exists()) { + return; + } + final String remoteModPath = + remoteBaseDir + UNIX_SEPARATOR + finalTsName + ModificationFile.FILE_SUFFIX; + acquireTransferBytes(modFile.length()); + ScpClientCreator.instance() + .createScpClient(getSession()) + .upload(modFile.toPath(), remoteModPath); + LOGGER.info("Successfully transferred TsFile mod to {}", remoteModPath); + } + private void ensureRemoteDirExists(ClientSession s, String dir) throws IOException { executeRemoteCommand(s, "mkdir -p " + shellQuote(dir)); }
