This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4535752ea068f9aaf57e25d480f789cb87258db5 Author: 罗振羽 <[email protected]> AuthorDate: Sun May 10 00:51:11 2026 +0000 [TIMECHODB][TIMECHODB] Improve tsfile backup SCP batching and plugin resolution --- .../org/apache/iotdb/tool/pipe/TsFileBackup.java | 129 +++++- .../tablet/PipeInsertNodeTabletInsertionEvent.java | 9 +- .../common/tsfile/PipeTsFileInsertionEvent.java | 7 +- .../pipe/resource/object/PipeObjectResource.java | 4 - .../resource/object/PipeObjectResourceManager.java | 3 + library-pipe/tsfile-remote-sink/README.md | 28 +- .../plugin/sink/tsfile/ScpRemoteFileTransfer.java | 469 +++++++++++++++++++-- scripts/tools/tsfile-backup.sh | 14 +- scripts/tools/windows/tsfile-backup.bat | 13 +- 9 files changed, 590 insertions(+), 86 deletions(-) diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java index 1eeafb76c55..575f44aab59 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/pipe/TsFileBackup.java @@ -101,6 +101,11 @@ public final class TsFileBackup { static final String SINK_SCP_PASSWORD = "sink.scp.password"; static final String SINK_SCP_REMOTE_PATH = "sink.scp.remote-path"; static final String SINK_RATE_LIMIT = "sink.rate-limit-bytes-per-second"; + static final String SINK_SCP_OBJECT_BATCH_SIZE_BYTES = "sink.scp.object-batch-size-bytes"; + static final String SINK_SCP_OBJECT_PARALLELISM = "sink.scp.object-parallelism"; + static final String SINK_SCP_OBJECT_WAITING_QUEUE_SIZE = "sink.scp.object-waiting-queue-size"; + static final String SINK_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS = + "sink.scp.object-thread-keep-alive-seconds"; } /** CLI option names and descriptions. */ @@ -189,6 +194,28 @@ public final class TsFileBackup { static final String RATE_LIMIT_ARG = "bytes/s"; static final String RATE_LIMIT_DESC = "Sink rate limit in bytes/s; omit for unlimited."; + static final String OBJECT_BATCH_SIZE_LONG = "object_batch_size"; + static final String OBJECT_BATCH_SIZE_ARG = "bytes"; + static final String OBJECT_BATCH_SIZE_DESC = + "Maximum bytes per SCP object upload batch; omit to use plugin default."; + + static final String OBJECT_PARALLELISM_LONG = "object_parallelism"; + static final String OBJECT_PARALLELISM_ARG = "object_parallelism"; + static final String OBJECT_PARALLELISM_DESC = + "Maximum parallel SCP uploads for object-file batches; " + + "worker threads are created on demand."; + + static final String OBJECT_WAITING_QUEUE_SIZE_LONG = "object_waiting_queue_size"; + static final String OBJECT_WAITING_QUEUE_SIZE_ARG = "object_waiting_queue_size"; + static final String OBJECT_WAITING_QUEUE_SIZE_DESC = + "Maximum queued async SCP object-upload tasks; " + + "submission waits when the queue limit is reached."; + + static final String OBJECT_THREAD_KEEP_ALIVE_SECONDS_LONG = "object_thread_keep_alive_seconds"; + static final String OBJECT_THREAD_KEEP_ALIVE_SECONDS_ARG = "object_thread_keep_alive_seconds"; + static final String OBJECT_THREAD_KEEP_ALIVE_SECONDS_DESC = + "Idle timeout in seconds before async SCP object-upload worker threads are reclaimed."; + static final String PLUGIN_JAR_LONG = "plugin_jar"; static final String PLUGIN_JAR_ARG = "path"; static final String PLUGIN_JAR_DESC = "Override path to plugin jar."; @@ -235,6 +262,10 @@ public final class TsFileBackup { final String scpUser; final String scpPassword; final Double rateLimitBytesPerSecond; + final Long objectBatchSizeBytes; + final Integer objectParallelism; + final Integer objectWaitingQueueSize; + final Long objectThreadKeepAliveSeconds; final File pluginJar; public BackupConfig(CommandLine line) throws ParseException { @@ -270,6 +301,43 @@ public final class TsFileBackup { this.rateLimitBytesPerSecond = StringUtils.isNotBlank(rateStr) ? Double.parseDouble(rateStr.trim()) : null; + String objectBatchSizeStr = line.getOptionValue(CliOptions.OBJECT_BATCH_SIZE_LONG); + this.objectBatchSizeBytes = + StringUtils.isNotBlank(objectBatchSizeStr) + ? Long.parseLong(objectBatchSizeStr.trim()) + : null; + if (this.objectBatchSizeBytes != null && this.objectBatchSizeBytes <= 0) { + throw new IllegalArgumentException("object_batch_size must be a positive integer."); + } + String objectParallelismStr = line.getOptionValue(CliOptions.OBJECT_PARALLELISM_LONG); + this.objectParallelism = + StringUtils.isNotBlank(objectParallelismStr) + ? Integer.parseInt(objectParallelismStr.trim()) + : null; + if (this.objectParallelism != null && this.objectParallelism <= 0) { + throw new IllegalArgumentException("object_parallelism must be a positive integer."); + } + String objectWaitingQueueSizeStr = + line.getOptionValue(CliOptions.OBJECT_WAITING_QUEUE_SIZE_LONG); + this.objectWaitingQueueSize = + StringUtils.isNotBlank(objectWaitingQueueSizeStr) + ? Integer.parseInt(objectWaitingQueueSizeStr.trim()) + : null; + if (this.objectWaitingQueueSize != null && this.objectWaitingQueueSize < 0) { + throw new IllegalArgumentException( + "object_waiting_queue_size must be a non-negative integer."); + } + String objectThreadKeepAliveSecondsStr = + line.getOptionValue(CliOptions.OBJECT_THREAD_KEEP_ALIVE_SECONDS_LONG); + this.objectThreadKeepAliveSeconds = + StringUtils.isNotBlank(objectThreadKeepAliveSecondsStr) + ? Long.parseLong(objectThreadKeepAliveSecondsStr.trim()) + : null; + if (this.objectThreadKeepAliveSeconds != null && this.objectThreadKeepAliveSeconds <= 0) { + throw new IllegalArgumentException( + "object_thread_keep_alive_seconds must be a positive integer."); + } + String jarPath = resolvePluginJarPath(line.getOptionValue(CliOptions.PLUGIN_JAR_LONG)); this.pluginJar = new File(jarPath); if (!this.pluginJar.isFile()) { @@ -278,7 +346,9 @@ public final class TsFileBackup { + jarPath + ". Set -D" + PLUGIN_JAR_PROPERTY - + " or --plugin_jar."); + + ", --plugin_jar, or " + + ENV_PLUGIN_JAR + + "."); } } @@ -469,6 +539,34 @@ public final class TsFileBackup { .argName(CliOptions.RATE_LIMIT_ARG) .desc(CliOptions.RATE_LIMIT_DESC) .build()); + o.addOption( + Option.builder() + .longOpt(CliOptions.OBJECT_BATCH_SIZE_LONG) + .hasArg() + .argName(CliOptions.OBJECT_BATCH_SIZE_ARG) + .desc(CliOptions.OBJECT_BATCH_SIZE_DESC) + .build()); + o.addOption( + Option.builder() + .longOpt(CliOptions.OBJECT_PARALLELISM_LONG) + .hasArg() + .argName(CliOptions.OBJECT_PARALLELISM_ARG) + .desc(CliOptions.OBJECT_PARALLELISM_DESC) + .build()); + o.addOption( + Option.builder() + .longOpt(CliOptions.OBJECT_WAITING_QUEUE_SIZE_LONG) + .hasArg() + .argName(CliOptions.OBJECT_WAITING_QUEUE_SIZE_ARG) + .desc(CliOptions.OBJECT_WAITING_QUEUE_SIZE_DESC) + .build()); + o.addOption( + Option.builder() + .longOpt(CliOptions.OBJECT_THREAD_KEEP_ALIVE_SECONDS_LONG) + .hasArg() + .argName(CliOptions.OBJECT_THREAD_KEEP_ALIVE_SECONDS_ARG) + .desc(CliOptions.OBJECT_THREAD_KEEP_ALIVE_SECONDS_DESC) + .build()); o.addOption( Option.builder() .longOpt(CliOptions.PLUGIN_JAR_LONG) @@ -609,6 +707,29 @@ public final class TsFileBackup { sink.add( formatKv(PipeKeys.SINK_RATE_LIMIT, String.valueOf(config.rateLimitBytesPerSecond))); } + if (config.objectBatchSizeBytes != null) { + sink.add( + formatKv( + PipeKeys.SINK_SCP_OBJECT_BATCH_SIZE_BYTES, + String.valueOf(config.objectBatchSizeBytes))); + } + if (config.objectParallelism != null) { + sink.add( + formatKv( + PipeKeys.SINK_SCP_OBJECT_PARALLELISM, String.valueOf(config.objectParallelism))); + } + if (config.objectWaitingQueueSize != null) { + sink.add( + formatKv( + PipeKeys.SINK_SCP_OBJECT_WAITING_QUEUE_SIZE, + String.valueOf(config.objectWaitingQueueSize))); + } + if (config.objectThreadKeepAliveSeconds != null) { + sink.add( + formatKv( + PipeKeys.SINK_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS, + String.valueOf(config.objectThreadKeepAliveSeconds))); + } return String.format( "CREATE PIPE IF NOT EXISTS %s WITH SOURCE (%s) WITH SINK (%s)", @@ -643,13 +764,13 @@ public final class TsFileBackup { } private static String resolvePluginJarPath(String cliOverride) { - if (StringUtils.isNotBlank(cliOverride)) { - return cliOverride.trim(); - } String prop = System.getProperty(PLUGIN_JAR_PROPERTY); if (StringUtils.isNotBlank(prop)) { return prop.trim(); } + if (StringUtils.isNotBlank(cliOverride)) { + return cliOverride.trim(); + } String env = System.getenv(ENV_PLUGIN_JAR); return StringUtils.isNotBlank(env) ? env.trim() : ""; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index 32f0a584dd7..50b6f7f1b5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -254,14 +254,7 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent final int linked = PipeDataNodeResourceManager.object() .linkObjectFiles(tsFileResource, pathIterator, pipeName); - if (linked > 0) { - if (hasObjectData == null) { - hasObjectData = true; - } - PipeDataNodeResourceManager.object().increaseReference(tsFileResource, pipeName); - } else { - hasObjectData = false; - } + hasObjectData = linked > 0; PipeDataNodeSinglePipeMetrics.getInstance() .increaseInsertNodeEventCount(pipeName, creationTime); PipeDataNodeAgent.task() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index f1f1f8b4022..670e7c5360a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -379,12 +379,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent : Collections.emptyIterator(); final int linked = PipeDataNodeResourceManager.object().linkObjectFiles(resource, pathIterator, pipeName); - if (linked > 0) { - hasObjectData = true; - PipeDataNodeResourceManager.object().increaseReference(resource, pipeName); - } else if (shouldLinkObjectFiles) { - hasObjectData = false; - } + hasObjectData = linked != 0; PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java index b70e134e315..2225bfa7bfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java @@ -238,10 +238,6 @@ public class PipeObjectResource implements AutoCloseable { "Cannot link object files: Object resource is closed for TSFile " + tsFileResource.getTsFile().getPath()); } - if (isTsFileClosed.get()) { - throw new IOException( - "Cannot link object files for closed TSFile: " + tsFileResource.getTsFile().getPath()); - } // Attempt to locate the original physical file across different storage tiers final String dataRegionId = tsFileResource.getDataRegionId(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java index 5625e4a9914..08f3cf5b758 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java @@ -125,6 +125,9 @@ public class PipeObjectResourceManager { linkedCount++; } } + if (linkedCount != 0) { + resource.increaseReferenceCount(); + } return linkedCount; } finally { segmentLock.unlock(tsFile); diff --git a/library-pipe/tsfile-remote-sink/README.md b/library-pipe/tsfile-remote-sink/README.md index 985befe3ef5..d8361d85355 100644 --- a/library-pipe/tsfile-remote-sink/README.md +++ b/library-pipe/tsfile-remote-sink/README.md @@ -42,6 +42,10 @@ WITH SINK ( 'sink.scp.user' = 'root', 'sink.scp.password' = 'your_password', 'sink.scp.remote-path' = '/data/iotdb/pipe', + 'sink.scp.object-batch-size-bytes' = '209715200', + 'sink.scp.object-parallelism' = '4', + 'sink.scp.object-waiting-queue-size' = '8', + 'sink.scp.object-thread-keep-alive-seconds' = '60', 'sink.rate-limit-bytes-per-second' = '10485760' -- Limit to 10 MB/s ); ``` @@ -50,13 +54,17 @@ WITH SINK ( Note: All `sink.` prefixes in the table below can be equivalently replaced with `connector.` (for example, `connector.scp.host`). -| Parameter | Required | Default | Description | -|-------------------------------|----------|----------------|--------------------------------------------------------------------------| -| `scp.host` | Yes | - | Remote host IP or domain for SCP upload. | -| `scp.user` | Yes | - | SSH username for authentication. | -| `scp.remote-path` | Yes | - | Remote base directory where files are uploaded. | -| `scp.port` | No | `22` | SSH/SCP port. | -| `scp.password` | No | Empty | SSH password (can be omitted if key-based authentication is configured). | -| `rate-limit-bytes-per-second` | No | `-1` | Upload bandwidth limit (bytes/sec). `<= 0` means unlimited. | -| `batch.size-bytes` | No | System default | Maximum batch size to trigger file flush/upload. | -| `batch.max-delay-seconds` | No | System default | Maximum wait time to trigger file flush/upload. | +| Parameter | Required | Default | Description | +|----------------------------------------|----------|------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------| +| `scp.host` | Yes | - | Remote host IP or domain for SCP upload. | +| `scp.user` | Yes | - | SSH username for authentication. | +| `scp.remote-path` | Yes | - | Remote base directory where files are uploaded. | +| `scp.port` | No | `22` | SSH/SCP port. | +| `scp.password` | No | Empty | SSH password (can be omitted if key-based authentication is configured). | +| `scp.object-batch-size-bytes` | No | `209715200` | Maximum bytes per SCP object upload batch. | +| `scp.object-parallelism` | No | `min(#cores/4, 16)`<br/>(lower-bounded by `1`) | Maximum parallel SCP uploads for object-file batches. Threads are created on demand and reclaimed after idle timeout. | +| `scp.object-waiting-queue-size` | No | Same as `scp.object-parallelism` | Maximum queued async object-upload tasks. Once the limit is reached, new submissions wait for an existing async upload to finish. | +| `scp.object-thread-keep-alive-seconds` | No | `60` | Idle timeout in seconds before async SCP object-upload worker threads are reclaimed. | +| `rate-limit-bytes-per-second` | No | `-1` | Upload bandwidth limit (bytes/sec). `<= 0` means unlimited. | +| `batch.size-bytes` | No | System default | Maximum batch size to trigger file flush/upload. | +| `batch.max-delay-seconds` | No | System default | Maximum wait time to trigger file flush/upload. | diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java index 113cdaae122..25005f3f931 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java @@ -38,12 +38,23 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE; @@ -65,18 +76,45 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { private static final Logger LOGGER = LoggerFactory.getLogger(ScpRemoteFileTransfer.class); + private static final String CONNECTOR_SCP_OBJECT_BATCH_SIZE_BYTES_KEY = + "connector.scp.object-batch-size-bytes"; + private static final String SINK_SCP_OBJECT_BATCH_SIZE_BYTES_KEY = + "sink.scp.object-batch-size-bytes"; + private static final String CONNECTOR_SCP_OBJECT_PARALLELISM_KEY = + "connector.scp.object-parallelism"; + private static final String SINK_SCP_OBJECT_PARALLELISM_KEY = "sink.scp.object-parallelism"; + private static final String CONNECTOR_SCP_OBJECT_WAITING_QUEUE_SIZE_KEY = + "connector.scp.object-waiting-queue-size"; + private static final String SINK_SCP_OBJECT_WAITING_QUEUE_SIZE_KEY = + "sink.scp.object-waiting-queue-size"; + private static final String CONNECTOR_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS_KEY = + "connector.scp.object-thread-keep-alive-seconds"; + private static final String SINK_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS_KEY = + "sink.scp.object-thread-keep-alive-seconds"; private static final String TSFILE_EXTENSION = TsFileConstant.TSFILE_SUFFIX; private static final String UNIX_SEPARATOR = "/"; private static final long CONNECT_TIMEOUT_MS = 10000L; + private static final long DEFAULT_OBJECT_UPLOAD_BATCH_BYTES = 200L * 1024 * 1024; + private static final int DEFAULT_OBJECT_UPLOAD_PARALLELISM = + Math.max(1, Math.min(Runtime.getRuntime().availableProcessors() / 4, 16)); + private static final long DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS = 60L; + private static final long EXECUTOR_CLOSE_TIMEOUT_SECONDS = 30L; private static final Set<ClientChannelEvent> WAIT_FOR_CLOSED = EnumSet.of(ClientChannelEvent.CLOSED); + private static final AtomicInteger OBJECT_UPLOAD_THREAD_COUNTER = new AtomicInteger(0); private final String host; private final String user; private final String password; private final String remoteBaseDir; private final int port; + private final long objectUploadBatchBytes; + private final int objectUploadParallelism; + private final int objectUploadWaitingQueueSize; + private final long objectUploadThreadKeepAliveSeconds; private final RateLimiter transferRateLimiter; + private final ThreadPoolExecutor objectUploadExecutor; + private final BlockingQueue<PooledWorkerSession> idleWorkerSessions; private SshClient client; private ClientSession session; @@ -91,6 +129,73 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { params.getIntOrDefault( Arrays.asList(CONNECTOR_SCP_PORT_KEY, SINK_SCP_PORT_KEY), SINK_SCP_PORT_DEFAULT_VALUE); this.password = params.getStringByKeys(CONNECTOR_SCP_PASSWORD_KEY, SINK_SCP_PASSWORD_KEY); + final long configuredObjectUploadBatchBytes = + params.getLongOrDefault( + Arrays.asList( + CONNECTOR_SCP_OBJECT_BATCH_SIZE_BYTES_KEY, SINK_SCP_OBJECT_BATCH_SIZE_BYTES_KEY), + DEFAULT_OBJECT_UPLOAD_BATCH_BYTES); + this.objectUploadBatchBytes = + configuredObjectUploadBatchBytes > 0 + ? configuredObjectUploadBatchBytes + : DEFAULT_OBJECT_UPLOAD_BATCH_BYTES; + if (configuredObjectUploadBatchBytes <= 0) { + LOGGER.warn( + "Invalid object upload batch size {} bytes, fallback to default {} bytes", + configuredObjectUploadBatchBytes, + DEFAULT_OBJECT_UPLOAD_BATCH_BYTES); + } + final int configuredObjectUploadParallelism = + params.getIntOrDefault( + Arrays.asList(CONNECTOR_SCP_OBJECT_PARALLELISM_KEY, SINK_SCP_OBJECT_PARALLELISM_KEY), + DEFAULT_OBJECT_UPLOAD_PARALLELISM); + this.objectUploadParallelism = + configuredObjectUploadParallelism > 0 + ? configuredObjectUploadParallelism + : DEFAULT_OBJECT_UPLOAD_PARALLELISM; + if (configuredObjectUploadParallelism <= 0) { + LOGGER.warn( + "Invalid object upload parallelism {}, fallback to default {}", + configuredObjectUploadParallelism, + DEFAULT_OBJECT_UPLOAD_PARALLELISM); + } + final int configuredObjectUploadWaitingQueueSize = + params.getIntOrDefault( + Arrays.asList( + CONNECTOR_SCP_OBJECT_WAITING_QUEUE_SIZE_KEY, + SINK_SCP_OBJECT_WAITING_QUEUE_SIZE_KEY), + objectUploadParallelism); + this.objectUploadWaitingQueueSize = + configuredObjectUploadWaitingQueueSize >= 0 + ? configuredObjectUploadWaitingQueueSize + : objectUploadParallelism; + if (configuredObjectUploadWaitingQueueSize < 0) { + LOGGER.warn( + "Invalid object upload waiting queue size {}, fallback to default {}", + configuredObjectUploadWaitingQueueSize, + objectUploadParallelism); + } + final long configuredObjectUploadThreadKeepAliveSeconds = + params.getLongOrDefault( + Arrays.asList( + CONNECTOR_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS_KEY, + SINK_SCP_OBJECT_THREAD_KEEP_ALIVE_SECONDS_KEY), + DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS); + this.objectUploadThreadKeepAliveSeconds = + configuredObjectUploadThreadKeepAliveSeconds > 0 + ? configuredObjectUploadThreadKeepAliveSeconds + : DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS; + if (configuredObjectUploadThreadKeepAliveSeconds <= 0) { + LOGGER.warn( + "Invalid object upload thread keep alive seconds {}, fallback to default {}", + configuredObjectUploadThreadKeepAliveSeconds, + DEFAULT_OBJECT_UPLOAD_THREAD_KEEP_ALIVE_SECONDS); + } + this.idleWorkerSessions = new LinkedBlockingQueue<>(objectUploadParallelism); + this.objectUploadExecutor = + createObjectUploadExecutor( + objectUploadParallelism, + objectUploadWaitingQueueSize, + objectUploadThreadKeepAliveSeconds); final double bytesPerSecond = params.getDoubleOrDefault( @@ -102,59 +207,275 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } else { this.transferRateLimiter = null; } + LOGGER.info( + "SCP sink object upload batch size: {} bytes, max parallelism: {}, " + + "waiting queue size: {}, keep alive seconds: {}", + objectUploadBatchBytes, + objectUploadParallelism, + objectUploadWaitingQueueSize, + objectUploadThreadKeepAliveSeconds); } @Override - public void transferFile(File tsFile, File modFile, File objectSourceDir, String targetName) - throws IOException { + public synchronized void transferFile( + File tsFile, File modFile, File objectSourceDir, String targetName) throws IOException { try { syncObjectDirectory(objectSourceDir, targetName); final String finalTsName = computeFinalTsName(targetName); syncModFile(modFile, finalTsName); syncTsFile(tsFile, finalTsName); } catch (final Exception e) { - invalidateSession(); + invalidateMainSession(); + invalidateIdleWorkerSessions(); throw new IOException("Scp transfer failed: " + targetName, e); } } private void syncObjectDirectory(File sourceDir, String targetName) throws IOException { - if (sourceDir == null || !sourceDir.exists()) { + if (sourceDir == null || !sourceDir.exists() || !sourceDir.isDirectory()) { return; } final ClientSession s = getSession(); - final ScpClient scpClient = ScpClientCreator.instance().createScpClient(s); final Path sourcePath = sourceDir.toPath(); final String remoteTargetRoot = remoteBaseDir + UNIX_SEPARATOR + targetName; + uploadObjectChildrenInBatches(s, sourcePath, remoteTargetRoot); + } - final Set<String> currentTaskDirs = new HashSet<>(); + private void uploadObjectChildrenInBatches( + final ClientSession session, final Path sourcePath, final String remoteTargetRoot) + throws IOException { + final ExecutorCompletionService<Void> completionService = + new ExecutorCompletionService<>(objectUploadExecutor); + final Set<Future<Void>> pendingUploads = new HashSet<>(); + final int maxPendingUploadTasks = + Math.max(1, objectUploadParallelism + objectUploadWaitingQueueSize); + final List<Path> currentBatchFiles = new ArrayList<>(); + long currentBatchBytes = 0; + String currentRemoteDir = null; + final Set<String> preparedDirs = new HashSet<>(); try (Stream<Path> walk = Files.walk(sourcePath)) { - walk.filter(Files::isRegularFile) - .forEach( - path -> { - final String relative = - sourcePath.relativize(path).toString().replace("\\", UNIX_SEPARATOR); - final int lastSlash = relative.lastIndexOf(UNIX_SEPARATOR); - - final String remoteDir = - lastSlash == -1 - ? remoteTargetRoot - : remoteTargetRoot + UNIX_SEPARATOR + relative.substring(0, lastSlash); - try { - if (currentTaskDirs.add(remoteDir)) { - executeRemoteCommand(s, "mkdir -p " + shellQuote(remoteDir)); - } - - acquireTransferBytes(Files.size(path)); - scpClient.upload(path, remoteDir, Collections.emptySet()); - } catch (IOException e) { - throw new UncheckedIOException("Failed to sync object file: " + path, e); - } - }); + final Iterable<Path> paths = walk::iterator; + + for (final Path path : paths) { + if (Files.isDirectory(path)) { + final Path relativeDir = sourcePath.relativize(path); + final String relativeDirStr = relativeDir.toString().replace("\\", UNIX_SEPARATOR); + final String remoteDir = + relativeDirStr.isEmpty() + ? remoteTargetRoot + : remoteTargetRoot + UNIX_SEPARATOR + relativeDirStr; + if (!preparedDirs.contains(remoteDir)) { + ensureRemoteDirExists(session, remoteDir); + preparedDirs.add(remoteDir); + } + continue; + } + if (!Files.isRegularFile(path)) { + continue; + } + + final Path file = path; + final Path relativeParent = sourcePath.relativize(file.getParent()); + final String relativeParentStr = relativeParent.toString().replace("\\", UNIX_SEPARATOR); + final String fileRemoteDir = + relativeParentStr.isEmpty() + ? remoteTargetRoot + : remoteTargetRoot + UNIX_SEPARATOR + relativeParentStr; + + final long fileBytes = Files.size(file); + if (!currentBatchFiles.isEmpty() + && (!fileRemoteDir.equals(currentRemoteDir) + || currentBatchBytes + fileBytes > objectUploadBatchBytes)) { + submitBatchUpload( + completionService, + pendingUploads, + new ArrayList<>(currentBatchFiles), + currentRemoteDir, + currentBatchBytes, + maxPendingUploadTasks); + currentBatchFiles.clear(); + currentBatchBytes = 0; + } + + currentRemoteDir = fileRemoteDir; + currentBatchFiles.add(file); + currentBatchBytes += fileBytes; + } } catch (final UncheckedIOException e) { - throw new IOException(e.getMessage(), e.getCause()); + abortBatchUploads( + completionService, + pendingUploads, + new IOException("Failed to iterate object directory: " + sourcePath, e.getCause())); + return; + } catch (final IOException e) { + abortBatchUploads(completionService, pendingUploads, e); + return; + } + + if (!currentBatchFiles.isEmpty()) { + submitBatchUpload( + completionService, + pendingUploads, + new ArrayList<>(currentBatchFiles), + currentRemoteDir, + currentBatchBytes, + maxPendingUploadTasks); + } + + waitForAllBatchUploads(completionService, pendingUploads); + } + + private void submitBatchUpload( + final ExecutorCompletionService<Void> completionService, + final Set<Future<Void>> pendingUploads, + final List<Path> batchFiles, + final String remoteDir, + final long batchBytes, + final int maxPendingUploadTasks) + throws IOException { + waitForBatchUploadSlot(completionService, pendingUploads, maxPendingUploadTasks); + try { + final Future<Void> future = + completionService.submit( + () -> { + uploadObjectBatch(batchFiles, remoteDir, batchBytes); + return null; + }); + pendingUploads.add(future); + } catch (final RejectedExecutionException e) { + throw new IOException("Failed to submit async SCP object upload task for " + remoteDir, e); + } + } + + private void waitForBatchUploadSlot( + final ExecutorCompletionService<Void> completionService, + final Set<Future<Void>> pendingUploads, + final int maxPendingUploadTasks) + throws IOException { + while (pendingUploads.size() >= maxPendingUploadTasks) { + waitForNextBatchUpload(completionService, pendingUploads); + } + } + + private void waitForAllBatchUploads( + final ExecutorCompletionService<Void> completionService, + final Set<Future<Void>> pendingUploads) + throws IOException { + IOException firstException = null; + while (!pendingUploads.isEmpty()) { + try { + waitForNextBatchUpload(completionService, pendingUploads); + } catch (final IOException e) { + if (firstException == null) { + firstException = e; + cancelPendingUploads(pendingUploads); + } else { + firstException.addSuppressed(e); + } + } + } + if (firstException != null) { + throw firstException; + } + } + + private void abortBatchUploads( + final ExecutorCompletionService<Void> completionService, + final Set<Future<Void>> pendingUploads, + final IOException originalException) + throws IOException { + cancelPendingUploads(pendingUploads); + IOException firstException = originalException; + while (!pendingUploads.isEmpty()) { + try { + waitForNextBatchUpload(completionService, pendingUploads); + } catch (final IOException e) { + firstException.addSuppressed(e); + } + } + throw firstException; + } + + private void waitForNextBatchUpload( + final ExecutorCompletionService<Void> completionService, + final Set<Future<Void>> pendingUploads) + throws IOException { + try { + final Future<Void> completedUpload = completionService.take(); + pendingUploads.remove(completedUpload); + completedUpload.get(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while uploading object files via SCP", e); + } catch (final CancellationException e) { + throw new IOException("Cancelled while uploading object files via SCP", e); + } catch (final ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException("Failed to upload object files via SCP", cause); + } + } + + private void cancelPendingUploads(final Set<Future<Void>> pendingUploads) { + for (final Future<Void> pendingUpload : pendingUploads) { + pendingUpload.cancel(true); + } + } + + private void uploadObjectBatch( + final List<Path> batchFiles, final String remoteDir, final long batchBytes) + throws IOException { + acquireTransferBytes(batchBytes); + final ClientSession workerSession = borrowWorkerSession(); + boolean reusable = false; + try { + ScpClientCreator.instance() + .createScpClient(workerSession) + .upload( + batchFiles.toArray(new Path[0]), + remoteDir, + EnumSet.of(ScpClient.Option.TargetIsDirectory)); + reusable = true; + } finally { + recycleWorkerSession(workerSession, reusable); + } + } + + private static BlockingQueue<Runnable> createObjectUploadQueue(final int waitingQueueSize) { + return waitingQueueSize == 0 + ? new SynchronousQueue<>() + : new LinkedBlockingQueue<>(waitingQueueSize); + } + + private static ThreadPoolExecutor createObjectUploadExecutor( + final int maximumParallelism, final int waitingQueueSize, final long keepAliveSeconds) { + final ThreadPoolExecutor executor = + new ThreadPoolExecutor( + maximumParallelism, + maximumParallelism, + keepAliveSeconds, + TimeUnit.SECONDS, + createObjectUploadQueue(waitingQueueSize), + runnable -> + new Thread( + runnable, + "pipe-scp-object-transfer-" + OBJECT_UPLOAD_THREAD_COUNTER.incrementAndGet())); + executor.allowCoreThreadTimeOut(true); + return executor; + } + + private static final class PooledWorkerSession { + + private final ClientSession session; + private final long idleSinceNanos; + + private PooledWorkerSession(final ClientSession session, final long idleSinceNanos) { + this.session = session; + this.idleSinceNanos = idleSinceNanos; } } @@ -217,20 +538,66 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } } - private synchronized ClientSession getSession() throws IOException { + private synchronized SshClient getOrCreateClient() { if (client == null || !client.isStarted()) { System.setProperty("org.apache.sshd.security.provider.BC.enabled", "false"); client = SshClient.setUpDefaultClient(); client.start(); } + return client; + } + + private ClientSession createAuthenticatedSession() throws IOException { + final ClientSession createdSession = + getOrCreateClient().connect(user, host, port).verify(CONNECT_TIMEOUT_MS).getSession(); + createdSession.addPasswordIdentity(password != null ? password : ""); + createdSession.auth().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return createdSession; + } + + private synchronized ClientSession getSession() throws IOException { if (session == null || !session.isOpen()) { - session = client.connect(user, host, port).verify(CONNECT_TIMEOUT_MS).getSession(); - session.addPasswordIdentity(password != null ? password : ""); - session.auth().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + session = createAuthenticatedSession(); } return session; } + private ClientSession borrowWorkerSession() throws IOException { + while (true) { + final PooledWorkerSession pooledWorkerSession = idleWorkerSessions.poll(); + if (pooledWorkerSession == null) { + return createAuthenticatedSession(); + } + if (!pooledWorkerSession.session.isOpen()) { + closeSessionQuietly(pooledWorkerSession.session); + continue; + } + if (hasWorkerSessionExpired(pooledWorkerSession)) { + closeSessionQuietly(pooledWorkerSession.session); + continue; + } + return pooledWorkerSession.session; + } + } + + private boolean hasWorkerSessionExpired(final PooledWorkerSession pooledWorkerSession) { + return System.nanoTime() - pooledWorkerSession.idleSinceNanos + >= TimeUnit.SECONDS.toNanos(objectUploadThreadKeepAliveSeconds); + } + + private void recycleWorkerSession(final ClientSession workerSession, final boolean reusable) { + if (workerSession == null) { + return; + } + if (!reusable || !workerSession.isOpen() || objectUploadExecutor.isShutdown()) { + closeSessionQuietly(workerSession); + return; + } + if (!idleWorkerSessions.offer(new PooledWorkerSession(workerSession, System.nanoTime()))) { + closeSessionQuietly(workerSession); + } + } + private void acquireTransferBytes(long bytes) { if (transferRateLimiter == null || bytes <= 0) { return; @@ -243,19 +610,37 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } } - private synchronized void invalidateSession() { + private synchronized void invalidateMainSession() { if (session != null) { session.close(false); session = null; } + } + + private synchronized void invalidateSession() { + invalidateMainSession(); + invalidateIdleWorkerSessions(); if (client != null && client.isStarted()) { client.stop(); client = null; } } + private void invalidateIdleWorkerSessions() { + PooledWorkerSession pooledWorkerSession; + while ((pooledWorkerSession = idleWorkerSessions.poll()) != null) { + closeSessionQuietly(pooledWorkerSession.session); + } + } + + private static void closeSessionQuietly(final ClientSession session) { + if (session != null) { + session.close(false); + } + } + @Override - public void handshake() throws IOException { + public synchronized void handshake() throws IOException { try { ensureRemoteDirExists(getSession(), remoteBaseDir); LOGGER.info("SCP handshake OK, remote base: {}", remoteBaseDir); @@ -265,7 +650,17 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } @Override - public void close() { + public synchronized void close() { + objectUploadExecutor.shutdown(); + try { + if (!objectUploadExecutor.awaitTermination( + EXECUTOR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + objectUploadExecutor.shutdownNow(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + objectUploadExecutor.shutdownNow(); + } invalidateSession(); } } diff --git a/scripts/tools/tsfile-backup.sh b/scripts/tools/tsfile-backup.sh index d99a13bac61..ce480e09859 100755 --- a/scripts/tools/tsfile-backup.sh +++ b/scripts/tools/tsfile-backup.sh @@ -31,7 +31,8 @@ fi TOOL_ROOT="$(cd "$(dirname "$0")/.."; pwd)" PLUGIN_JAR="" -if [ -d "$TOOL_ROOT/ext/pipe" ]; then + +if [ -z "$PLUGIN_JAR" ] && [ -d "$TOOL_ROOT/ext/pipe" ]; then for f in "$TOOL_ROOT/ext/pipe"/tsfile-remote-sink-*-jar-with-dependencies.jar; do if [ -f "$f" ]; then PLUGIN_JAR="$f" @@ -45,12 +46,6 @@ if [ -z "${IOTDB_HOME}" ]; then exit 1 fi -if [ -z "$PLUGIN_JAR" ] || [ ! -f "$PLUGIN_JAR" ]; then - echo "[ERROR] tsfile-remote-sink plugin JAR not found under $TOOL_ROOT/ext/pipe/." >&2 - echo " Use this script from the tsfile-backup distribution, or set TSFILE_REMOTE_SINK_JAR / --plugin_jar." >&2 - exit 1 -fi - if [ -n "$JAVA_HOME" ]; then for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do if [ -x "$java" ]; then @@ -67,7 +62,10 @@ if [ -z "$JAVA" ] ; then exit 1 fi -JVM_OPTS="-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8 -Dtsfile.backup.plugin.jar=${PLUGIN_JAR}" +JVM_OPTS="-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8" +if [ -n "$PLUGIN_JAR" ] && [ -f "$PLUGIN_JAR" ]; then + JVM_OPTS="${JVM_OPTS} -Dtsfile.backup.plugin.jar=${PLUGIN_JAR}" +fi CLASSPATH="" for f in "${IOTDB_HOME}"/lib/*.jar; do diff --git a/scripts/tools/windows/tsfile-backup.bat b/scripts/tools/windows/tsfile-backup.bat index 908c8a8a852..a116475c29f 100644 --- a/scripts/tools/windows/tsfile-backup.bat +++ b/scripts/tools/windows/tsfile-backup.bat @@ -36,16 +36,10 @@ popd >nul @REM ----------------------------------------------------------------------------- if NOT DEFINED JAVA_HOME goto :err_java -@REM Automatically scan and locate the plugin JAR (no delayed expansion required) set "PLUGIN_JAR=" -for %%f in ("%TOOL_ROOT%\ext\pipe\tsfile-remote-sink-*-jar-with-dependencies.jar") do ( - set "PLUGIN_JAR=%%f" -) -if "%PLUGIN_JAR%"=="" ( - echo [ERROR] tsfile-remote-sink plugin JAR not found under %TOOL_ROOT%\ext\pipe\ - set ret_code=1 - goto finally +if NOT DEFINED PLUGIN_JAR for %%f in ("%TOOL_ROOT%\ext\pipe\tsfile-remote-sink-*-jar-with-dependencies.jar") do ( + if EXIST "%%~f" set "PLUGIN_JAR=%%~f" ) @REM ----------------------------------------------------------------------------- @@ -54,7 +48,8 @@ if "%PLUGIN_JAR%"=="" ( if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.pipe.TsFileBackup @REM Centralized system properties and encoding settings -set "JAVA_OPTS=-ea -Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8 -DIOTDB_HOME="%IOTDB_HOME%" -Dtsfile.backup.plugin.jar="%PLUGIN_JAR%"" +set "JAVA_OPTS=-ea -Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8 -DIOTDB_HOME="%IOTDB_HOME%"" +if DEFINED PLUGIN_JAR if EXIST "%PLUGIN_JAR%" set "JAVA_OPTS=%JAVA_OPTS% -Dtsfile.backup.plugin.jar="%PLUGIN_JAR%"" @REM Elegant dependency loading using wildcards (*) to avoid string concatenation hell set "CLASSPATH=%TOOL_ROOT%\lib\*;%IOTDB_HOME%\lib\*"
