This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch receiver-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/receiver-fix by this push:
     new 09e43a4ceec re
09e43a4ceec is described below

commit 09e43a4ceec69863383364fcc7b9a29e6bec5803
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 14:05:23 2026 +0800

    re
---
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   5 +
 .../iotconsensusv2/IoTConsensusV2Receiver.java     |  63 ++++++++++---
 .../protocol/airgap/IoTDBAirGapReceiverTest.java   | 103 +++++++++++++++++++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  75 ++++++++++-----
 .../pipe/receiver/PipeReceiverFilePathUtils.java   |  42 +++++++++
 .../pipe/receiver/IoTDBFileReceiverTest.java       |  46 +++++++++
 6 files changed, 301 insertions(+), 33 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 8658d12b6a8..278c1ccaaef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
       if (System.currentTimeMillis() - startTime
           < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
         handleReq(req, startTime);
+      } else {
+        LOGGER.warn(
+            "Pipe air gap receiver {}: Temporary unavailable retry timed out, returning FAIL to sender.",
+            receiverId);
+        fail();
       }
     } else {
       LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index e0ef8e4072c..31c7db57c33 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
 import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestType;
 import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestVersion;
 import org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2TransferFilePieceReq;
@@ -78,6 +79,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -520,13 +522,19 @@ public class IoTConsensusV2Receiver {
     File writingFile = tsFileWriter.getWritingFile();
     RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
 
-    File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
-
-    final List<File> files =
-        req.getFileNames().stream()
-            .map(fileName -> new File(currentWritingDirPath, fileName))
-            .collect(Collectors.toList());
     try {
+      final List<File> files =
+          req.getFileNames().stream()
+              .map(
+                  fileName -> {
+                    try {
+                      return resolveWritingFilePath(tsFileWriter, fileName).toFile();
+                    } catch (final IOException e) {
+                      throw new IllegalArgumentException(e);
+                    }
+                  })
+              .collect(Collectors.toList());
+
       if (isWritingFileNonAvailable(tsFileWriter)) {
         final TSStatus status =
             RpcUtils.getStatus(
@@ -601,16 +609,20 @@ public class IoTConsensusV2Receiver {
       }
       return new TIoTConsensusV2TransferResp(status);
     } catch (Exception e) {
+      final Throwable rootCause = e instanceof IllegalArgumentException ? e.getCause() : e;
       LOGGER.warn(
           "IoTConsensusV2-PipeName-{}: Failed to seal file {} from req {}.",
           consensusPipeName,
-          files,
+          req.getFileNames(),
           req,
-          e);
+          rootCause);
       return new TIoTConsensusV2TransferResp(
           RpcUtils.getStatus(
               TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
-              String.format("Failed to seal file %s because %s", writingFile, e.getMessage())));
+              String.format(
+                  "Failed to seal file %s because %s",
+                  req.getFileNames(),
+                  rootCause == null ? e.getMessage() : rootCause.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will be deleted.
       // All pieces of the writing file and its mod(if exists) should be retransmitted by the
@@ -809,7 +821,22 @@ public class IoTConsensusV2Receiver {
   private boolean isFileExistedAndNameCorrect(
       IoTConsensusV2TsFileWriter tsFileWriter, String fileName) {
     final File writingFile = tsFileWriter.getWritingFile();
-    return writingFile != null && writingFile.getName().equals(fileName);
+    try {
+      return writingFile != null
+          && writingFile.exists()
+          && writingFile
+              .toPath()
+              .toAbsolutePath()
+              .normalize()
+              .equals(resolveWritingFilePath(tsFileWriter, fileName));
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "IoTConsensusV2-PipeName-{}: Illegal file name {} when checking writing file.",
+          consensusPipeName,
+          fileName,
+          e);
+      return false;
+    }
   }
 
   private boolean isWritingFileOffsetNonCorrect(
@@ -874,7 +901,7 @@ public class IoTConsensusV2Receiver {
     }
     // Every tsFileWriter has its own writing path.
     // 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
-    tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDir(), fileName));
+    tsFileWriter.setWritingFile(resolveWritingFilePath(tsFileWriter, fileName).toFile());
     tsFileWriter.setWritingFileWriter(new RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
     LOGGER.info(
         "IoTConsensusV2-PipeName-{}: Writing file {} was created. Ready to write file pieces.",
@@ -882,6 +909,20 @@ public class IoTConsensusV2Receiver {
         tsFileWriter.getWritingFile().getPath());
   }
 
+  private Path resolveWritingFilePath(
+      final IoTConsensusV2TsFileWriter tsFileWriter, final String fileName) throws IOException {
+    try {
+      return PipeReceiverFilePathUtils.resolveFilePath(
+          tsFileWriter.getLocalWritingDir().toPath(), fileName);
+    } catch (final IOException e) {
+      LOGGER.error(
+          "IoTConsensusV2-PipeName-{}: Path traversal attempt detected! Filename: {}",
+          consensusPipeName,
+          fileName);
+      throw e;
+    }
+  }
+
   private void initiateTsFileBufferFolder(List<String> receiverBaseDirsName) throws IOException {
     // initiate receiverFileDirs
     for (String receiverFileBaseDir : receiverBaseDirsName) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index 19dea8140a1..e23db1f1ca8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -19,18 +19,32 @@
 
 package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.BytesUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 
 public class IoTDBAirGapReceiverTest {
 
@@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest {
         Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));
     Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix"));
   }
+
+  @Test
+  public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws Exception {
+    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
+    final long originalRetryLocalIntervalMs = commonConfig.getPipeAirGapRetryLocalIntervalMs();
+    final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs();
+
+    try {
+      commonConfig.setPipeAirGapRetryLocalIntervalMs(0);
+      commonConfig.setPipeAirGapRetryMaxMs(1);
+
+      final RecordingSocket socket = new RecordingSocket();
+      final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L);
+      final StubIoTDBDataNodeReceiverAgent stubAgent = new StubIoTDBDataNodeReceiverAgent();
+      stubAgent.setStubReceiver(
+          new StubReceiver(
+              new TSStatus(
+                  TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+      setField(receiver, "agent", stubAgent);
+
+      final AirGapPseudoTPipeTransferRequest req = new AirGapPseudoTPipeTransferRequest();
+      req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion());
+      req.setType((short) 0);
+      req.setBody(ByteBuffer.allocate(0));
+
+      final Method handleReq =
+          IoTDBAirGapReceiver.class.getDeclaredMethod(
+              "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class);
+      handleReq.setAccessible(true);
+      handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L);
+
+      Assert.assertArrayEquals(AirGapOneByteResponse.FAIL, socket.getWrittenBytes());
+    } finally {
+      commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs);
+      commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs);
+    }
+  }
+
+  private static void setField(final Object target, final String fieldName, final Object value)
+      throws Exception {
+    final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
+  private static class RecordingSocket extends Socket {
+
+    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    @Override
+    public OutputStream getOutputStream() {
+      return outputStream;
+    }
+
+    byte[] getWrittenBytes() {
+      return outputStream.toByteArray();
+    }
+  }
+
+  private static class StubIoTDBDataNodeReceiverAgent extends IoTDBDataNodeReceiverAgent {
+
+    void setStubReceiver(final IoTDBReceiver receiver) {
+      setReceiverWithSpecifiedClient(null, receiver);
+    }
+  }
+
+  private static class StubReceiver implements IoTDBReceiver {
+
+    private final TPipeTransferResp response;
+
+    private StubReceiver(final TSStatus status) {
+      response = new TPipeTransferResp(status);
+    }
+
+    @Override
+    public TPipeTransferResp receive(final TPipeTransferReq req) {
+      return response;
+    }
+
+    @Override
+    public void handleExit() {
+      // noop for unit test
+    }
+
+    @Override
+    public IoTDBSinkRequestVersion getVersion() {
+      return IoTDBSinkRequestVersion.VERSION_1;
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index e2484576a77..6879f88ced8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -497,16 +497,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
             receiverFileDirWithIdSuffix.get().getPath());
       }
     }
-    Path baseDir = receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize();
-    Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize();
-
-    if (!targetPath.startsWith(baseDir)) {
-      LOGGER.error(
-          "Receiver id = {}: Path traversal attempt detected! Filename: {}",
-          receiverId.get(),
-          fileName);
-      throw new IOException("Illegal fileName: " + fileName + " (Path traversal detected)");
-    }
+    final Path targetPath = resolveReceiverFilePath(fileName);
 
     writingFile = targetPath.toFile();
     writingFileWriter = new RandomAccessFile(writingFile, "rw");
@@ -517,7 +508,37 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
   }
 
   private boolean isFileExistedAndNameCorrect(final String fileName) {
-    return writingFile != null && writingFile.exists() && writingFile.getName().equals(fileName);
+    try {
+      return writingFile != null
+          && writingFile.exists()
+          && receiverFileDirWithIdSuffix.get() != null
+          && writingFile
+              .toPath()
+              .toAbsolutePath()
+              .normalize()
+              .equals(resolveReceiverFilePath(fileName));
+    } catch (final IOException e) {
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Illegal file name %s when checking writing file.",
+          receiverId.get(),
+          fileName);
+      return false;
+    }
+  }
+
+  private Path resolveReceiverFilePath(final String fileName) throws IOException {
+    try {
+      return PipeReceiverFilePathUtils.resolveFilePath(
+          receiverFileDirWithIdSuffix.get().toPath(), fileName);
+    } catch (final IOException e) {
+      LOGGER.error(
+          "Receiver id = {}: Path traversal attempt detected! Filename: {}",
+          receiverId.get(),
+          fileName);
+      throw e;
+    }
   }
 
   private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
@@ -680,15 +701,22 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
   // Support null in fileName list, which means that this file is optional and is currently absent
   protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFileSealReqV2 req) {
     final List<String> fileNames = req.getFileNames();
-    final List<File> files =
-        fileNames.stream()
-            .map(
-                fileName ->
-                    Objects.nonNull(fileName)
-                        ? new File(receiverFileDirWithIdSuffix.get(), fileName)
-                        : null)
-            .collect(Collectors.toList());
     try {
+      final List<File> files =
+          fileNames.stream()
+              .map(
+                  fileName -> {
+                    if (Objects.isNull(fileName)) {
+                      return null;
+                    }
+                    try {
+                      return resolveReceiverFilePath(fileName).toFile();
+                    } catch (final IOException e) {
+                      throw new IllegalArgumentException(e);
+                    }
+                  })
+              .collect(Collectors.toList());
+
       if (!isWritingFileAvailable()) {
         final TSStatus status =
             RpcUtils.getStatus(
@@ -754,17 +782,20 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
       }
       return new TPipeTransferResp(status);
     } catch (final Exception e) {
+      final Throwable rootCause = e instanceof IllegalArgumentException ? e.getCause() : e;
       PipeLogger.log(
           LOGGER::warn,
-          e,
+          rootCause,
           "Receiver id = %s: Failed to seal file %s from req %s.",
           receiverId.get(),
-          files,
+          fileNames,
           req);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
-              String.format("Failed to seal file %s because %s", files, e.getMessage())));
+              String.format(
+                  "Failed to seal file %s because %s",
+                  fileNames, rootCause == null ? e.getMessage() : rootCause.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will be deleted.
       // All pieces of the writing file and its mod(if exists) should be retransmitted by the
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
new file mode 100644
index 00000000000..bc7275d4ebe
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public final class PipeReceiverFilePathUtils {
+
+  private PipeReceiverFilePathUtils() {
+    // Utility class
+  }
+
+  public static Path resolveFilePath(final Path baseDir, final String fileName) throws IOException {
+    final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize();
+    final Path normalizedTargetPath =
+        normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize();
+
+    if (!normalizedTargetPath.startsWith(normalizedBaseDir)) {
+      throw new IOException("Illegal fileName: " + fileName + " (Path traversal detected)");
+    }
+
+    return normalizedTargetPath;
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index a372326433d..8d2db54d5b6 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
 import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -33,6 +35,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class IoTDBFileReceiverTest {
@@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest {
     }
   }
 
+  @Test
+  public void testRejectPathTraversalFileNameInSealRequest() throws Exception {
+    final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+    final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+    try {
+      receiver.createWritingFile("normal.tsfile", false);
+
+      final TPipeTransferResp response =
+          receiver.sealFiles(
+              Arrays.asList("../outside.mod", "normal.tsfile"), Arrays.asList(0L, 0L));
+
+      Assert.assertEquals(
+          TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(), response.getStatus().getCode());
+      Assert.assertTrue(response.getStatus().getMessage().contains("Illegal fileName"));
+    } finally {
+      receiver.handleExit();
+    }
+  }
+
   private static class DummyFileReceiver extends IoTDBFileReceiver {
 
     DummyFileReceiver(final File baseDir) {
@@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest {
       updateWritingFileIfNeeded(fileName, isSingleFile);
     }
 
+    TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long> fileLengths)
+        throws IOException {
+      return handleTransferFileSealV2(
+          DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths, Collections.emptyMap()));
+    }
+
     File getWritingFileInBaseDir(final String fileName) {
       return receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
     }
@@ -130,4 +159,21 @@ public class IoTDBFileReceiverTest {
       return null;
     }
   }
+
+  private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
+
+    static DummyFileSealReqV2 toTPipeTransferReq(
+        final List<String> fileNames,
+        final List<Long> fileLengths,
+        final java.util.Map<String, String> parameters)
+        throws IOException {
+      return (DummyFileSealReqV2)
+          new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames, fileLengths, parameters);
+    }
+
+    @Override
+    protected PipeRequestType getPlanType() {
+      return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL;
+    }
+  }
 }

Reply via email to