This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9cd92ff44774490c3b7bfb639f6a51fb30250ae8 Author: 陈哲涵 <[email protected]> AuthorDate: Thu May 14 23:27:57 2026 +0800 [TIMECHODB] Fixed the load parameter passes & Added tests for object with pipe/load (cherry picked from commit 16498aad04ddd069f69134d01ad515b093911479) --- iotdb-client/client-go | 2 +- iotdb-core/datanode/pom.xml | 5 + .../plan/relational/sql/ast/LoadTsFile.java | 2 + .../plan/statement/crud/LoadTsFileStatement.java | 5 + .../object/PipeObjectResourceManagerTest.java | 141 +++++++++++++++++ .../plan/planner/node/load/LoadTsFileNodeTest.java | 72 +++++++++ .../load/LoadTsFileDispatcherImplTest.java | 147 ++++++++++++++++++ .../scheduler/load/LoadTsFileSchedulerTest.java | 152 +++++++++++++++++- .../iotdb/db/storageengine/StorageEngineTest.java | 79 ++++++++++ .../splitter/LoadTsFileObjectFileBatchTest.java | 169 +++++++++++++++++++++ 10 files changed, 772 insertions(+), 2 deletions(-) diff --git a/iotdb-client/client-go b/iotdb-client/client-go index dc64b1a7648..2ea2655e090 160000 --- a/iotdb-client/client-go +++ b/iotdb-client/client-go @@ -1 +1 @@ -Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def +Subproject commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24 diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 23189f61336..ac0d555bc0c 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -308,6 +308,11 @@ <artifactId>powermock-api-mockito2</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-reflect</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index ee2934a4381..4ef8f4d322e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -208,6 +208,7 @@ public class LoadTsFile extends Statement { LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes); this.verify = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes); this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes); + this.isGeneratedByPipe = LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes); this.objectFileSearchRoot = LoadTsFileConfigurator.parseObjectFileSearchRoot(loadAttributes); } @@ -311,6 +312,7 @@ public class LoadTsFile extends Statement { subStatement.autoCreateDatabase = this.autoCreateDatabase; subStatement.isAsyncLoad = this.isAsyncLoad; subStatement.isGeneratedByPipe = this.isGeneratedByPipe; + subStatement.objectFileSearchRoot = this.objectFileSearchRoot; // Set all files in the batch subStatement.tsFiles = new ArrayList<>(batchFiles); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index e59ed1f0038..cbfae15b114 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -45,6 +45,7 @@ import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.OBJECT_FILE_PATHS_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE; @@ -364,6 +365,7 @@ public class LoadTsFileStatement extends Statement { statement.autoCreateDatabase = this.autoCreateDatabase; statement.isAsyncLoad = this.isAsyncLoad; statement.isGeneratedByPipe = this.isGeneratedByPipe; + statement.objectFileSearchRoot = this.objectFileSearchRoot; statement.tsFiles = new ArrayList<>(batchFiles); statement.resources = new ArrayList<>(batchFiles.size()); @@ -403,6 +405,9 @@ public class LoadTsFileStatement extends Statement { if (isGeneratedByPipe) { loadAttributes.put(PIPE_GENERATED_KEY, String.valueOf(true)); } + if (objectFileSearchRoot != null) { + loadAttributes.put(OBJECT_FILE_PATHS_KEY, objectFileSearchRoot.getAbsolutePath()); + } return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java new file mode 100644 index 00000000000..024998afa7e --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManagerTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.object; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; + +@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) +@RunWith(PowerMockRunner.class) +@PrepareForTest(TierManager.class) +public class PipeObjectResourceManagerTest { + + private static final String PIPE_NAME = "pipeA"; + private static final String OBJECT_RELATIVE_PATH = "99/object.bin"; + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private TsFileResource tsFileResource; + private File originalObjectFile; + + @Before + public void setUp() throws Exception { + final TierManager tierManager = Mockito.mock(TierManager.class); + PowerMockito.mockStatic(TierManager.class); + PowerMockito.when(TierManager.getInstance()).thenReturn(tierManager); + Mockito.when(tierManager.getFileTierLevel(Mockito.any(File.class))).thenReturn(0); + + originalObjectFile = temporaryFolder.newFile("original-object.bin"); + try (final FileOutputStream outputStream = new FileOutputStream(originalObjectFile)) { + outputStream.write("payload".getBytes(StandardCharsets.UTF_8)); + } + Mockito.when(tierManager.getAbsoluteObjectFilePath(Mockito.anyString(), Mockito.eq(false))) + .thenReturn(Optional.of(originalObjectFile)); + + final File tsFileDir = new File(temporaryFolder.getRoot(), "data/sequence/root.test/1/100"); + Assert.assertTrue(tsFileDir.mkdirs()); + final File tsFile = new File(tsFileDir, "1-0-0-0.tsfile"); + Assert.assertTrue(tsFile.createNewFile()); + tsFileResource = new TsFileResource(tsFile); + } + + @Test + public void testManagerLifecycleCleansUpHardlinksAfterCloseAndDereference() throws Exception { + final PipeObjectResourceManager manager = new PipeObjectResourceManager(); + + final int linkedCount = + manager.linkObjectFiles( + tsFileResource, + Arrays.asList(OBJECT_RELATIVE_PATH, "99/sub/object-2.bin").iterator(), + PIPE_NAME); + Assert.assertEquals(2, linkedCount); + + final File linkedDir = manager.getLinkedObjectDirectory(tsFileResource, PIPE_NAME); + final File hardlink = + manager.getObjectFileHardlink(tsFileResource, OBJECT_RELATIVE_PATH, PIPE_NAME); + Assert.assertNotNull(linkedDir); + Assert.assertTrue(linkedDir.exists()); + Assert.assertNotNull(hardlink); + Assert.assertTrue(hardlink.exists()); + + manager.increaseReference(tsFileResource, PIPE_NAME); + manager.decreaseReference(tsFileResource, PIPE_NAME); + Assert.assertTrue(linkedDir.exists()); + + manager.setTsFileClosed(tsFileResource, PIPE_NAME); + Assert.assertTrue(linkedDir.exists()); + + manager.decreaseReference(tsFileResource, PIPE_NAME); + + Assert.assertNull(manager.getLinkedObjectDirectory(tsFileResource, PIPE_NAME)); + Assert.assertNull( + manager.getObjectFileHardlink(tsFileResource, OBJECT_RELATIVE_PATH, PIPE_NAME)); + Assert.assertFalse(linkedDir.exists()); + Assert.assertFalse(hardlink.exists()); + } + + @Test + public void testResourceCleanupResetsHardlinkLifecycleState() throws Exception { + final File resourceDir = new File(temporaryFolder.getRoot(), "resource-dir"); + final PipeObjectResource resource = new PipeObjectResource(tsFileResource, resourceDir); + + resource.linkObjectFile(OBJECT_RELATIVE_PATH); + Assert.assertEquals(1, resource.getLinkedFileCount()); + Assert.assertNotNull(resource.getObjectFileHardlink(OBJECT_RELATIVE_PATH)); + + resource.increaseReferenceCount(); + resource.setTsFileClosed(); + Assert.assertTrue(resource.decreaseReferenceCount()); + + resource.cleanup(); + + Assert.assertTrue(resource.isClosed()); + Assert.assertEquals(0, resource.getLinkedFileCount()); + Assert.assertEquals(0, resource.getReferenceCount()); + Assert.assertNull(resource.getObjectFileHardlink(OBJECT_RELATIVE_PATH)); + Assert.assertFalse(resourceDir.exists()); + + try { + resource.linkObjectFile(OBJECT_RELATIVE_PATH); + Assert.fail("Expected IOException"); + } catch (final IOException e) { + Assert.assertTrue(e.getMessage().contains("Object resource is closed")); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index 4a558e4ecb4..f2f7d9865b4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -19,17 +19,25 @@ package org.apache.iotdb.db.queryengine.plan.planner.node.load; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch.ObjectFileChunk; import org.apache.tsfile.exception.NotImplementedException; import org.junit.Assert; import org.junit.Test; import java.io.File; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -90,4 +98,68 @@ public class LoadTsFileNodeTest { LoadTsFilePieceNode node1 = (LoadTsFilePieceNode) LoadTsFilePieceNode.deserialize(buffer); Assert.assertEquals(node.getTsFile(), node1.getTsFile()); } + + @Test + public void testLoadTsFileObjectPieceNode() { + final LoadTsFileObjectFileBatch objectBatch = + new LoadTsFileObjectFileBatch( + Collections.singletonList( + new ObjectFileChunk("0/object/a.bin", 0, 3, true, new byte[] {1, 2, 3})), + new TTimePartitionSlot(123L)); + final LoadTsFileObjectPieceNode node = + new LoadTsFileObjectPieceNode(new PlanNodeId("object"), objectBatch); + + Assert.assertEquals(0, node.getDataSize()); + Assert.assertEquals(objectBatch, node.getObjectBatch()); + Assert.assertNull(node.getRegionReplicaSet()); + Assert.assertEquals(Collections.emptyList(), node.getChildren()); + Assert.assertEquals(Collections.emptyList(), node.getOutputColumnNames()); + try { + node.clone(); + Assert.fail(); + } catch (NotImplementedException ignored) { + } + try { + node.splitByPartition(new Analysis()); + Assert.fail(); + } catch (NotImplementedException ignored) { + } + Assert.assertEquals(0, node.allowedChildCount()); + Assert.assertEquals("LoadTsFileObjectPieceNode{dataSize=0}", node.toString()); + + final LoadTsFileObjectPieceNode deserialized = + (LoadTsFileObjectPieceNode) PlanNodeType.deserialize(node.serializeToByteBuffer()); + Assert.assertNotNull(deserialized); + Assert.assertEquals( + TimePartitionUtils.getTimePartitionSlot( + node.getObjectBatch().getLastTimePartitionSlot().getStartTime()), + deserialized.getObjectBatch().getLastTimePartitionSlot()); + Assert.assertEquals(1, deserialized.getObjectBatch().getObjectFileChunks().size()); + final ObjectFileChunk chunk = deserialized.getObjectBatch().getObjectFileChunks().get(0); + Assert.assertEquals("0/object/a.bin", chunk.getObjectRelativePath()); + Assert.assertEquals(0, chunk.getFileOffset()); + Assert.assertEquals(3, chunk.getTotalFileLength()); + Assert.assertTrue(chunk.isIncludeLastByte()); + Assert.assertArrayEquals(new byte[] {1, 2, 3}, chunk.getBytes()); + } + + @Test + public void testLoadTsFileObjectPieceNodeDeserializeInvalidBuffer() { + final LoadTsFileObjectFileBatch objectBatch = + new LoadTsFileObjectFileBatch( + Collections.singletonList( + new ObjectFileChunk("0/object/a.bin", 0, 3, true, new byte[] {1, 2, 3})), + new TTimePartitionSlot(123L)); + final ByteBuffer buffer = + new LoadTsFileObjectPieceNode(new PlanNodeId("object"), objectBatch) + .serializeToByteBuffer(); + buffer.getShort(); + buffer.limit(buffer.position() + 2); + + try { + Assert.assertNull(LoadTsFileObjectPieceNode.deserialize(buffer.slice())); + } catch (final BufferUnderflowException e) { + Assert.fail("Deserialize should fail gracefully for truncated object piece buffer."); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java new file mode 100644 index 00000000000..3585c86ac4a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.scheduler.load; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; + +@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) +@RunWith(PowerMockRunner.class) +@PrepareForTest(StorageEngine.class) +public class LoadTsFileDispatcherImplTest { + + @Test + public void testDispatchLocallyObjectPieceNodeSuccess() throws Exception { + final StorageEngine storageEngine = Mockito.mock(StorageEngine.class); + PowerMockito.mockStatic(StorageEngine.class); + PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine); + + final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false); + dispatcher.setUuid("test-uuid"); + + final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode(); + final FragmentInstance instance = createFragmentInstance(objectPieceNode); + + Mockito.when( + storageEngine.writeLoadTsFileObjectPieceNode( + Mockito.eq(new DataRegionId(1)), + Mockito.same(objectPieceNode), + Mockito.eq("test-uuid"))) + .thenReturn(RpcUtils.SUCCESS_STATUS); + + dispatcher.dispatchLocally(instance); + + Mockito.verify(storageEngine) + .writeLoadTsFileObjectPieceNode( + Mockito.eq(new DataRegionId(1)), + Mockito.same(objectPieceNode), + Mockito.eq("test-uuid")); + } + + @Test + public void testDispatchLocallyObjectPieceNodeFailure() throws Exception { + final StorageEngine storageEngine = Mockito.mock(StorageEngine.class); + PowerMockito.mockStatic(StorageEngine.class); + PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine); + + final LoadTsFileDispatcherImpl dispatcher = new LoadTsFileDispatcherImpl(null, false); + dispatcher.setUuid("test-uuid"); + + final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode(); + final FragmentInstance instance = createFragmentInstance(objectPieceNode); + final TSStatus failureStatus = new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + failureStatus.setMessage("object piece dispatch failed"); + + Mockito.when( + storageEngine.writeLoadTsFileObjectPieceNode( + Mockito.eq(new DataRegionId(1)), + Mockito.same(objectPieceNode), + Mockito.eq("test-uuid"))) + .thenReturn(failureStatus); + + try { + dispatcher.dispatchLocally(instance); + Assert.fail("Expected FragmentInstanceDispatchException"); + } catch (final FragmentInstanceDispatchException e) { + Assert.assertEquals( + TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), e.getFailureStatus().getCode()); + Assert.assertEquals("object piece dispatch failed", e.getFailureStatus().getMessage()); + } + } + + private static LoadTsFileObjectPieceNode createObjectPieceNode() { + return new LoadTsFileObjectPieceNode( + new PlanNodeId("object-piece"), + new LoadTsFileObjectFileBatch( + Collections.emptyList(), new TTimePartitionSlot().setStartTime(1L))); + } + + private static FragmentInstance createFragmentInstance( + final LoadTsFileObjectPieceNode objectPieceNode) { + final PlanFragmentId fragmentId = new PlanFragmentId("test", 0); + final FragmentInstance instance = + new FragmentInstance( + new PlanFragment(fragmentId, objectPieceNode), + fragmentId.genFragmentInstanceId(), + null, + null, + 0L, + null, + false, + false); + instance.setExecutorAndHost(new StorageExecutor(createReplicaSet())); + return instance; + } + + private static TRegionReplicaSet createReplicaSet() { + final TEndPoint endPoint = new TEndPoint().setIp("127.0.0.1").setPort(9000); + final TDataNodeLocation dataNodeLocation = + new TDataNodeLocation().setInternalEndPoint(endPoint); + return new TRegionReplicaSet() + .setRegionId(new DataRegionId(1).convertToTConsensusGroupId()) + .setDataNodeLocations(Collections.singletonList(dataNodeLocation)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java index 161a95b4ec9..fd105b69a98 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java @@ -19,7 +19,14 @@ package org.apache.iotdb.db.queryengine.plan.scheduler.load; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; @@ -27,15 +34,38 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; +import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; +import org.apache.iotdb.db.storageengine.load.splitter.ChunkData; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatchIterator; +import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.utils.Pair; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.powermock.reflect.Whitebox; +import java.io.File; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class LoadTsFileSchedulerTest { @@ -52,8 +82,16 @@ public class LoadTsFileSchedulerTest { when(planFragment.getId()).thenReturn(new PlanFragmentId("test", 0)); } + @After + public void tearDown() { + if (Whitebox.getInternalState(LoadTsFileMemoryManager.getInstance(), "dataCacheMemoryBlock") + != null) { + LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock(); + } + } + @Test - public void tt() { + public void testSchedulerMetadataAccessors() { LoadTsFileScheduler t = spy( new LoadTsFileScheduler( @@ -67,4 +105,116 @@ public class LoadTsFileSchedulerTest { Assert.assertNull(t.getTotalCpuTime()); Assert.assertNull(t.getFragmentInfo()); } + + @Test + public void testDispatchObjectFileBatchesReturnsFalseWhenObjectPieceDispatchFails() + throws Exception { + final LoadTsFileScheduler scheduler = createScheduler(); + final LoadTsFileDispatcherImpl dispatcher = mock(LoadTsFileDispatcherImpl.class); + Whitebox.setInternalState(scheduler, "dispatcher", dispatcher); + + final LoadTsFilePieceNode pieceNode = + new LoadTsFilePieceNode(new PlanNodeId("piece"), new File("test.tsfile")); + final ChunkData chunkData = mock(ChunkData.class); + final LoadTsFileObjectFileBatchIterator iterator = + mock(LoadTsFileObjectFileBatchIterator.class); + final TSStatus failureStatus = new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + failureStatus.setMessage("dispatch object piece failed"); + + when(chunkData.getObjectFiles()) + .thenReturn(Collections.singleton(new Pair<>(new File("base"), "1/object.bin"))); + when(chunkData.getObjectFileBatchIterator(anyInt())).thenReturn(iterator); + when(iterator.hasNext()).thenReturn(true, false); + when(iterator.next()) + .thenReturn( + new LoadTsFileObjectFileBatch( + Collections.emptyList(), new TTimePartitionSlot().setStartTime(1L))); + when(dispatcher.dispatch(isNull(), anyList())) + .thenReturn( + CompletableFuture.completedFuture(new FragInstanceDispatchResult(failureStatus))); + + pieceNode.addTsFileData(chunkData); + + final boolean result = + Whitebox.invokeMethod( + scheduler, "dispatchObjectFileBatches", pieceNode, createReplicaSet()); + + Assert.assertFalse(result); + verify(dispatcher).dispatch(isNull(), anyList()); + } + + @Test + @SuppressWarnings("unchecked") + public void testSecondPhaseUsesRollbackCommandWhenFirstPhaseFails() throws Exception { + final LoadTsFileScheduler scheduler = createScheduler(); + final LoadTsFileDispatcherImpl dispatcher = mock(LoadTsFileDispatcherImpl.class); + final ArgumentCaptor<TLoadCommandReq> commandCaptor = + ArgumentCaptor.forClass(TLoadCommandReq.class); + Whitebox.setInternalState(scheduler, "dispatcher", dispatcher); + ((Set<TRegionReplicaSet>) Whitebox.getInternalState(scheduler, "allReplicaSets")) + .add(createReplicaSet()); + + when(dispatcher.dispatchCommand(commandCaptor.capture(), anySet())) + .thenReturn(CompletableFuture.completedFuture(new FragInstanceDispatchResult(true))); + + final TsFileResource tsFileResource = mock(TsFileResource.class); + when(tsFileResource.getTsFile()).thenReturn(new File("rollback.tsfile")); + + final boolean result = + Whitebox.invokeMethod(scheduler, "secondPhase", false, "rollback-uuid", tsFileResource); + + Assert.assertTrue(result); + Assert.assertEquals( + LoadTsFileScheduler.LoadCommand.ROLLBACK.ordinal(), commandCaptor.getValue().commandType); + } + + @Test + @SuppressWarnings("unchecked") + public void testSecondPhaseReturnsFalseWhenRollbackDispatchFails() throws Exception { + final LoadTsFileScheduler scheduler = createScheduler(); + final LoadTsFileDispatcherImpl dispatcher = mock(LoadTsFileDispatcherImpl.class); + final ArgumentCaptor<TLoadCommandReq> commandCaptor = + ArgumentCaptor.forClass(TLoadCommandReq.class); + final TSStatus failureStatus = new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + failureStatus.setMessage("rollback failed"); + Whitebox.setInternalState(scheduler, "dispatcher", dispatcher); + ((Set<TRegionReplicaSet>) Whitebox.getInternalState(scheduler, "allReplicaSets")) + .add(createReplicaSet()); + + when(dispatcher.dispatchCommand(commandCaptor.capture(), anySet())) + .thenReturn( + CompletableFuture.completedFuture(new FragInstanceDispatchResult(failureStatus))); + + final TsFileResource tsFileResource = mock(TsFileResource.class); + when(tsFileResource.getTsFile()).thenReturn(new File("rollback.tsfile")); + + final boolean result = + Whitebox.invokeMethod(scheduler, "secondPhase", false, "rollback-uuid", tsFileResource); + + Assert.assertFalse(result); + Assert.assertEquals( + LoadTsFileScheduler.LoadCommand.ROLLBACK.ordinal(), commandCaptor.getValue().commandType); + } + + private LoadTsFileScheduler createScheduler() { + final MPPQueryContext queryContext = mock(MPPQueryContext.class); + when(queryContext.getTimeOut()).thenReturn(10_000L); + when(queryContext.getStartTime()).thenReturn(0L); + return new LoadTsFileScheduler( + distributedQueryPlan, + queryContext, + mock(QueryStateMachine.class), + mock(IClientManager.class), + mock(IPartitionFetcher.class), + false); + } + + private TRegionReplicaSet createReplicaSet() { + final TEndPoint endPoint = new TEndPoint().setIp("127.0.0.1").setPort(9000); + final TDataNodeLocation dataNodeLocation = + new TDataNodeLocation().setInternalEndPoint(endPoint); + return new TRegionReplicaSet() + .setRegionId(new DataRegionId(1).convertToTConsensusGroupId()) + .setDataNodeLocations(Collections.singletonList(dataNodeLocation)); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java index 4c4ac20e9ae..afd2e2170ed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java @@ -18,9 +18,17 @@ */ package org.apache.iotdb.db.storageengine; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileObjectPieceNode; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.collect.Lists; import org.junit.After; @@ -32,8 +40,12 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; +import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"}) @RunWith(PowerMockRunner.class) @@ -41,14 +53,25 @@ import java.util.List; public class StorageEngineTest { private StorageEngine storageEngine; + private LoadTsFileManager originalLoadTsFileManager; + private Map<DataRegionId, DataRegion> dataRegionMap; + @SuppressWarnings("unchecked") @Before public void setUp() { storageEngine = StorageEngine.getInstance(); + originalLoadTsFileManager = Whitebox.getInternalState(storageEngine, "loadTsFileManager"); + dataRegionMap = Whitebox.getInternalState(storageEngine, "dataRegionMap"); } @After public void after() { + if (dataRegionMap != null) { + dataRegionMap.clear(); + } + if (storageEngine != null && originalLoadTsFileManager != null) { + Whitebox.setInternalState(storageEngine, "loadTsFileManager", originalLoadTsFileManager); + } storageEngine = null; } @@ -84,4 +107,60 @@ public class StorageEngineTest { Assert.assertEquals(1, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 - 1)); Assert.assertEquals(2, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 + 1)); } + + @Test + public void testWriteLoadTsFileObjectPieceNodeReturnsSuccessWhenDataRegionMissing() { + TSStatus status = + storageEngine.writeLoadTsFileObjectPieceNode( + new DataRegionId(1), createObjectPieceNode(), "missing-region"); + + Assert.assertEquals(RpcUtils.SUCCESS_STATUS, status); + } + + @Test + public void testWriteLoadTsFileObjectPieceNodeIOException() throws Exception { + final DataRegionId dataRegionId = new DataRegionId(1); + final DataRegion dataRegion = PowerMockito.mock(DataRegion.class); + final LoadTsFileManager loadTsFileManager = PowerMockito.mock(LoadTsFileManager.class); + final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode(); + + dataRegionMap.put(dataRegionId, dataRegion); + Whitebox.setInternalState(storageEngine, "loadTsFileManager", loadTsFileManager); + PowerMockito.doThrow(new IOException("write object payload failed")) + .when(loadTsFileManager) + .writeObjectPayloadToDataRegion(dataRegion, objectPieceNode, "uuid"); + + final TSStatus status = + storageEngine.writeLoadTsFileObjectPieceNode(dataRegionId, objectPieceNode, "uuid"); + + Assert.assertEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode()); + Assert.assertEquals("write object payload failed", status.getMessage()); + } + + @Test + public void testWriteLoadTsFileObjectPieceNodeUnexpectedException() throws Exception { + final DataRegionId dataRegionId = new DataRegionId(1); + final DataRegion dataRegion = PowerMockito.mock(DataRegion.class); + final LoadTsFileManager loadTsFileManager = PowerMockito.mock(LoadTsFileManager.class); + final LoadTsFileObjectPieceNode objectPieceNode = createObjectPieceNode(); + + dataRegionMap.put(dataRegionId, dataRegion); + Whitebox.setInternalState(storageEngine, "loadTsFileManager", loadTsFileManager); + PowerMockito.doThrow(new RuntimeException("unexpected object payload failure")) + .when(loadTsFileManager) + .writeObjectPayloadToDataRegion(dataRegion, objectPieceNode, "uuid"); + + final TSStatus status = + storageEngine.writeLoadTsFileObjectPieceNode(dataRegionId, objectPieceNode, "uuid"); + + Assert.assertEquals(TSStatusCode.LOAD_FILE_ERROR.getStatusCode(), status.getCode()); + Assert.assertEquals("unexpected object payload failure", status.getMessage()); + } + + private static LoadTsFileObjectPieceNode createObjectPieceNode() { + return new LoadTsFileObjectPieceNode( + new PlanNodeId("object-piece"), + new LoadTsFileObjectFileBatch( + Collections.emptyList(), new TTimePartitionSlot().setStartTime(1L))); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java new file mode 100644 index 00000000000..53823eb94d9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/LoadTsFileObjectFileBatchTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.splitter; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.storageengine.load.splitter.LoadTsFileObjectFileBatch.ObjectFileChunk; + +import org.apache.tsfile.utils.Pair; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +public class LoadTsFileObjectFileBatchTest { + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testSerializeDeserializeAndWriteChunks() throws Exception { + final TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(100L); + final LoadTsFileObjectFileBatch batch = + new LoadTsFileObjectFileBatch( + Arrays.asList( + new ObjectFileChunk("0/dir/file.bin", 0, 5, false, new byte[] {1, 2, 3}), + new ObjectFileChunk("0/dir/file.bin", 3, 5, true, new byte[] {4, 5})), + timePartitionSlot); + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final DataOutputStream stream = new DataOutputStream(byteArrayOutputStream)) { + batch.serialize(stream); + } + + final LoadTsFileObjectFileBatch deserialized = + (LoadTsFileObjectFileBatch) + TsFileData.deserialize(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + Assert.assertEquals( + TimePartitionUtils.getTimePartitionSlot(timePartitionSlot.getStartTime()), + deserialized.getLastTimePartitionSlot()); + Assert.assertEquals(2, deserialized.getObjectFileChunks().size()); + assertChunk(deserialized.getObjectFileChunks().get(0), "0/dir/file.bin", 0, 5, false, 1, 2, 3); + assertChunk(deserialized.getObjectFileChunks().get(1), "0/dir/file.bin", 3, 5, true, 4, 5); + + final File stagedDir = temporaryFolder.newFolder("staged"); + for (final ObjectFileChunk chunk : deserialized.getObjectFileChunks()) { + chunk.writeChunk(stagedDir, "1"); + } + + final Path writtenFile = stagedDir.toPath().resolve("1").resolve("dir").resolve("file.bin"); + Assert.assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, Files.readAllBytes(writtenFile)); + } + + @Test + public void testWriteChunkValidatesRelativePathAndOffsets() throws Exception { + final File stagedDir = temporaryFolder.newFolder("staged-validate"); + + try { + new ObjectFileChunk("file.bin", 0, 1, true, new byte[] {1}).writeChunk(stagedDir, "1"); + Assert.fail("Expected invalid object relative path to be rejected."); + } catch (final IOException e) { + Assert.assertTrue(e.getMessage().contains("Invalid object relative path")); + } + + final ObjectFileChunk firstChunk = + new ObjectFileChunk("0/dir/file.bin", 0, 3, false, new byte[] {1, 2}); + firstChunk.writeChunk(stagedDir, "1"); + + try { + new ObjectFileChunk("0/dir/file.bin", 1, 3, true, new byte[] {3}).writeChunk(stagedDir, "1"); + Assert.fail("Expected offset mismatch to be rejected."); + } catch (final IOException e) { + Assert.assertTrue(e.getMessage().contains("offset mismatch")); + } + + try { + new ObjectFileChunk("0/dir/another.bin", 0, 5, true, new byte[] {1, 2}) + .writeChunk(stagedDir, "1"); + Assert.fail("Expected final chunk size mismatch to be rejected."); + } catch (final IOException e) { + Assert.assertTrue(e.getMessage().contains("size mismatch")); + } + } + + @Test + public void testIteratorSplitsFilesAndSkipsEmptyEntries() throws Exception { + final File baseDir = temporaryFolder.newFolder("iterator"); + final Path nestedDir = baseDir.toPath().resolve("0"); + Files.createDirectories(nestedDir); + Files.write(nestedDir.resolve("a.bin"), new byte[] {0, 1, 2, 3, 4}); + Files.write(nestedDir.resolve("b.bin"), new byte[] {9, 8}); + Files.write(nestedDir.resolve("empty.bin"), new byte[0]); + + final Set<Pair<File, String>> fileSet = new LinkedHashSet<>(); + fileSet.add(new Pair<>(baseDir, "0/missing.bin")); + fileSet.add(new Pair<>(baseDir, "0/empty.bin")); + fileSet.add(new Pair<>(baseDir, "0/a.bin")); + fileSet.add(new Pair<>(baseDir, "0/b.bin")); + + try (final LoadTsFileObjectFileBatchIterator iterator = + new LoadTsFileObjectFileBatchIterator(fileSet, 4, new TTimePartitionSlot(7L))) { + Assert.assertTrue(iterator.hasNext()); + final List<ObjectFileChunk> firstBatch = iterator.next().getObjectFileChunks(); + Assert.assertEquals(1, firstBatch.size()); + assertChunk(firstBatch.get(0), "0/a.bin", 0, 5, false, 0, 1, 2, 3); + + Assert.assertTrue(iterator.hasNext()); + final List<ObjectFileChunk> secondBatch = iterator.next().getObjectFileChunks(); + Assert.assertEquals(2, secondBatch.size()); + assertChunk(secondBatch.get(0), "0/a.bin", 4, 5, true, 4); + assertChunk(secondBatch.get(1), "0/b.bin", 0, 2, true, 9, 8); + + Assert.assertFalse(iterator.hasNext()); + try { + iterator.next(); + Assert.fail("Expected iterator exhaustion to throw."); + } catch (final NoSuchElementException ignored) { + } + } + } + + private static void assertChunk( + final ObjectFileChunk chunk, + final String relativePath, + final long offset, + final long totalLength, + final boolean includeLastByte, + final int... bytes) { + Assert.assertEquals(relativePath, chunk.getObjectRelativePath()); + Assert.assertEquals(offset, chunk.getFileOffset()); + Assert.assertEquals(totalLength, chunk.getTotalFileLength()); + Assert.assertEquals(includeLastByte, chunk.isIncludeLastByte()); + + final byte[] expected = new byte[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + expected[i] = (byte) bytes[i]; + } + Assert.assertArrayEquals(expected, chunk.getBytes()); + } +}
