This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 441b43c0bf330ac55c5543982c22c69dc3bb5f4a Author: 陈哲涵 <[email protected]> AuthorDate: Wed Apr 29 06:17:00 2026 +0000 [TIMECHODB] Fixed object pipe bugs (cherry picked from commit 1bb588274ef22914dfa0afdbbfb4c0c1dc0cb5af) --- .../common/tsfile/PipeTsFileInsertionEvent.java | 6 +- .../sink/protocol/tsfile/PipeTsFileLocalSink.java | 67 ++++++++-- .../sink/util/builder/TsFileNameGenerator.java | 63 +++++++++ .../sink/util/builder/TsFileNameGeneratorTest.java | 146 +++++++++++++++++++++ .../plugin/sink/tsfile/PipeTsFileRemoteSink.java | 70 ++++++++-- .../plugin/sink/tsfile/ScpRemoteFileTransfer.java | 16 ++- 6 files changed, 337 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 19abb31b28b..803269377a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -373,13 +373,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent final Iterator<String> pathIterator = objectPathIterator(); final int linked = PipeDataNodeResourceManager.object().linkObjectFiles(resource, pathIterator, pipeName); + hasObjectData = linked > 0; if (linked > 0) { - if (hasObjectData == null) { - hasObjectData = true; - } PipeDataNodeResourceManager.object().increaseReference(resource, pipeName); - } else if (hasObjectData == null) { - hasObjectData = false; } PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java index f1627af7a9e..943d0efd147 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/PipeTsFileLocalSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.tsfile; import org.apache.iotdb.commons.pipe.sink.protocol.PipeBatchMetricsSettable; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -69,10 +70,9 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileLocalSink.class); - private final TsFileNameGenerator tsFileNameGenerator = new TsFileNameGenerator(); - private FileTransfer fileTransfer; private PipeTabletEventTsFileBatch eventTsFileBatch; + private List<Pair<String, Pair<File, File>>> sealedBatchedTsFiles; private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram(); private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram(); @@ -164,18 +164,27 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { return; } if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { - final File tsFile = tsFileInsertionEvent.getTsFile(); + final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent; + if (!event.waitForTsFileClose()) { + LOGGER.warn( + "Pipe skipping temporary TsFile which shouldn't be transferred: {}", event.getTsFile()); + return; + } + + final File tsFile = event.getTsFile(); if (tsFile != null && tsFile.exists()) { fileTransfer.transferFile( tsFile, PipeObjectPathUtil.resolveLinkedObjectDirectory( - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFileResource(), - ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getPipeName()), - tsFileNameGenerator.nextFileName()); + event.getTsFileResource(), event.getPipeName()), + TsFileNameGenerator.targetNameForEvent(event)); } } else { - fileTransfer.transferFile( - tsFileInsertionEvent.getTsFile(), null, tsFileNameGenerator.nextFileName()); + final File tsFile = tsFileInsertionEvent.getTsFile(); + if (tsFile != null && tsFile.exists()) { + fileTransfer.transferFile( + tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); + } } } @@ -192,8 +201,14 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { @Override public void close() throws Exception { + cleanupSealedBatchedTsFiles(); + if (eventTsFileBatch != null) { + eventTsFileBatch.close(); + eventTsFileBatch = null; + } if (fileTransfer != null) { fileTransfer.close(); + fileTransfer = null; } } @@ -238,7 +253,7 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { if (!eventTsFileBatch.shouldEmit() || eventTsFileBatch.isEmpty()) { return; } - final List<Pair<String, Pair<File, File>>> list = eventTsFileBatch.sealTsFiles(); + final List<Pair<String, Pair<File, File>>> list = getOrSealBatchedTsFiles(); for (final Pair<String, Pair<File, File>> sealed : list) { final Pair<File, File> tsFileAndObjectDir = sealed.getRight(); if (tsFileAndObjectDir == null) { @@ -247,10 +262,42 @@ public class PipeTsFileLocalSink implements PipeSink, PipeBatchMetricsSettable { final File tsFile = tsFileAndObjectDir.getLeft(); final File objectDir = tsFileAndObjectDir.getRight(); if (tsFile != null && tsFile.exists()) { - fileTransfer.transferFile(tsFile, objectDir, tsFileNameGenerator.nextFileName()); + fileTransfer.transferFile( + tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); } } eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileLocalSink.class.getName(), true); + cleanupSealedBatchedTsFiles(); eventTsFileBatch.onSuccess(); } + + private List<Pair<String, Pair<File, File>>> getOrSealBatchedTsFiles() throws Exception { + if (sealedBatchedTsFiles == null) { + sealedBatchedTsFiles = eventTsFileBatch.sealTsFiles(); + } + return sealedBatchedTsFiles; + } + + private void cleanupSealedBatchedTsFiles() { + if (sealedBatchedTsFiles == null) { + return; + } + + for (final Pair<String, Pair<File, File>> sealed : sealedBatchedTsFiles) { + if (sealed == null || sealed.getRight() == null) { + continue; + } + + final File tsFile = sealed.getRight().getLeft(); + final File objectDir = sealed.getRight().getRight(); + if (tsFile != null && tsFile.exists()) { + FileUtils.deleteFileOrDirectory(tsFile, true); + } + if (objectDir != null && objectDir.exists()) { + FileUtils.deleteFileOrDirectory(objectDir, true); + } + } + + sealedBatchedTsFiles = null; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java index b6623056c9a..5b696bdb5ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGenerator.java @@ -19,10 +19,20 @@ package org.apache.iotdb.db.pipe.sink.util.builder; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import org.apache.tsfile.common.constant.TsFileConstant; + +import java.io.File; import java.util.UUID; +import java.util.regex.Pattern; public class TsFileNameGenerator { + private static final Pattern NON_FILE_NAME_CHAR_PATTERN = Pattern.compile("[^A-Za-z0-9._-]"); + private final String runId; private long lastTimestamp; private long indexSeq; @@ -49,4 +59,57 @@ public class TsFileNameGenerator { public String getRunId() { return runId; } + + public static String targetNameForEvent(final TsFileInsertionEvent event) { + if (event == null) { + return "tsfile"; + } + + if (event instanceof EnrichedEvent) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + final CommitterKey committerKey = enrichedEvent.getCommitterKey(); + if (committerKey != null && enrichedEvent.getCommitId() != EnrichedEvent.NO_COMMIT_ID) { + return sanitizeFileName( + stripTsFileSuffix(event.getTsFile()) + + "_" + + committerKey.stringify() + + "_" + + enrichedEvent.getCommitId()); + } + } + + return targetNameForFile(event.getTsFile()); + } + + public static String targetNameForFile(final File tsFile) { + if (tsFile == null) { + return "tsfile"; + } + + final String normalizedPath = tsFile.toPath().toAbsolutePath().normalize().toString(); + return sanitizeFileName( + stripTsFileSuffix(tsFile) + "_" + Integer.toUnsignedString(normalizedPath.hashCode())); + } + + public static String targetNameForGeneratedFile(final File tsFile) { + return sanitizeFileName(stripTsFileSuffix(tsFile)); + } + + private static String stripTsFileSuffix(final File tsFile) { + return stripTsFileSuffix(tsFile == null ? null : tsFile.getName()); + } + + private static String stripTsFileSuffix(final String tsFileName) { + if (tsFileName == null || tsFileName.isEmpty()) { + return "tsfile"; + } + return tsFileName.endsWith(TsFileConstant.TSFILE_SUFFIX) + ? tsFileName.substring(0, tsFileName.length() - TsFileConstant.TSFILE_SUFFIX.length()) + : tsFileName; + } + + private static String sanitizeFileName(final String rawName) { + final String sanitized = NON_FILE_NAME_CHAR_PATTERN.matcher(rawName).replaceAll("_"); + return sanitized.isEmpty() ? "tsfile" : sanitized; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java new file mode 100644 index 00000000000..dd12077abbe --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/builder/TsFileNameGeneratorTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util.builder; + +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.Collections; + +public class TsFileNameGeneratorTest { + + @Test + public void testTargetNameForEventUsesStableCommitIdentity() { + final TestTsFileEvent event = new TestTsFileEvent(new File("source.tsfile")); + event.setCommitterKeyAndCommitId(new CommitterKey("pipe name/1", 100L, 7, 2), 11L); + + Assert.assertEquals( + "source_pipe_name_1_7_100_2_11", TsFileNameGenerator.targetNameForEvent(event)); + } + + @Test + public void testTargetNameForFileDistinguishesDifferentPaths() { + final File first = new File("target/first/same.tsfile"); + final File second = new File("target/second/same.tsfile"); + + Assert.assertNotEquals( + TsFileNameGenerator.targetNameForFile(first), + TsFileNameGenerator.targetNameForFile(second)); + } + + @Test + public void testTargetNameForGeneratedFileStripsTsFileSuffix() { + Assert.assertEquals( + "tb_1_2_3", TsFileNameGenerator.targetNameForGeneratedFile(new File("tb_1_2_3.tsfile"))); + } + + private static class TestTsFileEvent extends EnrichedEvent implements TsFileInsertionEvent { + + private final File tsFile; + + private TestTsFileEvent(final File tsFile) { + super( + "pipe", + 1L, + (PipeTaskMeta) null, + (TreePattern) null, + (TablePattern) null, + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + this.tsFile = tsFile; + } + + @Override + public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { + return Collections.emptyList(); + } + + @Override + public File getTsFile() { + return tsFile; + } + + @Override + public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { + return true; + } + + @Override + public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { + return true; + } + + @Override + public ProgressIndex getProgressIndex() { + return MinimumProgressIndex.INSTANCE; + } + + @Override + public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final TreePattern treePattern, + final TablePattern tablePattern, + final String userId, + final String userName, + final String cliHostname, + final boolean skipIfNoPrivileges, + final long startTime, + final long endTime) { + return this; + } + + @Override + public boolean isGeneratedByPipe() { + return false; + } + + @Override + public boolean mayEventTimeOverlappedWithTimeRange() { + return true; + } + + @Override + public boolean mayEventPathsOverlappedWithPattern() { + return true; + } + + @Override + public void close() { + // do nothing + } + } +} diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java index f876295275b..de4f7fb89a2 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/PipeTsFileRemoteSink.java @@ -21,6 +21,7 @@ package org.apache.iotdb.pipe.plugin.sink.tsfile; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.sink.protocol.PipeBatchMetricsSettable; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -40,7 +41,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; -import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.apache.tsfile.utils.Pair; @@ -72,9 +72,9 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileRemoteSink.class); - private final TsFileNameGenerator tsFileNameGenerator = new TsFileNameGenerator(); private RemoteFileTransfer remoteFileTransfer; private PipeTabletEventTsFileBatch eventTsFileBatch; + private List<Pair<String, Pair<File, File>>> sealedBatchedTsFiles; private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram(); private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram(); @@ -146,23 +146,28 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent event = (PipeTsFileInsertionEvent) tsFileInsertionEvent; if (!event.waitForTsFileClose()) { - throw new PipeException( - "Timeout waiting for tsfile close before sink transfer: " + tsFileInsertionEvent); + LOGGER.warn( + "Pipe skipping temporary TsFile which shouldn't be transferred: {}", event.getTsFile()); + return; } + final TsFileResource tsFileResource = event.getTsFileResource(); - if (tsFileResource == null) { - throw new PipeException("TsFile resource is null, event: " + tsFileInsertionEvent); - } - final File tsFile = tsFileResource.getTsFile(); + final File tsFile = event.getTsFile(); if (tsFile != null && tsFile.exists()) { remoteFileTransfer.transferFile( tsFile, - PipeObjectPathUtil.resolveLinkedObjectDirectory(tsFileResource, event.getPipeName()), - tsFileNameGenerator.nextFileName()); + tsFileResource == null + ? null + : PipeObjectPathUtil.resolveLinkedObjectDirectory( + tsFileResource, event.getPipeName()), + TsFileNameGenerator.targetNameForEvent(event)); } } else { - remoteFileTransfer.transferFile( - tsFileInsertionEvent.getTsFile(), null, tsFileNameGenerator.nextFileName()); + final File tsFile = tsFileInsertionEvent.getTsFile(); + if (tsFile != null && tsFile.exists()) { + remoteFileTransfer.transferFile( + tsFile, null, TsFileNameGenerator.targetNameForEvent(tsFileInsertionEvent)); + } } } @@ -207,6 +212,11 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable @Override public void close() throws Exception { + cleanupSealedBatchedTsFiles(); + if (eventTsFileBatch != null) { + eventTsFileBatch.close(); + eventTsFileBatch = null; + } if (remoteFileTransfer != null) { remoteFileTransfer.close(); remoteFileTransfer = null; @@ -254,7 +264,7 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable if (!eventTsFileBatch.shouldEmit() || eventTsFileBatch.isEmpty()) { return; } - final List<Pair<String, Pair<File, File>>> list = eventTsFileBatch.sealTsFiles(); + final List<Pair<String, Pair<File, File>>> list = getOrSealBatchedTsFiles(); for (final Pair<String, Pair<File, File>> sealed : list) { final Pair<File, File> tsFileAndObjectDir = sealed.getRight(); if (tsFileAndObjectDir == null) { @@ -263,10 +273,42 @@ public class PipeTsFileRemoteSink implements PipeSink, PipeBatchMetricsSettable final File tsFile = tsFileAndObjectDir.getLeft(); final File objectDir = tsFileAndObjectDir.getRight(); if (tsFile != null && tsFile.exists()) { - remoteFileTransfer.transferFile(tsFile, objectDir, tsFileNameGenerator.nextFileName()); + remoteFileTransfer.transferFile( + tsFile, objectDir, TsFileNameGenerator.targetNameForGeneratedFile(tsFile)); } } eventTsFileBatch.decreaseEventsReferenceCount(PipeTsFileRemoteSink.class.getName(), true); + cleanupSealedBatchedTsFiles(); eventTsFileBatch.onSuccess(); } + + private List<Pair<String, Pair<File, File>>> getOrSealBatchedTsFiles() throws Exception { + if (sealedBatchedTsFiles == null) { + sealedBatchedTsFiles = eventTsFileBatch.sealTsFiles(); + } + return sealedBatchedTsFiles; + } + + private void cleanupSealedBatchedTsFiles() { + if (sealedBatchedTsFiles == null) { + return; + } + + for (final Pair<String, Pair<File, File>> sealed : sealedBatchedTsFiles) { + if (sealed == null || sealed.getRight() == null) { + continue; + } + + final File tsFile = sealed.getRight().getLeft(); + final File objectDir = sealed.getRight().getRight(); + if (tsFile != null && tsFile.exists()) { + FileUtils.deleteFileOrDirectory(tsFile, true); + } + if (objectDir != null && objectDir.exists()) { + FileUtils.deleteFileOrDirectory(objectDir, true); + } + } + + sealedBatchedTsFiles = null; + } } diff --git a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java index c99808be19a..c47545902cf 100644 --- a/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java +++ b/library-pipe/tsfile-remote-sink/src/main/java/org/apache/iotdb/pipe/plugin/sink/tsfile/ScpRemoteFileTransfer.java @@ -182,8 +182,20 @@ class ScpRemoteFileTransfer implements RemoteFileTransfer { private void executeRemoteCommand(ClientSession s, String command) throws IOException { try (ChannelExec channel = s.createExecChannel(command)) { channel.open().verify(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - channel.waitFor(WAIT_FOR_CLOSED, CONNECT_TIMEOUT_MS); - } catch (Exception ignore) { + final Set<ClientChannelEvent> events = channel.waitFor(WAIT_FOR_CLOSED, CONNECT_TIMEOUT_MS); + if (!events.contains(ClientChannelEvent.CLOSED)) { + throw new IOException("Remote command timed out: " + command); + } + + final Integer exitStatus = channel.getExitStatus(); + if (exitStatus != null && exitStatus != 0) { + throw new IOException( + String.format("Remote command failed with exit code %s: %s", exitStatus, command)); + } + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException("Failed to execute remote command: " + command, e); } }
