This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch rc/2.0.6 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 155789d75b211ce72120c72060b4dadba9363aa8 Author: Caideyipi <[email protected]> AuthorDate: Wed Feb 4 17:26:20 2026 +0800 File sink --- .../dataregion/PipeDataRegionSinkConstructor.java | 4 + .../iotdb/db/pipe/sink/protocol/ToFileSink.java | 369 +++++++++++++++++++++ .../agent/plugin/builtin/BuiltinPipePlugin.java | 2 + 3 files changed, 375 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index 09773d0cad5..58d72295ef1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor; import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper; +import org.apache.iotdb.db.pipe.sink.protocol.ToFileSink; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; @@ -62,6 +63,8 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor { BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new); pluginConstructors.put( BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName(), WriteBackSink::new); + pluginConstructors.put( + BuiltinPipePlugin.TO_FILE_CONNECTOR.getPipePluginName(), ToFileSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), IoTDBDataRegionAsyncSink::new); @@ -85,5 +88,6 @@ class PipeDataRegionSinkConstructor extends PipeSinkConstructor { pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(), PipeConsensusAsyncSink::new); + pluginConstructors.put(BuiltinPipePlugin.TO_FILE_SINK.getPipePluginName(), ToFileSink::new); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java new file mode 100644 index 00000000000..f5b2d4111a4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/ToFileSink.java @@ -0,0 +1,369 @@ +/* + * IoTDB Pipe框架自定义Sink插件 + * + * 功能:将 Pipe 下发的数据以 TsFile 形式写入本地指定目录,按库名 / 导出日期创建子目录。 + * TabletInsertionEvent 直接按 库(db) + 导出日期 写成 tsfile(同一库下多 device 共用一个文件); + * TsFileInsertionEvent 使用 PipeConnector 默认逻辑转为 TabletInsertionEvent 后同样写入, + * 不再做 TsFile 复制或拆分,避免大文件重写带来的 GC 压力。 + * + * 使用方法示例: + * CREATE PIPE pipe1 + * WITH CONNECTOR ( + * 'connector'='ToFileSink', + * 'sink.directory'='/path/to/save/tsfiles' + * ); + */ + +package org.apache.iotdb.db.pipe.sink.protocol; + +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder; +import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 将 Pipe 数据以 TsFile 形式写入本地目录的 Sink。 + * + * <p>默认使用 {@link PipeTabletEventTsFileBatch} 批处理:仅支持 PipeInsertNodeTabletInsertionEvent / + * PipeRawTabletInsertionEvent,按批 flush 到 sink.directory,按库名/日期子目录布局。心跳事件用于触发落盘,避免最后几条留在内存。 + */ +@TreeModel +@TableModel +public class ToFileSink implements PipeConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(ToFileSink.class); + + // 配置参数键 + private static final String SINK_DIRECTORY_KEY = "sink.directory"; + private static final String CONNECTOR_DIRECTORY_KEY = "connector.directory"; + private static final String SINK_CREATE_SUBDIRS_KEY = "sink.create-subdirs"; + + /** 批处理:最大延迟(毫秒)触发 flush */ + private static final String SINK_BATCH_DELAY_MS_KEY = "sink.batch-delay-ms"; + + private static final String CONNECTOR_BATCH_DELAY_MS_KEY = "connector.batch-delay-ms"; + + /** 批处理:最大缓冲字节数触发 flush */ + private static final String SINK_BATCH_SIZE_BYTES_KEY = "sink.batch-size-bytes"; + + private static final String CONNECTOR_BATCH_SIZE_BYTES_KEY = "connector.batch-size-bytes"; + + /** 默认批延迟偏小,避免多 region 时 Pipe tablet 内存超限(flush 越早越早释放内存) */ + private static final int DEFAULT_BATCH_DELAY_MS = 500; + + private static final long DEFAULT_BATCH_SIZE_BYTES = 20L * 1024 * 1024; + + // 配置参数 + private String targetDirectory; + private boolean createSubdirs = false; + private String pipeName; + + /** 实例唯一标识,用于 sink.parallel.tasks>1 时避免多实例写同一目录文件名冲突 */ + private String instanceId; + + /** TsFile 批处理器(默认唯一写路径) */ + private PipeTabletEventTsFileBatch tsFileBatch; + + private final Object batchFlushLock = new Object(); + + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd"); + + /** 文件名时间戳:精确到毫秒,避免同一秒内多次 flush 覆盖同一文件 */ + private static final DateTimeFormatter DATETIME_FORMAT = + DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"); + + /** 实例内 flush 序号,保证同一毫秒内多次 flush 也不重名 */ + private final AtomicLong flushSequence = new AtomicLong(0); + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + final PipeParameters parameters = validator.getParameters(); + + // 验证目录参数 + final String dir = + parameters.getStringOrDefault( + Arrays.asList(SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY), null); + validator.validate( + arg -> { + final String directory = (String) arg; + return directory != null && !directory.trim().isEmpty(); + }, + String.format( + "The parameter %s or %s must be specified and not empty", + SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY), + dir); + + // 验证目录路径 + if (dir != null) { + final Path path = Paths.get(dir); + if (path.toFile().exists() && !path.toFile().isDirectory()) { + throw new PipeException(String.format("The path %s exists but is not a directory", dir)); + } + } + } + + @Override + public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) + throws Exception { + // 获取目标目录 + targetDirectory = + parameters.getStringOrDefault( + Arrays.asList(SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY), null); + + if (targetDirectory == null || targetDirectory.trim().isEmpty()) { + throw new PipeException( + String.format( + "The parameter %s or %s must be specified", + SINK_DIRECTORY_KEY, CONNECTOR_DIRECTORY_KEY)); + } + + // 规范化路径 + targetDirectory = Paths.get(targetDirectory).toAbsolutePath().normalize().toString(); + + createSubdirs = + parameters.getBooleanOrDefault( + Arrays.asList(SINK_CREATE_SUBDIRS_KEY, "connector.create-subdirs"), false); + + // 获取Pipe信息 + pipeName = configuration.getRuntimeEnvironment().getPipeName(); + // 实例唯一标识,sink.parallel.tasks>1 时避免多实例写同一目录文件名冲突 + instanceId = + Integer.toHexString(System.identityHashCode(this)) + + "_" + + Long.toHexString(System.nanoTime() % 0xFFFF); + + int batchDelayMs = + parameters.getIntOrDefault( + Arrays.asList(SINK_BATCH_DELAY_MS_KEY, CONNECTOR_BATCH_DELAY_MS_KEY), + DEFAULT_BATCH_DELAY_MS); + long batchSizeBytes = + parameters.getLongOrDefault( + Arrays.asList(SINK_BATCH_SIZE_BYTES_KEY, CONNECTOR_BATCH_SIZE_BYTES_KEY), + DEFAULT_BATCH_SIZE_BYTES); + tsFileBatch = new PipeTabletEventTsFileBatch(batchDelayMs, batchSizeBytes); + + LOGGER.info( + "ToFileSink customized: targetDirectory={}, createSubdirs={}, pipeName={}, instanceId={}, batchDelayMs={}, batchSizeBytes={}", + targetDirectory, + createSubdirs, + pipeName, + instanceId, + batchDelayMs, + batchSizeBytes); + } + + @Override + public void handshake() throws Exception { + // 创建目标目录 + final Path targetPath = Paths.get(targetDirectory); + if (!Files.exists(targetPath)) { + Files.createDirectories(targetPath); + LOGGER.info("Created target directory: {}", targetDirectory); + } + + // 验证目录可写 + if (!Files.isWritable(targetPath)) { + throw new PipeException( + String.format("Target directory is not writable: %s", targetDirectory)); + } + + LOGGER.info("ToFileSink handshake completed: {}", targetDirectory); + } + + @Override + public void heartbeat() throws Exception { + // 检查目标目录是否仍然存在且可写 + final Path targetPath = Paths.get(targetDirectory); + if (!Files.exists(targetPath)) { + throw new PipeConnectionException( + String.format("Target directory does not exist: %s", targetDirectory)); + } + if (!Files.isWritable(targetPath)) { + throw new PipeConnectionException( + String.format("Target directory is not writable: %s", targetDirectory)); + } + } + + /** + * 正确流程(按开发者约定): 1. constructBatch:将事件放入批内存(此处通过 onEvent 实现); 2. shouldEmit:判断内存大小或批时间是否超过预期,超过则返回 + * true; 3. 若 shouldEmit 为 true,则手动调用 sealTsFiles 生成文件; 4. 若生成/写入文件成功,则调用 onSuccess。 + */ + @Override + public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { + if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) + && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + LOGGER.warn( + "ToFileSink only supports PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent, ignore {}", + tabletInsertionEvent.getClass().getSimpleName()); + return; + } + try { + // 1. constructBatch:将事件放入内存(PipeTabletEventTsFileBatch 通过 onEvent 实现) + tsFileBatch.onEvent(tabletInsertionEvent); + // 2. shouldEmit:判断内存大小或批时间是否超过预期 + if (tsFileBatch.shouldEmit()) { + // 3. 手动调用 sealTsFiles 生成文件,成功则在 flush 内调用 onSuccess + flushBatchToTargetDirectory(); + } + } catch (WALPipeException | IOException e) { + throw new PipeException("ToFileSink: TsFile batch onEvent failed", e); + } + } + + /** + * 收到 TsFile 事件时先 flush 当前批再解析 TsFile,避免批内占用的 tablet 内存不释放、 导致 toTabletInsertionEvents() 在 + * waitForResourceEnough4Parsing 里等不到内存而超时。 + */ + @Override + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + if (tsFileBatch != null && !tsFileBatch.isEmpty()) { + try { + flushBatchToTargetDirectory(); + } catch (IOException | WriteProcessException e) { + throw new PipeException("ToFileSink: flush before TsFile parse failed", e); + } + } + try { + for (final TabletInsertionEvent tabletInsertionEvent : + tsFileInsertionEvent.toTabletInsertionEvents()) { + transfer(tabletInsertionEvent); + } + } finally { + tsFileInsertionEvent.close(); + } + } + + /** 心跳等事件:若批内已有数据且 shouldEmit 为 true,则 sealTsFiles 并复制,成功则 onSuccess。 */ + @Override + public void transfer(Event event) throws Exception { + if (tsFileBatch != null && !tsFileBatch.isEmpty() && tsFileBatch.shouldEmit()) { + try { + flushBatchToTargetDirectory(); + } catch (IOException | WriteProcessException e) { + throw new PipeException("ToFileSink: heartbeat-triggered batch flush failed", e); + } + } + LOGGER.debug("ToFileSink received event: {}", event.getClass().getSimpleName()); + } + + @Override + public void close() throws Exception { + if (tsFileBatch != null) { + synchronized (batchFlushLock) { + try { + if (!tsFileBatch.isEmpty()) { + flushBatchToTargetDirectory(); + } + } catch (Exception e) { + LOGGER.warn( + "ToFileSink: flush remaining TsFile batch on close failed: {}", e.getMessage()); + } + try { + tsFileBatch.close(); + } catch (Exception e) { + LOGGER.warn("ToFileSink: close TsFile batch failed: {}", e.getMessage()); + } + tsFileBatch = null; + } + } + + LOGGER.info("ToFileSink closed: {}", targetDirectory); + } + + /** + * 按约定:shouldEmit 为 true 时调用本方法。 内部执行 sealTsFiles 生成文件,将 TsFile 复制到目标目录(按 db/日期 布局); + * 仅当生成/写入文件成功后调用 onSuccess。 + */ + private void flushBatchToTargetDirectory() throws IOException, WriteProcessException { + if (tsFileBatch == null || tsFileBatch.isEmpty()) { + return; + } + synchronized (batchFlushLock) { + // 3. 手动调用 sealTsFiles 生成文件 + List<Pair<String, File>> dbTsFilePairs = tsFileBatch.sealTsFiles(); + if (dbTsFilePairs.isEmpty()) { + tsFileBatch.decreaseEventsReferenceCount( + PipeTransferBatchReqBuilder.class.getName(), false); + // 无文件生成也视为本批处理完成,需 onSuccess 清空批状态 + tsFileBatch.onSuccess(); + return; + } + final String date = LocalDate.now(ZoneId.systemDefault()).format(DATE_FORMAT); + final String createTime = + Instant.now().atZone(ZoneId.systemDefault()).format(DATETIME_FORMAT); + final long seq = flushSequence.getAndIncrement(); + int index = 0; + for (Pair<String, File> pair : dbTsFilePairs) { + String dbName = pair.left; + File srcFile = pair.right; + if (dbName == null) { + dbName = ""; + } + Path basePath = Paths.get(targetDirectory); + if (createSubdirs && !dbName.isEmpty()) { + basePath = basePath.resolve(dbName.replace('.', '_').replace(' ', '_')); + } + Path dateDir = basePath.resolve(date); + Files.createDirectories(dateDir); + String safeDbName = + dbName.replace('.', '_').replace(' ', '_').replace('/', '_').replace(':', '_'); + // 使用 毫秒时间戳 + 序号,避免同一秒/同一毫秒内多次 flush 覆盖 + String fileName = + instanceId == null + ? String.format("tablet-%s-%s-%d-%d.tsfile", safeDbName, createTime, seq, index) + : String.format( + "tablet-%s-%s-%d-%d-%s.tsfile", safeDbName, createTime, seq, index, instanceId); + Path destPath = dateDir.resolve(fileName); + Files.copy(srcFile.toPath(), destPath, StandardCopyOption.REPLACE_EXISTING); + try { + if (!srcFile.delete()) { + LOGGER.warn("ToFileSink: could not delete batch temp file {}", srcFile); + } + } catch (Exception e) { + LOGGER.warn( + "ToFileSink: failed to delete batch temp file {}: {}", srcFile, e.getMessage()); + } + LOGGER.debug("ToFileSink: batch TsFile copied to {}", destPath); + index++; + } + // 释放本批事件的引用,使 tablet 内存立即归还 PipeMemoryManager + tsFileBatch.decreaseEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false); + // 4. 生成文件并复制成功后调用 onSuccess + tsFileBatch.onSuccess(); + LOGGER.info( + "ToFileSink: flushed TsFile batch, {} file(s) copied to targetDirectory", + dbTsFilePairs.size()); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index daab48bbe23..d0087160e3e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -69,6 +69,7 @@ public enum BuiltinPipePlugin { WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class), + TO_FILE_CONNECTOR("to-file-connector", WriteBackSink.class), DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class), IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftSink.class), @@ -80,6 +81,7 @@ public enum BuiltinPipePlugin { WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class), + TO_FILE_SINK("to-file-sink", WriteBackSink.class), ; private final String pipePluginName;
