This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 643272787477859fcc17b35139c12e875ed58ed4 Author: 罗振羽 <[email protected]> AuthorDate: Thu May 14 15:58:29 2026 +0000 [TIMECHODB][TIMECHO] Fix scp thread nums && Historical files were deleted during the Linked process. (cherry picked from commit 496e8a0ae52d0d88028f5cc8683a07a1d534fa85) --- .../manual/basic/IoTDBPipeTsFileSinkObjectIT.java | 246 ++++++++++++++++++++- .../common/tsfile/PipeTsFileInsertionEvent.java | 9 +- .../resource/tsfile/PipeTsFileResourceManager.java | 26 ++- ...istoricalDataRegionTsFileAndDeletionSource.java | 12 +- .../dataregion/modification/ModificationFile.java | 37 ++++ .../plugin/sink/tsfile/ScpRemoteFileTransfer.java | 22 +- .../plugin/sink/tsfile/ScpSshClientManager.java | 58 +++++ 7 files changed, 369 insertions(+), 41 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java index 52f50b078d8..afb53732fa0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileSinkObjectIT.java @@ -55,7 +55,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2DualTableManualBasic.class}) @@ -67,10 +71,12 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua private static final String OBJECT_TABLE_NAME = "factory_metrics"; private static final int OBJECT_MULTI_WEEK_DEVICE_COUNT = 5; + private static final int OBJECT_INCREMENTAL_TSFILE_COUNT = 100; private static final long HOUR_MS = 3600 * 1000L; private static final long DAY_MS = 24L * HOUR_MS; private static final long WEEK_MS = 7L * DAY_MS; + private static final long EXPORT_SCAN_SLEEP_MS = 500L; /** Base time aligned so each device sits in a distinct week bucket. */ private static final long OBJECT_BASE_TIME = 1600000000000L; @@ -225,6 +231,209 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua } } + @Test + public void testPipeTsFileLocalSinkObjectHundredGeneratedTsFilesLoadToReceiverWithoutMods() + throws Exception { + executeIncrementalExportLoadRoundTrip(false); + } + + @Test + public void testPipeTsFileLocalSinkObjectHundredGeneratedTsFilesLoadToReceiverWithMods() + throws Exception { + executeIncrementalExportLoadRoundTrip(true); + } + + private void executeIncrementalExportLoadRoundTrip(final boolean withMods) throws Exception { + final String database = "db1"; + final String pipeName = + withMods + ? "p_incremental_generated_obj_with_mods" + : "p_incremental_generated_obj_without_mods"; + final Map<String, List<Long>> expectedTimesByDevice = new LinkedHashMap<>(); + final List<File> sourceTsFiles = new ArrayList<>(); + + for (int i = 0; i < OBJECT_INCREMENTAL_TSFILE_COUNT; i++) { + final String deviceId = String.format("device_%03d", i + 1); + final File tsFile = + new File( + sourceTsDir, + withMods + ? String.format("incremental_object_with_mods_%03d.tsfile", i + 1) + : String.format("incremental_object_without_mods_%03d.tsfile", i + 1)); + final long startTime = OBJECT_BASE_TIME + (long) i * DAY_MS; + final long endTime = startTime + HOUR_MS; + + try (StandardObjectTableModelTsFileGenerator generator = + new StandardObjectTableModelTsFileGenerator(tsFile)) { + generator.writeDeviceData(OBJECT_TABLE_NAME, deviceId, startTime, endTime, HOUR_MS); + if (withMods) { + generator.generateDeletion(OBJECT_TABLE_NAME, deviceId, endTime, endTime); + expectedTimesByDevice.put( + deviceId, generateExpectedTimes(startTime, endTime, HOUR_MS, endTime, endTime)); + } else { + expectedTimesByDevice.put(deviceId, generateExpectedTimes(startTime, endTime, HOUR_MS)); + } + } + sourceTsFiles.add(tsFile); + } + + try (ITableSession sender = senderEnv.getTableSessionConnection()) { + sender.executeNonQueryStatement(String.format("CREATE DATABASE IF NOT EXISTS %s", database)); + sender.executeNonQueryStatement(String.format("USE \"%s\"", database)); + for (File tsFile : sourceTsFiles) { + sender.executeNonQueryStatement(String.format("LOAD '%s'", tsFile.getAbsolutePath())); + } + TestUtils.executeNonQueryWithRetry(senderEnv, "flush"); + + sender.executeNonQueryStatement( + String.format( + "CREATE PIPE %s " + + "WITH SOURCE (" + + "'source.capture.table'='true', " + + "'source.database-name'='%s', " + + "'source.table-name'='%s', " + + "'source.inclusion'='data.insert', " + + "'source.mods.enable'='%s', " + + "'source.history.enable'='true', " + + "'source.realtime.enable'='false' " + + ") " + + "WITH SINK (" + + "'sink'='tsfile-local-sink', " + + "'sink.local.target-path'='%s', " + + "'sink.batch.max-delay-seconds'='1', " + + "'sink.batch.size-bytes'='1048576'" + + ")", + pipeName, database, OBJECT_TABLE_NAME, withMods, targetDir)); + } + + try { + try (ITableSession receiver = receiverEnv.getTableSessionConnection()) { + receiver.executeNonQueryStatement( + String.format("CREATE DATABASE IF NOT EXISTS %s", database)); + receiver.executeNonQueryStatement(String.format("USE \"%s\"", database)); + loadExportedTsFilesIncrementallyUntilExpectedOnReceiver( + receiver, new File(targetDir), database, expectedTimesByDevice, 180_000L); + } + } finally { + try (ITableSession sender = senderEnv.getTableSessionConnection()) { + sender.executeNonQueryStatement("DROP PIPE " + pipeName); + } + } + + if (!withMods) { + final List<File> modsFiles = new ArrayList<>(); + findFilesBySuffix(new File(targetDir), ModificationFile.FILE_SUFFIX, modsFiles); + Assert.assertTrue( + "No companion mods file should be exported for the pure object TsFiles.", + modsFiles.isEmpty()); + } + } + + private void loadExportedTsFilesIncrementallyUntilExpectedOnReceiver( + final ITableSession receiver, + final File exportRoot, + final String database, + final Map<String, List<Long>> expectedTimesByDevice, + final long timeoutMs) + throws Exception { + final long expectedTotalRows = calculateExpectedRowCount(expectedTimesByDevice); + final long deadline = System.currentTimeMillis() + timeoutMs; + final Set<String> loadedTsFilePaths = new HashSet<>(); + + long previousDiscoveredTsFileCount = -1; + Throwable lastValidationFailure = null; + + while (System.currentTimeMillis() < deadline) { + final List<File> exportedTsFiles = new ArrayList<>(); + findTsFiles(exportRoot, exportedTsFiles); + exportedTsFiles.sort(Comparator.comparing(File::getAbsolutePath)); + + boolean loadedAnyThisRound = false; + for (File tsFile : exportedTsFiles) { + final String tsFilePath = tsFile.getAbsolutePath(); + if (loadedTsFilePaths.contains(tsFilePath)) { + continue; + } + + try { + receiver.executeNonQueryStatement( + String.format("LOAD '%s' WITH ('database-name'='%s')", tsFilePath, database)); + loadedTsFilePaths.add(tsFilePath); + loadedAnyThisRound = true; + } catch (Exception e) { + LOGGER.info("Receiver LOAD will retry later for exported TsFile {}", tsFilePath, e); + } + } + + if (loadedAnyThisRound) { + TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); + } + + final long actualRowCount = queryRowCountOrNegative(receiver, OBJECT_TABLE_NAME); + if (actualRowCount == expectedTotalRows) { + try { + for (Map.Entry<String, List<Long>> entry : expectedTimesByDevice.entrySet()) { + assertDeviceObjectBytesMatchGenerator( + receiver, OBJECT_TABLE_NAME, entry.getKey(), entry.getValue()); + } + return; + } catch (Throwable t) { + lastValidationFailure = t; + } + } + + final long discoveredTsFileCount = exportedTsFiles.size(); + if (!loadedAnyThisRound && discoveredTsFileCount == previousDiscoveredTsFileCount) { + Thread.sleep(EXPORT_SCAN_SLEEP_MS); + } + previousDiscoveredTsFileCount = discoveredTsFileCount; + } + + final List<File> exportedTsFiles = new ArrayList<>(); + findTsFiles(exportRoot, exportedTsFiles); + exportedTsFiles.sort(Comparator.comparing(File::getAbsolutePath)); + + final List<String> pendingTsFiles = new ArrayList<>(); + for (File tsFile : exportedTsFiles) { + final String tsFilePath = tsFile.getAbsolutePath(); + if (!loadedTsFilePaths.contains(tsFilePath)) { + pendingTsFiles.add(tsFilePath); + } + } + + final long actualRowCount = queryRowCountOrNegative(receiver, OBJECT_TABLE_NAME); + final String baseMessage = + String.format( + "Timeout waiting receiver data to match expected rows. expected=%d, actual=%d, " + + "loadedTsFiles=%d, discoveredTsFiles=%d, pendingTsFiles=%s", + expectedTotalRows, + actualRowCount, + loadedTsFilePaths.size(), + exportedTsFiles.size(), + pendingTsFiles); + if (lastValidationFailure != null) { + throw new AssertionError(baseMessage, lastValidationFailure); + } + Assert.fail(baseMessage); + } + + private static long calculateExpectedRowCount( + final Map<String, List<Long>> expectedTimesByDevice) { + long totalRowCount = 0; + for (List<Long> expectedTimes : expectedTimesByDevice.values()) { + totalRowCount += expectedTimes.size(); + } + return totalRowCount; + } + + private static long queryRowCountOrNegative(final ITableSession session, final String table) { + try { + return queryRowCount(session, table); + } catch (Exception e) { + return -1; + } + } + private static List<Long> generateExpectedTimes( final long startTime, final long endTime, final long interval) { final List<Long> times = new ArrayList<>(); @@ -234,6 +443,22 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua return times; } + private static List<Long> generateExpectedTimes( + final long startTime, + final long endTime, + final long interval, + final long deleteStartTime, + final long deleteEndTime) { + final List<Long> times = new ArrayList<>(); + for (long t = startTime; t <= endTime; t += interval) { + if (t >= deleteStartTime && t <= deleteEndTime) { + continue; + } + times.add(t); + } + return times; + } + /** * Polls until at least one {@code .tsfile} appears under {@code root} (recursive) or {@code * timeoutMs} elapses. @@ -427,14 +652,15 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua + root.getAbsolutePath()); } - private static void loadAllTsFilesUnderDir(final ITableSession session, final File root) - throws Exception { + private static void loadAllTsFilesUnderDir( + final ITableSession session, final File root, final String database) throws Exception { final List<File> tsfiles = new ArrayList<>(); findTsFiles(root, tsfiles); Assert.assertFalse(tsfiles.isEmpty()); tsfiles.sort(Comparator.comparing(File::getAbsolutePath)); for (File f : tsfiles) { - session.executeNonQueryStatement(String.format("LOAD '%s'", f.getAbsolutePath())); + session.executeNonQueryStatement( + String.format("LOAD '%s' WITH ('database-name'='%s')", f.getAbsolutePath(), database)); } } @@ -461,6 +687,20 @@ public class IoTDBPipeTsFileSinkObjectIT extends AbstractPipeTableModelDualManua } } + private static void findFilesBySuffix(File dir, String suffix, List<File> matchedFiles) { + File[] files = dir.listFiles(); + if (files == null) { + return; + } + for (File f : files) { + if (f.isDirectory()) { + findFilesBySuffix(f, suffix, matchedFiles); + } else if (f.getName().endsWith(suffix)) { + matchedFiles.add(f); + } + } + } + private static void deleteDirectoryQuietly(Path dirPath) { if (!Files.exists(dirPath)) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 670e7c5360a..843908882e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -370,7 +370,11 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { extractTime = System.nanoTime(); final List<ModEntry> linkedObjectColumnModEntries = new ArrayList<>(); + final File firstName = tsFile; try { + if (isGeneratedByHistoricalExtractor) { + tsFile = PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(tsFile, pipeName); + } if (Objects.nonNull(pipeName)) { final boolean shouldLinkObjectFiles = !Objects.equals(hasObjectData, Boolean.FALSE); final Iterator<String> pathIterator = @@ -383,8 +387,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); } - final File firstName = tsFile; - tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); + + tsFile = + PipeDataNodeResourceManager.tsfile().increaseFileReference(firstName, true, pipeName); if (isWithMod) { if (modFile != null) { modFile = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 644beb1ac40..0589411327a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -163,6 +163,7 @@ public class PipeTsFileResourceManager { try (ModificationFile modificationFile = new ModificationFile(ModificationFile.getExclusiveMods(pipeTsFile), false)) { + modificationFile.clear(); modificationFile.write(toWrite); modsFile = modificationFile.getFile(); } @@ -172,12 +173,20 @@ public class PipeTsFileResourceManager { if (Objects.nonNull(pipeName)) { hardlinkOrCopiedFileToPipeTsFileResourceMap .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) - .put(modsFile.getPath(), new PipeTsFileResource(modsFile)); + .compute( + modsFile.getPath(), + (path, existingResource) -> { + if (existingResource == null) { + return new PipeTsFileResource(modsFile); + } else { + existingResource.increaseReferenceCount(); + return existingResource; + } + }); } } finally { segmentLock.unlock(modsFile); } - increasePublicReference(modsFile, pipeName, false); return modsFile; } @@ -393,24 +402,21 @@ public class PipeTsFileResourceManager { : hardlinkOrCopiedFileToTsFilePublicResourceMap; } - public void pinTsFileResource( - final TsFileResource resource, final boolean withMods, final @Nullable String pipeName) + public void pinTsFileResource(final TsFileResource resource, final @Nullable String pipeName) throws IOException { increaseFileReference(resource.getTsFile(), true, pipeName); - if (withMods && resource.getExclusiveModFile().exists()) { + // Create hard links as long as the mods exist, ensuring that the Object can be used normally. + if (resource.getExclusiveModFile().exists()) { increaseFileReference(resource.getExclusiveModFile().getFile(), false, pipeName); } } - public void unpinTsFileResource( - final TsFileResource resource, - final boolean shouldTransferModFile, - final @Nullable String pipeName) + public void unpinTsFileResource(final TsFileResource resource, final @Nullable String pipeName) throws IOException { decreaseFileReference( getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), pipeName); - if (shouldTransferModFile && resource.exclusiveModFileExists()) { + if (resource.exclusiveModFileExists()) { decreaseFileReference( getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), pipeName), pipeName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 0440000a140..77607071bb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -640,8 +640,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource // Pin the resource, in case the file is removed by compaction or anything. // Will unpin it after the PipeTsFileInsertionEvent is created and pinned. try { - PipeDataNodeResourceManager.tsfile() - .pinTsFileResource(resource, shouldTransferModFile, pipeName); + PipeDataNodeResourceManager.tsfile().pinTsFileResource(resource, pipeName); return false; } catch (final IOException e) { LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath(), e); @@ -866,8 +865,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource return true; } finally { try { - PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource(resource, shouldTransferModFile, pipeName); + PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, pipeName); } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}", @@ -955,8 +953,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource return isReferenceCountIncreased ? event : null; } finally { try { - PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource(resource, shouldTransferModFile, pipeName); + PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, pipeName); } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", @@ -1048,8 +1045,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource if (resource instanceof TsFileResource) { try { PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource( - (TsFileResource) resource, shouldTransferModFile, pipeName); + .unpinTsFileResource((TsFileResource) resource, pipeName); } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java index 03f44c3c295..dc24ee955c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java @@ -163,6 +163,43 @@ public class ModificationFile implements AutoCloseable { updateModFileMetric(updateFileNum, size); } + public void clear() throws IOException { + lock.writeLock().lock(); + try { + if (!fileExists) { + return; + } + long sizeBeforeClear = getFileLength(); + if (fileOutputStream != null) { + fileOutputStream.close(); + fileOutputStream = null; + } + if (channel != null) { + channel.truncate(0); + channel.close(); + channel = null; + } else { + try (FileChannel tempChannel = FileChannel.open(file.toPath(), APPEND)) { + tempChannel.truncate(0); + } + } + + if (cascadeFiles != null) { + for (ModificationFile cascadeFile : cascadeFiles) { + cascadeFile.clear(); + } + } + + if (updateMetrics && !removed) { + FileMetrics.getInstance().increaseModFileSize(-sizeBeforeClear); + } + + fileExists = false; + } finally { + lock.writeLock().unlock(); + } + } + private void updateModFileMetric(int num, long size) { if (!removed && updateMetrics) { FileMetrics.getInstance().increaseModFileNum(num); diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java index 87556ded0cd..e05b62a793b 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java @@ -116,7 +116,6 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { private final ExecutorService objectUploadExecutor; private final BlockingQueue<PooledWorkerSession> idleWorkerSessions; - private SshClient client; private ClientSession session; ScpRemoteFileTransfer(PipeParameters params) { @@ -516,22 +515,13 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } } - private SshClient getOrCreateClient() { - if (client == null || !client.isStarted()) { - synchronized (this) { - if (client == null || !client.isStarted()) { - System.setProperty("org.apache.sshd.security.provider.BC.enabled", "false"); - client = SshClient.setUpDefaultClient(); - client.start(); - } - } - } - return client; + private SshClient getSharedClient() throws IOException { + return ScpSshClientManager.getClient(); } private ClientSession createAuthenticatedSession() throws IOException { final ClientSession createdSession = - getOrCreateClient().connect(user, host, port).verify(CONNECT_TIMEOUT_MS).getSession(); + getSharedClient().connect(user, host, port).verify(CONNECT_TIMEOUT_MS).getSession(); createdSession.addPasswordIdentity(password != null ? password : ""); createdSession.auth().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); return createdSession; @@ -606,10 +596,6 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { private synchronized void invalidateSession() { invalidateMainSession(); invalidateIdleWorkerSessions(); - if (client != null && client.isStarted()) { - client.stop(); - client = null; - } } private void invalidateIdleWorkerSessions() { @@ -636,7 +622,7 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { } @Override - public synchronized void close() { + public synchronized void close() throws IOException { objectUploadExecutor.shutdown(); try { if (!objectUploadExecutor.awaitTermination( diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java new file mode 100644 index 00000000000..d31227d0c05 --- /dev/null +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpSshClientManager.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.plugin.sink.tsfile; + +import org.apache.sshd.client.SshClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +class ScpSshClientManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScpSshClientManager.class); + + private static volatile SshClient client; + + private ScpSshClientManager() {} + + static SshClient getClient() throws IOException { + if (client == null || !client.isStarted()) { + synchronized (ScpSshClientManager.class) { + if (client == null || !client.isStarted()) { + client = createClient(); + LOGGER.info("Created static shared SCP SSH client"); + } + } + } + return client; + } + + private static SshClient createClient() throws IOException { + try { + System.setProperty("org.apache.sshd.security.provider.BC.enabled", "false"); + final SshClient sshClient = SshClient.setUpDefaultClient(); + sshClient.start(); + return sshClient; + } catch (Exception e) { + throw new IOException("Failed to create shared SCP SSH client", e); + } + } +}
