This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch hotfix/2.0.9.4-sjzt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9cd92ff44774490c3b7bfb639f6a51fb30250ae8
Author: 陈哲涵 <[email protected]>
AuthorDate: Thu May 14 23:27:57 2026 +0800

    [TIMECHODB] Fixed the load parameter passes & Added tests for object with pipe/load
    
    (cherry picked from commit 16498aad04ddd069f69134d01ad515b093911479)
---
 iotdb-client/client-go                             |   2 +-
 iotdb-core/datanode/pom.xml                        |   5 +
 .../plan/relational/sql/ast/LoadTsFile.java        |   2 +
 .../plan/statement/crud/LoadTsFileStatement.java   |   5 +
 .../object/PipeObjectResourceManagerTest.java      | 141 +++++++++++++++++
 .../plan/planner/node/load/LoadTsFileNodeTest.java |  72 +++++++++
 .../load/LoadTsFileDispatcherImplTest.java         | 147 ++++++++++++++++++
 .../scheduler/load/LoadTsFileSchedulerTest.java    | 152 +++++++++++++++++-
 .../iotdb/db/storageengine/StorageEngineTest.java  |  79 ++++++++++
 .../splitter/LoadTsFileObjectFileBatchTest.java    | 169 +++++++++++++++++++++
 10 files changed, 772 insertions(+), 2 deletions(-)

diff --git a/iotdb-client/client-go b/iotdb-client/client-go
index dc64b1a7648..2ea2655e090 160000
--- a/iotdb-client/client-go
+++ b/iotdb-client/client-go
@@ -1 +1 @@
-Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def
+Subproject commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 23189f61336..ac0d555bc0c 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -308,6 +308,11 @@
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-reflect</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index ee2934a4381..4ef8f4d322e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -208,6 +208,7 @@ public class LoadTsFile extends Statement {
         
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
     this.verify = 
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
     this.isAsyncLoad = 
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
+    this.isGeneratedByPipe = 
LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes);
     this.objectFileSearchRoot = 
LoadTsFileConfigurator.parseObjectFileSearchRoot(loadAttributes);
   }
 
@@ -311,6 +312,7 @@ public class LoadTsFile extends Statement {
       subStatement.autoCreateDatabase = this.autoCreateDatabase;
       subStatement.isAsyncLoad = this.isAsyncLoad;
       subStatement.isGeneratedByPipe = this.isGeneratedByPipe;
+      subStatement.objectFileSearchRoot = this.objectFileSearchRoot;
 
       // Set all files in the batch
       subStatement.tsFiles = new ArrayList<>(batchFiles);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index e59ed1f0038..cbfae15b114 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -45,6 +45,7 @@ import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.OBJECT_FILE_PATHS_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
@@ -364,6 +365,7 @@ public class LoadTsFileStatement extends Statement {
       statement.autoCreateDatabase = this.autoCreateDatabase;
       statement.isAsyncLoad = this.isAsyncLoad;
       statement.isGeneratedByPipe = this.isGeneratedByPipe;
+      statement.objectFileSearchRoot = this.objectFileSearchRoot;
 
       statement.tsFiles = new ArrayList<>(batchFiles);
       statement.resources = new ArrayList<>(batchFiles.size());
@@ -403,6 +405,9 @@ public class LoadTsFileStatement extends Statement {
     if (isGeneratedByPipe) {
       loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true));
     }
+    if (objectFileSearchRoot != null) {
+      loadAttributes.put(OBJECT_FILE_PATHS_KEY, 
objectFileSearchRoot.getAbsolutePath());
+    }
 
     return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java
new file mode 100644
index 00000000000..024998afa7e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.object;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Optional;
+
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", 
"javax.management.*"})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TierManager.class)
+public class PipeObjectResourceManagerTest {
+
+  private static final String PIPE_NAME = "pipeA";
+  private static final String OBJECT_RELATIVE_PATH = "99/object.bin";
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private TsFileResource tsFileResource;
+  private File originalObjectFile;
+
+  @Before
+  public void setUp() throws Exception {
+    final TierManager tierManager = Mockito.mock(TierManager.class);
+    PowerMockito.mockStatic(TierManager.class);
+    PowerMockito.when(TierManager.getInstance()).thenReturn(tierManager);
+    
Mockito.when(tierManager.getFileTierLevel(Mockito.any(File.class))).thenReturn(0);
+
+    originalObjectFile = temporaryFolder.newFile("original-object.bin");
+    try (final FileOutputStream outputStream = new 
FileOutputStream(originalObjectFile)) {
+      outputStream.write("payload".getBytes(StandardCharsets.UTF_8));
+    }
+    Mockito.when(tierManager.getAbsoluteObjectFilePath(Mockito.anyString(), 
Mockito.eq(false)))
+        .thenReturn(Optional.of(originalObjectFile));
+
+    final File tsFileDir = new File(temporaryFolder.getRoot(), 
"data/sequence/root.test/1/100");
+    Assert.assertTrue(tsFileDir.mkdirs());
+    final File tsFile = new File(tsFileDir, "1-0-0-0.tsfile");
+    Assert.assertTrue(tsFile.createNewFile());
+    tsFileResource = new TsFileResource(tsFile);
+  }
+
+  @Test
+  public void testManagerLifecycleCleansUpHardlinksAfterCloseAndDereference() 
throws Exception {
+    final PipeObjectResourceManager manager = new PipeObjectResourceManager();
+
+    final int linkedCount =
+        manager.linkObjectFiles(
+            tsFileResource,
+            Arrays.asList(OBJECT_RELATIVE_PATH, 
"99/sub/object-2.bin").iterator(),
+            PIPE_NAME);
+    Assert.assertEquals(2, linkedCount);
+
+    final File linkedDir = manager.getLinkedObjectDirectory(tsFileResource, 
PIPE_NAME);
+    final File hardlink =
+        manager.getObjectFileHardlink(tsFileResource, OBJECT_RELATIVE_PATH, 
PIPE_NAME);
+    Assert.assertNotNull(linkedDir);
+    Assert.assertTrue(linkedDir.exists());
+    Assert.assertNotNull(hardlink);
+    Assert.assertTrue(hardlink.exists());
+
+    manager.increaseReference(tsFileResource, PIPE_NAME);
+    manager.decreaseReference(tsFileResource, PIPE_NAME);
+    Assert.assertTrue(linkedDir.exists());
+
+    manager.setTsFileClosed(tsFileResource, PIPE_NAME);
+    Assert.assertTrue(linkedDir.exists());
+
+    manager.decreaseReference(tsFileResource, PIPE_NAME);
+
+    Assert.assertNull(manager.getLinkedObjectDirectory(tsFileResource, 
PIPE_NAME));
+    Assert.assertNull(
+        manager.getObjectFileHardlink(tsFileResource, OBJECT_RELATIVE_PATH, 
PIPE_NAME));
+    Assert.assertFalse(linkedDir.exists());
+    Assert.assertFalse(hardlink.exists());
+  }
+
+  @Test
+  public void testResourceCleanupResetsHardlinkLifecycleState() throws 
Exception {
+    final File resourceDir = new File(temporaryFolder.getRoot(), 
"resource-dir");
+    final PipeObjectResource resource = new PipeObjectResource(tsFileResource, 
resourceDir);
+
+    resource.linkObjectFile(OBJECT_RELATIVE_PATH);
+    Assert.assertEquals(1, resource.getLinkedFileCount());
+    Assert.assertNotNull(resource.getObjectFileHardlink(OBJECT_RELATIVE_PATH));
+
+    resource.increaseReferenceCount();
+    resource.setTsFileClosed();
+    Assert.assertTrue(resource.decreaseReferenceCount());
+
+    resource.cleanup();
+
+    Assert.assertTrue(resource.isClosed());
+    Assert.assertEquals(0, resource.getLinkedFileCount());
+    Assert.assertEquals(0, resource.getReferenceCount());
+    Assert.assertNull(resource.getObjectFileHardlink(OBJECT_RELATIVE_PATH));
+    Assert.assertFalse(resourceDir.exists());
+
+    try {
+      resource.linkObjectFile(OBJECT_RELATIVE_PATH);
+      Assert.fail("Expected IOException");
+    } catch (final IOException e) {
+      Assert.assertTrue(e.getMessage().contains("Object resource is closed"));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
index 4a558e4ecb4..f2f7d9865b4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
@@ -19,17 +19,25 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.node.load;
 
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch;
+import 
org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch.ObjectFileChunk;
 
 import org.apache.tsfile.exception.NotImplementedException;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -90,4 +98,68 @@ public class LoadTsFileNodeTest {
     LoadTsFilePieceNode node1 = (LoadTsFilePieceNode) 
LoadTsFilePieceNode.deserialize(buffer);
     Assert.assertEquals(node.getTsFile(), node1.getTsFile());
   }
+
+  @Test
+  public void testLoadTsFileObjectPieceNode() {
+    final LoadTsFileObjectFileBatch objectBatch =
+        new LoadTsFileObjectFileBatch(
+            Collections.singletonList(
+                new ObjectFileChunk("0/object/a.bin", 0, 3, true, new byte[] 
{1, 2, 3})),
+            new TTimePartitionSlot(123L));
+    final LoadTsFileObjectPieceNode node =
+        new LoadTsFileObjectPieceNode(new PlanNodeId("object"), objectBatch);
+
+    Assert.assertEquals(0, node.getDataSize());
+    Assert.assertEquals(objectBatch, node.getObjectBatch());
+    Assert.assertNull(node.getRegionReplicaSet());
+    Assert.assertEquals(Collections.emptyList(), node.getChildren());
+    Assert.assertEquals(Collections.emptyList(), node.getOutputColumnNames());
+    try {
+      node.clone();
+      Assert.fail();
+    } catch (NotImplementedException ignored) {
+    }
+    try {
+      node.splitByPartition(new Analysis());
+      Assert.fail();
+    } catch (NotImplementedException ignored) {
+    }
+    Assert.assertEquals(0, node.allowedChildCount());
+    Assert.assertEquals("LoadTsFileObjectPieceNode{dataSize=0}", 
node.toString());
+
+    final LoadTsFileObjectPieceNode deserialized =
+        (LoadTsFileObjectPieceNode) 
PlanNodeType.deserialize(node.serializeToByteBuffer());
+    Assert.assertNotNull(deserialized);
+    Assert.assertEquals(
+        TimePartitionUtils.getTimePartitionSlot(
+            node.getObjectBatch().getLastTimePartitionSlot().getStartTime()),
+        deserialized.getObjectBatch().getLastTimePartitionSlot());
+    Assert.assertEquals(1, 
deserialized.getObjectBatch().getObjectFileChunks().size());
+    final ObjectFileChunk chunk = 
deserialized.getObjectBatch().getObjectFileChunks().get(0);
+    Assert.assertEquals("0/object/a.bin", chunk.getObjectRelativePath());
+    Assert.assertEquals(0, chunk.getFileOffset());
+    Assert.assertEquals(3, chunk.getTotalFileLength());
+    Assert.assertTrue(chunk.isIncludeLastByte());
+    Assert.assertArrayEquals(new byte[] {1, 2, 3}, chunk.getBytes());
+  }
+
+  @Test
+  public void testLoadTsFileObjectPieceNodeDeserializeInvalidBuffer() {
+    final LoadTsFileObjectFileBatch objectBatch =
+        new LoadTsFileObjectFileBatch(
+            Collections.singletonList(
+                new ObjectFileChunk("0/object/a.bin", 0, 3, true, new byte[] 
{1, 2, 3})),
+            new TTimePartitionSlot(123L));
+    final ByteBuffer buffer =
+        new LoadTsFileObjectPieceNode(new PlanNodeId("object"), objectBatch)
+            .serializeToByteBuffer();
+    buffer.getShort();
+    buffer.limit(buffer.position() + 2);
+
+    try {
+      Assert.assertNull(LoadTsFileObjectPieceNode.deserialize(buffer.slice()));
+    } catch (final BufferUnderflowException e) {
+      Assert.fail("Deserialize should fail gracefully for truncated object 
piece buffer.");
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
new file mode 100644
index 00000000000..3585c86ac4a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler.load;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import 
org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", 
"javax.management.*"})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StorageEngine.class)
+public class LoadTsFileDispatcherImplTest {
+
+  @Test
+  public void testDispatchLocallyObjectPieceNodeSuccess() throws Exception {
+    final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
+    PowerMockito.mockStatic(StorageEngine.class);
+    PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
+
+    final LoadTsFileDispatcherImpl dispatcher = new 
LoadTsFileDispatcherImpl(null, false);
+    dispatcher.setUuid("test-uuid");
+
+    final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode();
+    final FragmentInstance instance = createFragmentInstance(objectPieceNode);
+
+    Mockito.when(
+            storageEngine.writeLoadTsFileObjectPieceNode(
+                Mockito.eq(new DataRegionId(1)),
+                Mockito.same(objectPieceNode),
+                Mockito.eq("test-uuid")))
+        .thenReturn(RpcUtils.SUCCESS_STATUS);
+
+    dispatcher.dispatchLocally(instance);
+
+    Mockito.verify(storageEngine)
+        .writeLoadTsFileObjectPieceNode(
+            Mockito.eq(new DataRegionId(1)),
+            Mockito.same(objectPieceNode),
+            Mockito.eq("test-uuid"));
+  }
+
+  @Test
+  public void testDispatchLocallyObjectPieceNodeFailure() throws Exception {
+    final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
+    PowerMockito.mockStatic(StorageEngine.class);
+    PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
+
+    final LoadTsFileDispatcherImpl dispatcher = new 
LoadTsFileDispatcherImpl(null, false);
+    dispatcher.setUuid("test-uuid");
+
+    final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode();
+    final FragmentInstance instance = createFragmentInstance(objectPieceNode);
+    final TSStatus failureStatus = new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+    failureStatus.setMessage("object piece dispatch failed");
+
+    Mockito.when(
+            storageEngine.writeLoadTsFileObjectPieceNode(
+                Mockito.eq(new DataRegionId(1)),
+                Mockito.same(objectPieceNode),
+                Mockito.eq("test-uuid")))
+        .thenReturn(failureStatus);
+
+    try {
+      dispatcher.dispatchLocally(instance);
+      Assert.fail("Expected FragmentInstanceDispatchException");
+    } catch (final FragmentInstanceDispatchException e) {
+      Assert.assertEquals(
+          TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), 
e.getFailureStatus().getCode());
+      Assert.assertEquals("object piece dispatch failed", 
e.getFailureStatus().getMessage());
+    }
+  }
+
+  private static LoadTsFileObjectPieceNode createObjectPieceNode() {
+    return new LoadTsFileObjectPieceNode(
+        new PlanNodeId("object-piece"),
+        new LoadTsFileObjectFileBatch(
+            Collections.emptyList(), new 
TTimePartitionSlot().setStartTime(1L)));
+  }
+
+  private static FragmentInstance createFragmentInstance(
+      final LoadTsFileObjectPieceNode objectPieceNode) {
+    final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
+    final FragmentInstance instance =
+        new FragmentInstance(
+            new PlanFragment(fragmentId, objectPieceNode),
+            fragmentId.genFragmentInstanceId(),
+            null,
+            null,
+            0L,
+            null,
+            false,
+            false);
+    instance.setExecutorAndHost(new StorageExecutor(createReplicaSet()));
+    return instance;
+  }
+
+  private static TRegionReplicaSet createReplicaSet() {
+    final TEndPoint endPoint = new 
TEndPoint().setIp("127.0.0.1").setPort(9000);
+    final TDataNodeLocation dataNodeLocation =
+        new TDataNodeLocation().setInternalEndPoint(endPoint);
+    return new TRegionReplicaSet()
+        .setRegionId(new DataRegionId(1).convertToTConsensusGroupId())
+        .setDataNodeLocations(Collections.singletonList(dataNodeLocation));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 161a95b4ec9..fd105b69a98 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.db.queryengine.plan.scheduler.load;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
@@ -27,15 +34,38 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
+import 
org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch;
+import 
org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatchIterator;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.powermock.reflect.Whitebox;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class LoadTsFileSchedulerTest {
@@ -52,8 +82,16 @@ public class LoadTsFileSchedulerTest {
     when(planFragment.getId()).thenReturn(new PlanFragmentId("test", 0));
   }
 
+  @After
+  public void tearDown() {
+    if (Whitebox.getInternalState(LoadTsFileMemoryManager.getInstance(), 
"dataCacheMemoryBlock")
+        != null) {
+      LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
+    }
+  }
+
   @Test
-  public void tt() {
+  public void testSchedulerMetadataAccessors() {
     LoadTsFileScheduler t =
         spy(
             new LoadTsFileScheduler(
@@ -67,4 +105,116 @@ public class LoadTsFileSchedulerTest {
     Assert.assertNull(t.getTotalCpuTime());
     Assert.assertNull(t.getFragmentInfo());
   }
+
+  @Test
+  public void 
testDispatchObjectFileBatchesReturnsFalseWhenObjectPieceDispatchFails()
+      throws Exception {
+    final LoadTsFileScheduler scheduler = createScheduler();
+    final LoadTsFileDispatcherImpl dispatcher = 
mock(LoadTsFileDispatcherImpl.class);
+    Whitebox.setInternalState(scheduler, "dispatcher", dispatcher);
+
+    final LoadTsFilePieceNode pieceNode =
+        new LoadTsFilePieceNode(new PlanNodeId("piece"), new 
File("test.tsfile"));
+    final ChunkData chunkData = mock(ChunkData.class);
+    final LoadTsFileObjectFileBatchIterator iterator =
+        mock(LoadTsFileObjectFileBatchIterator.class);
+    final TSStatus failureStatus = new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+    failureStatus.setMessage("dispatch object piece failed");
+
+    when(chunkData.getObjectFiles())
+        .thenReturn(Collections.singleton(new Pair<>(new File("base"), 
"1/object.bin")));
+    when(chunkData.getObjectFileBatchIterator(anyInt())).thenReturn(iterator);
+    when(iterator.hasNext()).thenReturn(true, false);
+    when(iterator.next())
+        .thenReturn(
+            new LoadTsFileObjectFileBatch(
+                Collections.emptyList(), new 
TTimePartitionSlot().setStartTime(1L)));
+    when(dispatcher.dispatch(isNull(), anyList()))
+        .thenReturn(
+            CompletableFuture.completedFuture(new 
FragInstanceDispatchResult(failureStatus)));
+
+    pieceNode.addTsFileData(chunkData);
+
+    final boolean result =
+        Whitebox.invokeMethod(
+            scheduler, "dispatchObjectFileBatches", pieceNode, 
createReplicaSet());
+
+    Assert.assertFalse(result);
+    verify(dispatcher).dispatch(isNull(), anyList());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSecondPhaseUsesRollbackCommandWhenFirstPhaseFails() throws 
Exception {
+    final LoadTsFileScheduler scheduler = createScheduler();
+    final LoadTsFileDispatcherImpl dispatcher = 
mock(LoadTsFileDispatcherImpl.class);
+    final ArgumentCaptor<TLoadCommandReq> commandCaptor =
+        ArgumentCaptor.forClass(TLoadCommandReq.class);
+    Whitebox.setInternalState(scheduler, "dispatcher", dispatcher);
+    ((Set<TRegionReplicaSet>) Whitebox.getInternalState(scheduler, 
"allReplicaSets"))
+        .add(createReplicaSet());
+
+    when(dispatcher.dispatchCommand(commandCaptor.capture(), anySet()))
+        .thenReturn(CompletableFuture.completedFuture(new 
FragInstanceDispatchResult(true)));
+
+    final TsFileResource tsFileResource = mock(TsFileResource.class);
+    when(tsFileResource.getTsFile()).thenReturn(new File("rollback.tsfile"));
+
+    final boolean result =
+        Whitebox.invokeMethod(scheduler, "secondPhase", false, 
"rollback-uuid", tsFileResource);
+
+    Assert.assertTrue(result);
+    Assert.assertEquals(
+        LoadTsFileScheduler.LoadCommand.ROLLBACK.ordinal(), 
commandCaptor.getValue().commandType);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSecondPhaseReturnsFalseWhenRollbackDispatchFails() throws 
Exception {
+    final LoadTsFileScheduler scheduler = createScheduler();
+    final LoadTsFileDispatcherImpl dispatcher = 
mock(LoadTsFileDispatcherImpl.class);
+    final ArgumentCaptor<TLoadCommandReq> commandCaptor =
+        ArgumentCaptor.forClass(TLoadCommandReq.class);
+    final TSStatus failureStatus = new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+    failureStatus.setMessage("rollback failed");
+    Whitebox.setInternalState(scheduler, "dispatcher", dispatcher);
+    ((Set<TRegionReplicaSet>) Whitebox.getInternalState(scheduler, 
"allReplicaSets"))
+        .add(createReplicaSet());
+
+    when(dispatcher.dispatchCommand(commandCaptor.capture(), anySet()))
+        .thenReturn(
+            CompletableFuture.completedFuture(new 
FragInstanceDispatchResult(failureStatus)));
+
+    final TsFileResource tsFileResource = mock(TsFileResource.class);
+    when(tsFileResource.getTsFile()).thenReturn(new File("rollback.tsfile"));
+
+    final boolean result =
+        Whitebox.invokeMethod(scheduler, "secondPhase", false, 
"rollback-uuid", tsFileResource);
+
+    Assert.assertFalse(result);
+    Assert.assertEquals(
+        LoadTsFileScheduler.LoadCommand.ROLLBACK.ordinal(), 
commandCaptor.getValue().commandType);
+  }
+
+  private LoadTsFileScheduler createScheduler() {
+    final MPPQueryContext queryContext = mock(MPPQueryContext.class);
+    when(queryContext.getTimeOut()).thenReturn(10_000L);
+    when(queryContext.getStartTime()).thenReturn(0L);
+    return new LoadTsFileScheduler(
+        distributedQueryPlan,
+        queryContext,
+        mock(QueryStateMachine.class),
+        mock(IClientManager.class),
+        mock(IPartitionFetcher.class),
+        false);
+  }
+
+  private TRegionReplicaSet createReplicaSet() {
+    final TEndPoint endPoint = new 
TEndPoint().setIp("127.0.0.1").setPort(9000);
+    final TDataNodeLocation dataNodeLocation =
+        new TDataNodeLocation().setInternalEndPoint(endPoint);
+    return new TRegionReplicaSet()
+        .setRegionId(new DataRegionId(1).convertToTConsensusGroupId())
+        .setDataNodeLocations(Collections.singletonList(dataNodeLocation));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
index 4c4ac20e9ae..afd2e2170ed 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
@@ -18,9 +18,17 @@
  */
 package org.apache.iotdb.db.storageengine;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
+import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.collect.Lists;
 import org.junit.After;
@@ -32,8 +40,12 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
 @RunWith(PowerMockRunner.class)
@@ -41,14 +53,25 @@ import java.util.List;
 public class StorageEngineTest {
 
   private StorageEngine storageEngine;
+  private LoadTsFileManager originalLoadTsFileManager;
+  private Map<DataRegionId, DataRegion> dataRegionMap;
 
+  @SuppressWarnings("unchecked")
   @Before
   public void setUp() {
     storageEngine = StorageEngine.getInstance();
+    originalLoadTsFileManager = Whitebox.getInternalState(storageEngine, "loadTsFileManager");
+    dataRegionMap = Whitebox.getInternalState(storageEngine, "dataRegionMap");
   }
 
   @After
   public void after() {
+    if (dataRegionMap != null) {
+      dataRegionMap.clear();
+    }
+    if (storageEngine != null && originalLoadTsFileManager != null) {
+      Whitebox.setInternalState(storageEngine, "loadTsFileManager", originalLoadTsFileManager);
+    }
     storageEngine = null;
   }
 
@@ -84,4 +107,60 @@ public class StorageEngineTest {
     Assert.assertEquals(1, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 - 1));
     Assert.assertEquals(2, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 + 1));
   }
+
+  @Test
+  public void testWriteLoadTsFileObjectPieceNodeReturnsSuccessWhenDataRegionMissing() {
+    TSStatus status =
+        storageEngine.writeLoadTsFileObjectPieceNode(
+            new DataRegionId(1), createObjectPieceNode(), "missing-region");
+
+    Assert.assertEquals(RpcUtils.SUCCESS_STATUS, status);
+  }
+
+  @Test
+  public void testWriteLoadTsFileObjectPieceNodeIOException() throws Exception {
+    final DataRegionId dataRegionId = new DataRegionId(1);
+    final DataRegion dataRegion = PowerMockito.mock(DataRegion.class);
+    final LoadTsFileManager loadTsFileManager = PowerMockito.mock(LoadTsFileManager.class);
+    final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode();
+
+    dataRegionMap.put(dataRegionId, dataRegion);
+    Whitebox.setInternalState(storageEngine, "loadTsFileManager", loadTsFileManager);
+    PowerMockito.doThrow(new IOException("write object payload failed"))
+        .when(loadTsFileManager)
+        .writeObjectPayloadToDataRegion(dataRegion, objectPieceNode, "uuid");
+
+    final TSStatus status =
+        storageEngine.writeLoadTsFileObjectPieceNode(dataRegionId, objectPieceNode, "uuid");
+
+    Assert.assertEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode());
+    Assert.assertEquals("write object payload failed", status.getMessage());
+  }
+
+  @Test
+  public void testWriteLoadTsFileObjectPieceNodeUnexpectedException() throws Exception {
+    final DataRegionId dataRegionId = new DataRegionId(1);
+    final DataRegion dataRegion = PowerMockito.mock(DataRegion.class);
+    final LoadTsFileManager loadTsFileManager = PowerMockito.mock(LoadTsFileManager.class);
+    final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode();
+
+    dataRegionMap.put(dataRegionId, dataRegion);
+    Whitebox.setInternalState(storageEngine, "loadTsFileManager", loadTsFileManager);
+    PowerMockito.doThrow(new RuntimeException("unexpected object payload failure"))
+        .when(loadTsFileManager)
+        .writeObjectPayloadToDataRegion(dataRegion, objectPieceNode, "uuid");
+
+    final TSStatus status =
+        storageEngine.writeLoadTsFileObjectPieceNode(dataRegionId, objectPieceNode, "uuid");
+
+    Assert.assertEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode());
+    Assert.assertEquals("unexpected object payload failure", status.getMessage());
+  }
+
+  private static LoadTsFileObjectPieceNode createObjectPieceNode() {
+    return new LoadTsFileObjectPieceNode(
+        new PlanNodeId("object-piece"),
+        new LoadTsFileObjectFileBatch(
+            Collections.emptyList(), new TTimePartitionSlot().setStartTime(1L)));
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java
new file mode 100644
index 00000000000..53823eb94d9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch.ObjectFileChunk;
+
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+public class LoadTsFileObjectFileBatchTest {
+
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testSerializeDeserializeAndWriteChunks() throws Exception {
+    final TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(100L);
+    final LoadTsFileObjectFileBatch batch =
+        new LoadTsFileObjectFileBatch(
+            Arrays.asList(
+                new ObjectFileChunk("0/dir/file.bin", 0, 5, false, new byte[] {1, 2, 3}),
+                new ObjectFileChunk("0/dir/file.bin", 3, 5, true, new byte[] {4, 5})),
+            timePartitionSlot);
+
+    final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    try (final DataOutputStream stream = new DataOutputStream(byteArrayOutputStream)) {
+      batch.serialize(stream);
+    }
+
+    final LoadTsFileObjectFileBatch deserialized =
+        (LoadTsFileObjectFileBatch)
+            TsFileData.deserialize(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+    Assert.assertEquals(
+        TimePartitionUtils.getTimePartitionSlot(timePartitionSlot.getStartTime()),
+        deserialized.getLastTimePartitionSlot());
+    Assert.assertEquals(2, deserialized.getObjectFileChunks().size());
+    assertChunk(deserialized.getObjectFileChunks().get(0), "0/dir/file.bin", 0, 5, false, 1, 2, 3);
+    assertChunk(deserialized.getObjectFileChunks().get(1), "0/dir/file.bin", 3, 5, true, 4, 5);
+
+    final File stagedDir = temporaryFolder.newFolder("staged");
+    for (final ObjectFileChunk chunk : deserialized.getObjectFileChunks()) {
+      chunk.writeChunk(stagedDir, "1");
+    }
+
+    final Path writtenFile = stagedDir.toPath().resolve("1").resolve("dir").resolve("file.bin");
+    Assert.assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, Files.readAllBytes(writtenFile));
+  }
+
+  @Test
+  public void testWriteChunkValidatesRelativePathAndOffsets() throws Exception {
+    final File stagedDir = temporaryFolder.newFolder("staged-validate");
+
+    try {
+      new ObjectFileChunk("file.bin", 0, 1, true, new byte[] {1}).writeChunk(stagedDir, "1");
+      Assert.fail("Expected invalid object relative path to be rejected.");
+    } catch (final IOException e) {
+      Assert.assertTrue(e.getMessage().contains("Invalid object relative path"));
+    }
+
+    final ObjectFileChunk firstChunk =
+        new ObjectFileChunk("0/dir/file.bin", 0, 3, false, new byte[] {1, 2});
+    firstChunk.writeChunk(stagedDir, "1");
+
+    try {
+      new ObjectFileChunk("0/dir/file.bin", 1, 3, true, new byte[] {3}).writeChunk(stagedDir, "1");
+      Assert.fail("Expected offset mismatch to be rejected.");
+    } catch (final IOException e) {
+      Assert.assertTrue(e.getMessage().contains("offset mismatch"));
+    }
+
+    try {
+      new ObjectFileChunk("0/dir/another.bin", 0, 5, true, new byte[] {1, 2})
+          .writeChunk(stagedDir, "1");
+      Assert.fail("Expected final chunk size mismatch to be rejected.");
+    } catch (final IOException e) {
+      Assert.assertTrue(e.getMessage().contains("size mismatch"));
+    }
+  }
+
+  @Test
+  public void testIteratorSplitsFilesAndSkipsEmptyEntries() throws Exception {
+    final File baseDir = temporaryFolder.newFolder("iterator");
+    final Path nestedDir = baseDir.toPath().resolve("0");
+    Files.createDirectories(nestedDir);
+    Files.write(nestedDir.resolve("a.bin"), new byte[] {0, 1, 2, 3, 4});
+    Files.write(nestedDir.resolve("b.bin"), new byte[] {9, 8});
+    Files.write(nestedDir.resolve("empty.bin"), new byte[0]);
+
+    final Set<Pair<File, String>> fileSet = new LinkedHashSet<>();
+    fileSet.add(new Pair<>(baseDir, "0/missing.bin"));
+    fileSet.add(new Pair<>(baseDir, "0/empty.bin"));
+    fileSet.add(new Pair<>(baseDir, "0/a.bin"));
+    fileSet.add(new Pair<>(baseDir, "0/b.bin"));
+
+    try (final LoadTsFileObjectFileBatchIterator iterator =
+        new LoadTsFileObjectFileBatchIterator(fileSet, 4, new TTimePartitionSlot(7L))) {
+      Assert.assertTrue(iterator.hasNext());
+      final List<ObjectFileChunk> firstBatch = iterator.next().getObjectFileChunks();
+      Assert.assertEquals(1, firstBatch.size());
+      assertChunk(firstBatch.get(0), "0/a.bin", 0, 5, false, 0, 1, 2, 3);
+
+      Assert.assertTrue(iterator.hasNext());
+      final List<ObjectFileChunk> secondBatch = iterator.next().getObjectFileChunks();
+      Assert.assertEquals(2, secondBatch.size());
+      assertChunk(secondBatch.get(0), "0/a.bin", 4, 5, true, 4);
+      assertChunk(secondBatch.get(1), "0/b.bin", 0, 2, true, 9, 8);
+
+      Assert.assertFalse(iterator.hasNext());
+      try {
+        iterator.next();
+        Assert.fail("Expected iterator exhaustion to throw.");
+      } catch (final NoSuchElementException ignored) {
+      }
+    }
+  }
+
+  private static void assertChunk(
+      final ObjectFileChunk chunk,
+      final String relativePath,
+      final long offset,
+      final long totalLength,
+      final boolean includeLastByte,
+      final int... bytes) {
+    Assert.assertEquals(relativePath, chunk.getObjectRelativePath());
+    Assert.assertEquals(offset, chunk.getFileOffset());
+    Assert.assertEquals(totalLength, chunk.getTotalFileLength());
+    Assert.assertEquals(includeLastByte, chunk.isIncludeLastByte());
+
+    final byte[] expected = new byte[bytes.length];
+    for (int i = 0; i < bytes.length; i++) {
+      expected[i] = (byte) bytes[i];
+    }
+    Assert.assertArrayEquals(expected, chunk.getBytes());
+  }
+}


Reply via email to