This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch remove-sync-entry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1a0d73f3428acc911204026e7907919aba41b21 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 17 01:07:09 2023 +0800 remove server/src/test/java/org/apache/iotdb/db/sync/ --- .../org/apache/iotdb/db/sync/SyncTestUtils.java | 41 - .../db/sync/datasource/DeletionGroupTest.java | 231 ---- .../db/sync/datasource/DeletionOpBlockTest.java | 62 -- .../db/sync/datasource/PipeOpManagerTest.java | 595 ----------- .../db/sync/datasource/TsFileOpBlockTest.java | 1123 -------------------- .../db/sync/persistence/LocalSyncInfoTest.java | 104 -- .../iotdb/db/sync/persistence/SyncLogTest.java | 121 --- .../iotdb/db/sync/pipedata/PipeDataTest.java | 65 -- 8 files changed, 2342 deletions(-) diff --git a/server/src/test/java/org/apache/iotdb/db/sync/SyncTestUtils.java b/server/src/test/java/org/apache/iotdb/db/sync/SyncTestUtils.java deleted file mode 100644 index 5733e60c6d1..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/SyncTestUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.sync; - -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; - -import org.junit.Assert; - -public class SyncTestUtils { - public static void checkPipeInfo( - PipeInfo pipeInfo, - String pipeName, - String pipeSinkName, - PipeStatus status, - long createTime, - PipeMessage.PipeMessageType messageType) { - Assert.assertEquals(pipeName, pipeInfo.getPipeName()); - Assert.assertEquals(pipeSinkName, pipeInfo.getPipeSinkName()); - Assert.assertEquals(status, pipeInfo.getStatus()); - Assert.assertEquals(createTime, pipeInfo.getCreateTime()); - Assert.assertEquals(messageType, pipeInfo.getMessageType()); - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java deleted file mode 100644 index df10ba50fb4..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.iotdb.db.sync.datasource; - -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; - -import static org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED; -import static org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED; -import static org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.PARTIAL_DELETED; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class DeletionGroupTest { - - private static DeletionGroup deletionGroup1; - private static DeletionGroup deletionGroup2; // empty - - @BeforeClass - public static void prepareData() { - deletionGroup1 = new DeletionGroup(); - - // for item 0 - deletionGroup1.addDelInterval(10, 30); - deletionGroup1.addDelInterval(20, 40); - - // for item 3 - deletionGroup1.addDelInterval(150, 200); - deletionGroup1.addDelInterval(150, 200); - - // for item 1 - deletionGroup1.addDelInterval(50, 50); - deletionGroup1.addDelInterval(50, 50); - - // for item 4 - deletionGroup1.addDelInterval(220, 300); - deletionGroup1.addDelInterval(250, 290); - - // for item 2 - deletionGroup1.addDelInterval(70, 110); - deletionGroup1.addDelInterval(70, 80); - deletionGroup1.addDelInterval(80, 90); - deletionGroup1.addDelInterval(100, 120); - - deletionGroup2 = new DeletionGroup(); - } - - @Test - public void testAddDelInterval() { - boolean hasException = false; - try { - deletionGroup1.addDelInterval(10, 5); - } catch (IllegalArgumentException e) { - hasException = true; - } - assertTrue(hasException); - - TreeMap<Long, Long> delIntervalMap = deletionGroup1.getDelIntervalMap(); - Iterator<Map.Entry<Long, Long>> iter1 = delIntervalMap.entrySet().iterator(); - Map.Entry<Long, Long> entry1 = iter1.next(); - assertEquals(10, entry1.getKey().longValue()); - assertEquals(40, entry1.getValue().longValue()); - entry1 = iter1.next(); - assertEquals(50, entry1.getKey().longValue()); - assertEquals(50, entry1.getValue().longValue()); - entry1 = iter1.next(); - assertEquals(70, entry1.getKey().longValue()); - assertEquals(120, entry1.getValue().longValue()); - entry1 = iter1.next(); - assertEquals(150, entry1.getKey().longValue()); - assertEquals(200, entry1.getValue().longValue()); - entry1 = iter1.next(); - assertEquals(220, entry1.getKey().longValue()); - assertEquals(300, entry1.getValue().longValue()); - } - - @Test - public void testCheckDeletedState() { - boolean hasException = false; - try { - deletionGroup1.checkDeletedState(5, 1); - } catch (IllegalArgumentException e) { - hasException = true; - } - assertTrue(hasException); - - assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(1, 5)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(1, 10)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(2, 15)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 10)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 20)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(30, 40)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 40)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(40, 40)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(35, 45)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(40, 45)); - - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(50, 50)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 50)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(50, 55)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 55)); - - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 55)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 500)); - - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 140)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 150)); - - assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 201)); - assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(400, 500)); - - assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 219)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(201, 220)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 220)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 230)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(230, 250)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(250, 300)); - assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 300)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(220, 330)); - assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(240, 350)); - - // == test empty deletionGroup2 - assertEquals(NO_DELETED, deletionGroup2.checkDeletedState(1, 500)); - } - - @Test - public void testIsDeleted() { - - assertEquals(false, deletionGroup1.isDeleted(5)); - assertEquals(true, deletionGroup1.isDeleted(10)); - assertEquals(true, deletionGroup1.isDeleted(20)); - assertEquals(true, deletionGroup1.isDeleted(40)); - assertEquals(false, deletionGroup1.isDeleted(45)); - - assertEquals(true, deletionGroup1.isDeleted(50)); - - assertEquals(false, deletionGroup1.isDeleted(60)); - assertEquals(true, deletionGroup1.isDeleted(70)); - assertEquals(true, deletionGroup1.isDeleted(100)); - assertEquals(true, deletionGroup1.isDeleted(120)); - assertEquals(false, deletionGroup1.isDeleted(122)); - - assertEquals(true, deletionGroup1.isDeleted(220)); - assertEquals(true, deletionGroup1.isDeleted(250)); - assertEquals(true, deletionGroup1.isDeleted(300)); - assertEquals(false, deletionGroup1.isDeleted(400)); - - // == test empty deletionGroup2 - assertEquals(false, deletionGroup2.isDeleted(100)); - } - - @Test - public void testIsDeleted2() { - DeletionGroup.IntervalCursor intervalCursor = new DeletionGroup.IntervalCursor(); - assertEquals(false, deletionGroup1.isDeleted(1, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(5, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor)); - - intervalCursor = new DeletionGroup.IntervalCursor(); - assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor)); - - intervalCursor.reset(); - assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor)); - assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor)); - - assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor)); - assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor)); - - intervalCursor.reset(); - assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor)); - - // == test empty deletionGroup2 - intervalCursor = new DeletionGroup.IntervalCursor(); - assertEquals(false, deletionGroup2.isDeleted(301, intervalCursor)); - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionOpBlockTest.java b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionOpBlockTest.java deleted file mode 100644 index 0495bd05111..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionOpBlockTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.iotdb.db.sync.datasource; - -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.sync.externalpipe.operation.DeleteOperation; -import org.apache.iotdb.db.sync.externalpipe.operation.Operation; - -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -public class DeletionOpBlockTest { - - @Test - public void TstDeletionOpBlock() throws IOException, IllegalPathException { - PartialPath partialPath = new PartialPath("root.a.b.**"); - DeletionOpBlock deletionOpBlock = new DeletionOpBlock("delete", partialPath, -111, 222, 5); - - deletionOpBlock.setBeginIndex(101); - - System.out.println(deletionOpBlock); - - Operation operation = deletionOpBlock.getOperation(0, 5); - assertNull(operation); - - operation = deletionOpBlock.getOperation(101, 5); - DeleteOperation deleteOperation = (DeleteOperation) operation; - - assertEquals(101, deleteOperation.getStartIndex()); - assertEquals(102, deleteOperation.getEndIndex()); - - assertEquals(1, deleteOperation.getDataCount()); - assertEquals(-111, deleteOperation.getStartTime()); - assertEquals(222, deleteOperation.getEndTime()); - assertEquals(partialPath, deleteOperation.getDeletePath()); - - assertEquals(Operation.OperationType.DELETE, deleteOperation.getOperationType()); - assertEquals("DELETE", deleteOperation.getOperationTypeName()); - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java deleted file mode 100644 index 8f7f746c000..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java +++ /dev/null @@ -1,595 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.sync.datasource; - -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.sync.externalpipe.operation.DeleteOperation; -import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation; -import org.apache.iotdb.db.sync.externalpipe.operation.Operation; -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.write.TsFileWriter; -import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; -import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; -import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint; -import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.Schema; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class PipeOpManagerTest { - public static final String TMP_DIR = "target" + File.separator + "PipeOpManagerTest"; - private static final String seqTsFileName1 = TMP_DIR + File.separator + "test1.tsfile"; - private static final String seqModsFileName1 = seqTsFileName1 + ".mods"; - private static final String unSeqTsFileName1 = TMP_DIR + File.separator + "test2.unseq.tsfile"; - private static final String unSeqModsFileName1 = unSeqTsFileName1 + ".mods"; - public static final String DEFAULT_TEMPLATE = "template"; - public static final List<String> delFileList = new LinkedList<>(); - - private static final String bigSeqTsFileName1 = TMP_DIR + File.separator + "test1.big.seq.tsfile"; - private static final String bigSeqTsFileName2 = TMP_DIR + File.separator + "test2.big.seq.tsfile"; - private static final String bigSeqTsFileName3 = TMP_DIR + File.separator + "test3.big.seq.tsfile"; - - private static int oldMaxNumberOfPointsInPage; - - @BeforeClass - public static void prepareTestData() throws Exception { - oldMaxNumberOfPointsInPage = - TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); - - EnvironmentUtils.envSetUp(); - - createSeqTsfile(seqTsFileName1); - delFileList.add(seqTsFileName1); - creatSeqModsFile(seqModsFileName1); - delFileList.add(seqModsFileName1); - - createUnSeqTsfile(unSeqTsFileName1); - delFileList.add(unSeqTsFileName1); - creatUnSeqModsFile(unSeqModsFileName1); - delFileList.add(unSeqModsFileName1); - - createBigSeqTsfile(bigSeqTsFileName1, 1, -1); - delFileList.add(bigSeqTsFileName1); - createBigSeqTsfile(bigSeqTsFileName2, 2, 100); - delFileList.add(bigSeqTsFileName2); - createBigSeqTsfile(bigSeqTsFileName3, 3, 57); - delFileList.add(bigSeqTsFileName3); - } - - @AfterClass - public static void removeTestData() throws Exception { - for (String fileName : delFileList) { - File file = new File(fileName); - if (file.exists()) { - file.delete(); - } - } - - EnvironmentUtils.cleanEnv(); - EnvironmentUtils.cleanAllDir(); - - TSFileDescriptor.getInstance() - .getConfig() - .setMaxNumberOfPointsInPage(oldMaxNumberOfPointsInPage); - } - - private static void createSeqTsfile(String tsfilePath) throws Exception { - File file = new File(tsfilePath); - if (file.exists()) { - file.delete(); - } - - Schema schema = new Schema(); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, TSEncoding.TS_2DIFF)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32, TSEncoding.TS_2DIFF)); - - TsFileWriter tsFileWriter = new TsFileWriter(file, schema); - - // construct TSRecord - TSRecord tsRecord = new TSRecord(1617206403001L, "root.lemming.device1"); - DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f); - DataPoint dPoint2 = new IntDataPoint("sensor2", 12); - DataPoint dPoint3 = new IntDataPoint("sensor3", 13); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403002L, "root.lemming.device2"); - dPoint2 = new IntDataPoint("sensor2", 22); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403003L, "root.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 3.1f); - dPoint2 = new IntDataPoint("sensor2", 32); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403004L, "root.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 4.1f); - dPoint2 = new IntDataPoint("sensor2", 42); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - // close TsFile - tsFileWriter.close(); - } - - private static void createUnSeqTsfile(String tsfilePath) throws Exception { - File file = new File(tsfilePath); - if (file.exists()) { - file.delete(); - } - - Schema schema = new Schema(); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, TSEncoding.TS_2DIFF)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32, TSEncoding.TS_2DIFF)); - - TsFileWriter tsFileWriter = new TsFileWriter(file, schema); - - // construct TSRecord - TSRecord tsRecord = new TSRecord(1617206403001L, "root2.lemming.device1"); - DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f); - DataPoint dPoint2 = new IntDataPoint("sensor2", 12); - DataPoint dPoint3 = new IntDataPoint("sensor3", 13); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403002L, "root2.lemming.device2"); - dPoint2 = new IntDataPoint("sensor2", 22); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403003L, "root2.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 33.1f); - dPoint2 = new IntDataPoint("sensor2", 332); - dPoint3 = new IntDataPoint("sensor3", 333); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403004L, "root2.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 44.1f); - dPoint2 = new IntDataPoint("sensor2", 442); - dPoint3 = new IntDataPoint("sensor3", 443); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - // close TsFile - tsFileWriter.close(); - } - - private static void creatSeqModsFile(String modsFilePath) throws IllegalPathException { - Modification[] modifications = - new Modification[] { - new Deletion(new PartialPath("root.lemming.device2.sensor2"), 2, 1617206403002L), - new Deletion( - new PartialPath("root.lemming.device3.sensor1"), 3, 1617206403003L, 1617206403009L), - }; - - try (ModificationFile mFile = new ModificationFile(modsFilePath)) { - for (Modification mod : modifications) { - mFile.write(mod); - } - } catch (IOException e) { - fail(e.getMessage()); - } finally {; - } - } - - private static void creatUnSeqModsFile(String modsFilePath) throws IllegalPathException { - Modification[] modifications = - new Modification[] { - new Deletion(new PartialPath("root2.lemming.device1.sensor1"), 2, 1617206403001L), - new Deletion(new PartialPath("root2.lemming.device2.*"), 3, 2, Long.MAX_VALUE), - new Deletion( - new PartialPath("root1.lemming.**"), 3, 2, Long.MAX_VALUE), // useless entry for root1 - }; - - try (ModificationFile mFile = new ModificationFile(modsFilePath)) { - for (Modification mod : modifications) { - mFile.write(mod); - } - } catch (IOException e) { - fail(e.getMessage()); - } finally { - } - } - - @Test(timeout = 10_000L) - public void testOpManager() throws IOException { - PipeOpManager pipeOpManager = new PipeOpManager(null); - - String sgName1 = "root1"; - String sgName2 = "root2"; - - TsFileOpBlock tsFileOpBlock1 = new TsFileOpBlock(sgName1, seqTsFileName1, 1); - pipeOpManager.appendOpBlock(sgName1, tsFileOpBlock1); - TsFileOpBlock tsFileOpBlock2 = new TsFileOpBlock(sgName2, unSeqTsFileName1, 2); - pipeOpManager.appendOpBlock(sgName2, tsFileOpBlock2); - - long count1 = tsFileOpBlock1.getDataCount(); - assertEquals(8, count1); - for (int i = 0; i < count1; i++) { - Operation operation = pipeOpManager.getOperation(sgName1, i, 8); - System.out.println("=== data" + i + ": " + operation + ", "); // - assertEquals("root1", operation.getStorageGroup()); - } - - Operation operation = pipeOpManager.getOperation(sgName1, 0, 18); - InsertOperation insertOperation = (InsertOperation) operation; - System.out.println("+++ data10" + ": " + operation + ", "); - assertEquals( - "root.lemming.device1.sensor1", insertOperation.getDataList().get(0).left.toString()); - - pipeOpManager.commitData(sgName1, count1 - 1); - operation = pipeOpManager.getOperation(sgName1, 9, 18); - System.out.println("+++ data11" + ": " + operation + ", "); - assertNull(operation); - - operation = pipeOpManager.getOperation(sgName2, 6, 18); - System.out.println("+++ data12" + ": " + operation + ", "); - assertEquals(4, operation.getDataCount()); - - insertOperation = (InsertOperation) operation; - assertEquals( - "root2.lemming.device3.sensor3", insertOperation.getDataList().get(0).left.toString()); - assertEquals(1617206403003L, insertOperation.getDataList().get(0).right.get(0).getTimestamp()); - assertEquals("333", insertOperation.getDataList().get(0).right.get(0).getValue().toString()); - } - - @Test(timeout = 10_000L) - public void testOpManager_Mods() throws IOException { - PipeOpManager pipeOpManager = new PipeOpManager(null); - - String sgName1 = "root1"; - // String sgName2 = "root2"; - - TsFileOpBlock tsFileOpBlock1 = new TsFileOpBlock(sgName1, seqTsFileName1, seqModsFileName1, 1); - pipeOpManager.appendOpBlock(sgName1, tsFileOpBlock1); - TsFileOpBlock tsFileOpBlock2 = - new TsFileOpBlock(sgName1, unSeqTsFileName1, unSeqModsFileName1, 2); - pipeOpManager.appendOpBlock(sgName1, tsFileOpBlock2); - - long count1 = tsFileOpBlock1.getDataCount(); - assertEquals(8, count1); - for (int i = 0; i < 18; i++) { - Operation operation = pipeOpManager.getOperation(sgName1, i, 8); - assertEquals(sgName1, operation.getStorageGroup()); - } - - // == test batch data in TsFile1 + .mods - Operation operation = pipeOpManager.getOperation(sgName1, 0, 18); - assertEquals(8, operation.getDataCount()); - - InsertOperation insertOperation = (InsertOperation) operation; - int i = 0; - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("1.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 1; - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("12", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 2; - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("13", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 3; - assertEquals(1, insertOperation.getDataList().get(i).right.size()); - assertNull(insertOperation.getDataList().get(i).right.get(0)); - - i = 4; - assertEquals(1, insertOperation.getDataList().get(i).right.size()); - assertNull(insertOperation.getDataList().get(i).right.get(0)); - - i = 5; - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("32", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 6; - assertEquals(1, insertOperation.getDataList().get(i).right.size()); - assertNull(insertOperation.getDataList().get(i).right.get(0)); - - i = 7; - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("42", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - // == test batch data in TsFile2 + mods - operation = pipeOpManager.getOperation(sgName1, 8, 18); - assertEquals(10, operation.getDataCount()); - - insertOperation = (InsertOperation) operation; - i = 0; - assertEquals( - "root2.lemming.device1.sensor1", insertOperation.getDataList().get(i).left.toString()); - assertEquals(1, insertOperation.getDataList().get(i).right.size()); - assertNull(insertOperation.getDataList().get(i).right.get(0)); - - i = 1; - assertEquals( - "root2.lemming.device1.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("12", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 2; - assertEquals( - "root2.lemming.device1.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("13", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 3; - assertEquals( - "root2.lemming.device2.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1, insertOperation.getDataList().get(i).right.size()); - assertNull(insertOperation.getDataList().get(i).right.get(0)); - - i = 4; - assertEquals( - "root2.lemming.device3.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("33.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 5; - assertEquals( - "root2.lemming.device3.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("332", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 6; - assertEquals( - "root2.lemming.device3.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("333", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 7; - assertEquals( - "root2.lemming.device3.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("44.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 8; - assertEquals( - "root2.lemming.device3.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("442", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 9; - assertEquals( - "root2.lemming.device3.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("443", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - } - - @Test(timeout = 10_000L) - public void testOpManager_deletion() throws IOException, IllegalPathException { - PipeOpManager pipeOpManager = new PipeOpManager(null); - - String sgName1 = "root1"; - String sgName2 = "root2"; - - TsFileOpBlock tsFileOpBlock1 = new TsFileOpBlock(sgName1, seqTsFileName1, seqModsFileName1, 1); - pipeOpManager.appendOpBlock(sgName1, tsFileOpBlock1); - TsFileOpBlock tsFileOpBlock2 = - new TsFileOpBlock(sgName1, unSeqTsFileName1, unSeqModsFileName1, 2); - pipeOpManager.appendOpBlock(sgName2, tsFileOpBlock2); - - pipeOpManager.commitData(sgName1, tsFileOpBlock1.getDataCount() - 1); - pipeOpManager.commitData(sgName2, tsFileOpBlock2.getDataCount() - 1); - assertTrue(pipeOpManager.isEmpty()); - - PartialPath partialPath = new PartialPath("root.a.**"); - DeletionOpBlock deletionOpBlock = new DeletionOpBlock("root.a", partialPath, -100, 200, 5); - - // == test pipeOpManager.appendOpBlock etc. - pipeOpManager.appendOpBlock(sgName1, deletionOpBlock); - - long beginIndex = pipeOpManager.getFirstAvailableIndex(sgName1); - assertEquals(8, beginIndex); - - Operation operation = pipeOpManager.getOperation(sgName1, beginIndex, 10); - assertEquals(beginIndex, operation.getStartIndex()); - assertEquals(1, operation.getDataCount()); - - DeleteOperation deleteOperation = (DeleteOperation) operation; - assertNotNull(deleteOperation); - - assertEquals(partialPath, deleteOperation.getDeletePath()); - assertEquals(-100, deleteOperation.getStartTime()); - assertEquals(200, deleteOperation.getEndTime()); - - // == test pipeOpManager.appendDeletionOpBlock etc. - String sgName = "root.a"; - Deletion deletion = new Deletion(partialPath, 0, -200, 400); - pipeOpManager.appendDeletionOpBlock(sgName, deletion, 4); - - beginIndex = pipeOpManager.getFirstAvailableIndex(sgName); - assertEquals(0, beginIndex); - - operation = pipeOpManager.getOperation(sgName, beginIndex, 10); - assertEquals(beginIndex, operation.getStartIndex()); - assertEquals(1, operation.getDataCount()); - - deleteOperation = (DeleteOperation) operation; - assertNotNull(deleteOperation); - - assertEquals(partialPath, deleteOperation.getDeletePath()); - assertEquals(-200, deleteOperation.getStartTime()); - assertEquals(400, deleteOperation.getEndTime()); - } - - // == test big seq files and 1 chunk contains multiple pages. - private static void createBigSeqTsfile(String tsfilePath, int seed, int maxPointNumInPage) - throws Exception { - File file = new File(tsfilePath); - if (file.exists()) { - file.delete(); - } - - if (maxPointNumInPage > 0) { - TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(maxPointNumInPage); - } - - Schema schema = new Schema(); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT64, TSEncoding.RLE)); - - TsFileWriter tsFileWriter = new TsFileWriter(file, schema); - - long ts = 1617206403000L; - TSRecord tsRecord; - for (int i = 0; i < 1000; i++) { - for (int j = 0; j < 1000; j++) { - int k = seed * 100000000 + i * 1000 + j; - ts++; - tsRecord = new TSRecord(ts, "root.lemming.device1"); - tsRecord.addTuple(new FloatDataPoint("sensor1", k * 1.3f)); - tsRecord.addTuple(new IntDataPoint("sensor2", k)); - tsRecord.addTuple(new LongDataPoint("sensor3", k * k)); - tsFileWriter.write(tsRecord); - } - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - for (int j = 0; j < 1000; j++) { - int k = seed * 100000000 + i * 2000 + j; - ts++; - tsRecord = new TSRecord(ts, "root.lemming.device2"); - tsRecord.addTuple(new IntDataPoint("sensor2", k)); - tsFileWriter.write(tsRecord); - } - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - for (int j = 0; j < 1000; j++) { - int k = seed * 100000000 + i * 3000 + j; - ts++; - tsRecord = new TSRecord(ts, "root.lemming.device3"); - tsRecord.addTuple(new FloatDataPoint("sensor1", k * 1.3f)); - tsRecord.addTuple(new IntDataPoint("sensor2", k)); - tsFileWriter.write(tsRecord); - } - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - } - // close TsFile - tsFileWriter.close(); - } - - @Test - public void testManyBigTsfiles() throws IOException { - PipeOpManager pipeOpManager = new PipeOpManager(null); - - String sgName1 = "root.test1"; - pipeOpManager.appendOpBlock(sgName1, new TsFileOpBlock(sgName1, bigSeqTsFileName1, 1)); - pipeOpManager.appendOpBlock(sgName1, new TsFileOpBlock(sgName1, bigSeqTsFileName2, 2)); - pipeOpManager.appendOpBlock(sgName1, new TsFileOpBlock(sgName1, bigSeqTsFileName3, 3)); - - long idx = pipeOpManager.getFirstAvailableIndex(sgName1); - long sum = 0; - Operation operation; - int bulkSize = 1000; - while (true) { - operation = pipeOpManager.getOperation(sgName1, idx, bulkSize); - if (operation == null) { - System.out.println("operation == null, idx=" + idx + " length=" + bulkSize); - assertEquals(18000000, idx); - break; - } - long count = operation.getDataCount(); - // System.out.println("idx=" + idx + ", " + "count=" + count); - if (count == 0) { - break; - } - idx += count; - sum += count; - pipeOpManager.commitData(sgName1, idx - 1); - } - // System.out.println("data sum = " + sum); - assertEquals(18000000, sum); - - bulkSize = 777; - while (true) { - operation = pipeOpManager.getOperation(sgName1, idx, bulkSize); - if (operation == null) { - System.out.println("operation == null, idx=" + idx + " length=" + bulkSize); - assertEquals(18000000, idx); - break; - } - long count = operation.getDataCount(); - // System.out.println("idx=" + idx + ", " + "count=" + count); - if (count == 0) { - break; - } - idx += count; - sum += count; - pipeOpManager.commitData(sgName1, idx - 1); - } - // System.out.println("data sum = " + sum); - assertEquals(18000000, sum); - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java b/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java deleted file mode 100644 index 34d7ead1e8f..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java +++ /dev/null @@ -1,1123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.sync.datasource; - -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; -import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation; -import org.apache.iotdb.db.sync.externalpipe.operation.Operation; -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.utils.MeasurementGroup; -import org.apache.iotdb.tsfile.write.TsFileWriter; -import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; -import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; -import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.Schema; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -public class TsFileOpBlockTest { - public static final String TMP_DIR = "target" + File.separator + "TsFileOpBlockTest"; - private final String tsFileName1 = TMP_DIR + File.separator + "test1.tsfile"; - private final String tsFileName2 = TMP_DIR + File.separator + "test2.tsfile"; - private final String modsFileName2 = tsFileName2 + ".mods"; - private final String tsFileName3 = TMP_DIR + File.separator + "test3.tsfile"; - private final String modsFileName3 = tsFileName3 + ".mods"; - - private final String alignedTsFileName1 = TMP_DIR + File.separator + "aligned1.tsfile"; - private final String alignedModsFileName1 = alignedTsFileName1 + ".mods"; - - private final String alignedTsFileName2 = TMP_DIR + File.separator + "aligned2.tsfile"; - - public final List<String> fileNameList = new LinkedList<>(); - - public final String DEFAULT_TEMPLATE = "template"; - - // == for Time-Aligned trunk test == - String nonAlignedDevice = "root.sg0.d0"; - String alignedDevice = "root.sg0.d1"; - String sensorPrefix = "sensor_"; - String alignedSensorPrefix = "alignedSensor_"; - int rowNumPerSensor = 20; // number of rows for every Sensor - int sensorNum = 13; - - int oldMaxNumberOfPointsInPage; - - @Before - public void prepareTestData() throws Exception { - oldMaxNumberOfPointsInPage = - TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); - - createTsfile1(tsFileName1); - fileNameList.add(tsFileName1); - - createTsfile2(tsFileName2); - fileNameList.add(tsFileName2); - creatModsFile2(modsFileName2); - fileNameList.add(modsFileName2); - - createTsfile2(tsFileName3); - fileNameList.add(tsFileName3); - creatModsFile3(modsFileName3); - fileNameList.add(modsFileName3); - - createAlignedTsfile1(alignedTsFileName1); - fileNameList.add(alignedTsFileName1); - - creatAlignedModsFile1(alignedModsFileName1); - fileNameList.add(alignedModsFileName1); - } - - @After - public void removeTestData() throws Exception { - TSFileDescriptor.getInstance() - .getConfig() - .setMaxNumberOfPointsInPage(oldMaxNumberOfPointsInPage); - - for (String fileName : fileNameList) { - File file = new File(fileName); - if (file.exists()) { - file.delete(); - } - } - } - - private void createTsfile1(String tsfilePath) throws Exception { - File file = new File(tsfilePath); - if (file.exists()) { - file.delete(); - } - - Schema schema = new Schema(); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, TSEncoding.TS_2DIFF)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32, TSEncoding.TS_2DIFF)); - - TsFileWriter tsFileWriter = new TsFileWriter(file, schema); - - // construct TSRecord - TSRecord tsRecord = new TSRecord(1617206403001L, "root.lemming.device1"); - DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f); - DataPoint dPoint2 = new IntDataPoint("sensor2", 12); - DataPoint dPoint3 = new IntDataPoint("sensor3", 13); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403002L, "root.lemming.device2"); - dPoint2 = new IntDataPoint("sensor2", 22); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403003L, "root.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 3.1f); - dPoint2 = new IntDataPoint("sensor2", 32); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - // close TsFile - tsFileWriter.close(); - } - - @Test(timeout = 10_000L) - public void testOpBlock() throws IOException { - TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName1, 0); - - assertEquals("root", tsFileOpBlock.getStorageGroup()); - assertEquals(0, tsFileOpBlock.getBeginIndex()); - assertEquals(6, tsFileOpBlock.getDataCount()); - assertEquals(6, tsFileOpBlock.getNextIndex()); - - tsFileOpBlock.setBeginIndex(2); - assertEquals(8, tsFileOpBlock.getNextIndex()); - - Operation operation = null; - for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) { - operation = tsFileOpBlock.getOperation(i + 2, 1); - assertEquals("root", operation.getStorageGroup()); - assertEquals(1, operation.getDataCount()); - assertEquals(i + 2, operation.getStartIndex()); - assertEquals(i + 3, operation.getEndIndex()); - - assertEquals(true, operation instanceof InsertOperation); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - // System.out.println("=== data" + i + ": " + operation + ((InsertOperation) - // operation).getDataList()); - } - - InsertOperation insertOperation = (InsertOperation) operation; - - int k = 0; - assertEquals( - "root.lemming.device3.sensor2", insertOperation.getDataList().get(k).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(k).right.get(0).getTimestamp()); - assertEquals("32", insertOperation.getDataList().get(k).right.get(0).getValue().toString()); - - for (int i = 0; i <= tsFileOpBlock.getDataCount() - 3; i++) { - operation = tsFileOpBlock.getOperation(i + 2, 3); - assertEquals("root", operation.getStorageGroup()); - assertEquals(3, operation.getDataCount()); - assertEquals(i + 2, operation.getStartIndex()); - assertEquals(i + 5, operation.getEndIndex()); - // System.out.println("=== data" + i + ": " + operation); - } - - for (long i = 6; i < 8; i++) { - operation = tsFileOpBlock.getOperation(i, 3); - assertEquals("root", operation.getStorageGroup()); - assertEquals(8 - i, operation.getDataCount()); - assertEquals(i, operation.getStartIndex()); - assertEquals(8, operation.getEndIndex()); - // System.out.println("=== data" + i + ": " + operation); - } - - tsFileOpBlock.close(); - } - - // == test TsFile + .mods - - private void createTsfile2(String tsfilePath) throws Exception { - File file = new File(tsfilePath); - if (file.exists()) { - file.delete(); - } - - Schema schema = new Schema(); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, TSEncoding.RLE)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, TSEncoding.TS_2DIFF)); - schema.extendTemplate( - DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32, TSEncoding.TS_2DIFF)); - - TsFileWriter tsFileWriter = new TsFileWriter(file, schema); - - // construct TSRecord - TSRecord tsRecord = new TSRecord(1617206403001L, "root.lemming.device1"); - DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f); - DataPoint dPoint2 = new IntDataPoint("sensor2", 12); - DataPoint dPoint3 = new IntDataPoint("sensor3", 13); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403002L, "root.lemming.device2"); - dPoint2 = new IntDataPoint("sensor2", 22); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403003L, "root.lemming.device3"); - dPoint1 = new FloatDataPoint("sensor1", 3.1f); - dPoint2 = new IntDataPoint("sensor2", 32); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - tsRecord = new TSRecord(1617206403004L, "root.lemming.device1"); - dPoint1 = new FloatDataPoint("sensor1", 4.1f); - dPoint2 = new IntDataPoint("sensor2", 42); - dPoint3 = new IntDataPoint("sensor3", 43); - tsRecord.addTuple(dPoint1); - tsRecord.addTuple(dPoint2); - tsRecord.addTuple(dPoint3); - tsFileWriter.write(tsRecord); - tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once - - // close TsFile - tsFileWriter.close(); - } - - private void creatModsFile2(String modsFilePath) throws IllegalPathException { - Modification[] modifications = - new Modification[] { - // new Deletion(new PartialPath(new String[] {"d1", "s2"}), 1, 2), - new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2, 1), - new Deletion(new PartialPath("root.lemming.device1.sensor1"), 3, 2, 5), - new Deletion(new PartialPath("root.lemming.**"), 11, 1, Long.MAX_VALUE) - }; - - try (ModificationFile mFile = new ModificationFile(modsFilePath)) { - for (Modification mod : modifications) { - mFile.write(mod); - } - } catch (IOException e) { - fail(e.getMessage()); - } finally {; - } - } - - private void creatModsFile3(String modsFilePath) throws IllegalPathException { - Modification[] modifications = - new Modification[] { - new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2, 1617206403001L), - new Deletion(new PartialPath("root.lemming.device2.*"), 3, 2, Long.MAX_VALUE), - }; - - try (ModificationFile mFile = new ModificationFile(modsFilePath)) { - for (Modification mod : modifications) { - mFile.write(mod); - } - } catch (IOException e) { - fail(e.getMessage()); - } finally {; - } - } - - @Test(timeout = 10_000L) - public void testOpBlockMods2() throws IOException { - - List<Modification> modificationList = null; - try (ModificationFile mFile = new ModificationFile(modsFileName2)) { - modificationList = (List<Modification>) mFile.getModifications(); - } - // System.out.println("=== data: " + modificationList); - - TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2, modsFileName2, 0); - - assertEquals("root", tsFileOpBlock.getStorageGroup()); - assertEquals(0, tsFileOpBlock.getBeginIndex()); - assertEquals(9, tsFileOpBlock.getDataCount()); - assertEquals(9, tsFileOpBlock.getNextIndex()); - - // == check setBeginIndex() - tsFileOpBlock.setBeginIndex(55); - assertEquals(64, tsFileOpBlock.getNextIndex()); - - // == check result before and after calling tsFileOpBlock.getOperation() - assertNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertNull(tsFileOpBlock.getModificationList()); - Operation operation = tsFileOpBlock.getOperation(55, 1); - ; - assertNotNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertEquals(modificationList, tsFileOpBlock.getModificationList()); - assertEquals(9, tsFileOpBlock.getDataCount()); - - // == check tsFileOpBlock.getOperation() - for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) { - operation = tsFileOpBlock.getOperation(i + 55, 1); - assertEquals("root", operation.getStorageGroup()); - assertEquals(1, operation.getDataCount()); - assertEquals(i + 55, operation.getStartIndex()); - assertEquals(i + 56, operation.getEndIndex()); - - assertEquals(true, operation instanceof InsertOperation); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - // System.out.println("=== data" + i + ": " + operation + ((InsertOperation) - // operation).getDataList()); - } - - // == check deleted data caused by .mods file - operation = tsFileOpBlock.getOperation(55, 15); - assertEquals(9, operation.getDataCount()); - InsertOperation insertOperation = (InsertOperation) operation; - - int i = 0; - assertEquals( - "root.lemming.device1.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 1; - assertEquals( - "root.lemming.device1.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 2; - assertEquals( - "root.lemming.device1.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 3; - assertEquals( - "root.lemming.device2.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 4; - assertEquals( - "root.lemming.device3.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 5; - assertEquals( - "root.lemming.device3.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - // assertEquals(1617206403003L, - // insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - // assertEquals("32", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - tsFileOpBlock.close(); - } - - @Test(timeout = 10_000L) - public void testOpBlockMods3() throws IOException { - - List<Modification> modificationList = null; - try (ModificationFile mFile = new ModificationFile(modsFileName3)) { - modificationList = (List<Modification>) mFile.getModifications(); - } - - TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2, modsFileName3, 0); - - assertEquals("root", tsFileOpBlock.getStorageGroup()); - assertEquals(0, tsFileOpBlock.getBeginIndex()); - assertEquals(9, tsFileOpBlock.getDataCount()); - assertEquals(9, tsFileOpBlock.getNextIndex()); - - // == check setBeginIndex() - tsFileOpBlock.setBeginIndex(55); - assertEquals(64, tsFileOpBlock.getNextIndex()); - - // == check result before and after calling tsFileOpBlock.getOperation() - assertNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertNull(tsFileOpBlock.getModificationList()); - Operation operation = tsFileOpBlock.getOperation(55, 1); - - assertNotNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertEquals(modificationList, tsFileOpBlock.getModificationList()); - assertEquals(9, tsFileOpBlock.getDataCount()); - - // == check tsFileOpBlock.getOperation() - for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) { - operation = tsFileOpBlock.getOperation(i + 55, 1); - assertEquals("root", operation.getStorageGroup()); - assertEquals(1, operation.getDataCount()); - assertEquals(i + 55, operation.getStartIndex()); - assertEquals(i + 56, operation.getEndIndex()); - - assertEquals(true, operation instanceof InsertOperation); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - // System.out.println("=== data" + i + ": " + operation + ((InsertOperation) - // operation).getDataList()); - } - - // == check deleted data caused by .mods file - operation = tsFileOpBlock.getOperation(55, 20); - assertEquals(9, operation.getDataCount()); - InsertOperation insertOperation = (InsertOperation) operation; - - int i = 0; - assertEquals( - "root.lemming.device1.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 1; - assertEquals( - "root.lemming.device1.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("12", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 2; - assertEquals( - "root.lemming.device1.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403001L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("13", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 3; - assertEquals( - "root.lemming.device2.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(null, insertOperation.getDataList().get(i).right.get(0)); - - i = 4; - assertEquals( - "root.lemming.device3.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("3.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 5; - assertEquals( - "root.lemming.device3.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("32", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 6; - assertEquals( - "root.lemming.device1.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("4.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 7; - assertEquals( - "root.lemming.device1.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("42", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 8; - assertEquals( - "root.lemming.device1.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("43", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - // == test getting old data and page cache - operation = tsFileOpBlock.getOperation(59, 20); - assertEquals(5, operation.getDataCount()); - insertOperation = (InsertOperation) operation; - - i = 0; - assertEquals( - "root.lemming.device3.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("3.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 1; - assertEquals( - "root.lemming.device3.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403003L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("32", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 2; - assertEquals( - "root.lemming.device1.sensor1", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("4.1", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 3; - assertEquals( - "root.lemming.device1.sensor2", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("42", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - i = 4; - assertEquals( - "root.lemming.device1.sensor3", insertOperation.getDataList().get(i).left.getFullPath()); - assertEquals(1617206403004L, insertOperation.getDataList().get(i).right.get(0).getTimestamp()); - assertEquals("43", insertOperation.getDataList().get(i).right.get(0).getValue().toString()); - - tsFileOpBlock.close(); - } - - // == Test Time-Aligned Tsfile - private void createAlignedTsfile1(String tsfilePath) throws Exception { - File file = new File(tsfilePath); - if (file.exists() && !file.delete()) { - throw new RuntimeException("Can not delete: " + file.getAbsolutePath()); - } - - try { - Schema schema = new Schema(); - - // == add NonAligned measurements into file schema - List<MeasurementSchema> measurementSchemas = new ArrayList<>(); - for (int i = 0; i < sensorNum; i++) { - MeasurementSchema measurementSchema1 = - new MeasurementSchema(sensorPrefix + i, TSDataType.INT64, TSEncoding.TS_2DIFF); - measurementSchemas.add(measurementSchema1); - schema.registerTimeseries(new Path(nonAlignedDevice), measurementSchema1); - } - - // == add aligned measurements into file schema - List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>(); - for (int i = 0; i < sensorNum; i++) { - MeasurementSchema measurementSchema2 = - new MeasurementSchema(alignedSensorPrefix + i, TSDataType.INT64, TSEncoding.RLE); - alignedMeasurementSchemas.add(measurementSchema2); - } - MeasurementGroup group = new MeasurementGroup(true, alignedMeasurementSchemas); - schema.registerMeasurementGroup(new Path(alignedDevice), group); - - TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig(); - tsFileConfig.setMaxNumberOfPointsInPage(5); - try (TsFileWriter tsFileWriter = new TsFileWriter(file, schema, tsFileConfig)) { - // == 1) construct NonAligned tablet and write to TsFile - Tablet tablet = new Tablet(nonAlignedDevice, measurementSchemas); - long[] timestamps = tablet.timestamps; - Object[] values = tablet.values; - long timestamp = 1617206401000L; - long value = 1000000L; - for (int r = 0; r < rowNumPerSensor; r++) { - int row = tablet.rowSize++; - timestamps[row] = timestamp++; - for (int i = 0; i < sensorNum; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = value + i * 1000 + r; - } - // write Tablet to TsFile - if (tablet.rowSize == tablet.getMaxRowNumber()) { - tsFileWriter.write(tablet); - tablet.reset(); - } - } - // write Tablet to TsFile - if (tablet.rowSize != 0) { - tsFileWriter.write(tablet); - tablet.reset(); - } - - // == 2) construct aligned tablet and write to TsFile - tablet = new Tablet(alignedDevice, alignedMeasurementSchemas); - timestamps = tablet.timestamps; - values = tablet.values; - timestamp = 1617206402000L; - value = 2000000L; - for (int r = 0; r < rowNumPerSensor; r++) { - int row = tablet.rowSize++; - timestamps[row] = timestamp++; - for (int i = 0; i < sensorNum; i++) { - if ((i + r) == 4) { - tablet.addValue(alignedSensorPrefix + i, r, null); - continue; - } - long[] sensor = (long[]) values[i]; - sensor[row] = value + i * 1000 + r; - } - // write Tablet to TsFile - if (tablet.rowSize == tablet.getMaxRowNumber()) { - tsFileWriter.writeAligned(tablet); - tablet.reset(); - } - } - // write Tablet to TsFile - if (tablet.rowSize != 0) { - tsFileWriter.writeAligned(tablet); - tablet.reset(); - } - - tsFileWriter.flushAllChunkGroups(); - - // == 3) add NonAligned measurements into file schema - tablet = new Tablet(nonAlignedDevice, measurementSchemas); - timestamps = tablet.timestamps; - values = tablet.values; - timestamp = 1617206403000L; - value = 3000000L; - for (int r = 0; r < rowNumPerSensor; r++) { - int row = tablet.rowSize++; - timestamps[row] = timestamp++; - for (int i = 0; i < sensorNum; i++) { - long[] sensor = (long[]) values[i]; - sensor[row] = value + i * 1000 + r; - } - // write Tablet to TsFile - if (tablet.rowSize == tablet.getMaxRowNumber()) { - tsFileWriter.write(tablet); - tablet.reset(); - } - } - // write Tablet to TsFile - if (tablet.rowSize != 0) { - tsFileWriter.write(tablet); - tablet.reset(); - } - - // == 4) construct aligned tablet and write to TsFile - tablet = new Tablet(alignedDevice, alignedMeasurementSchemas); - timestamps = tablet.timestamps; - values = tablet.values; - timestamp = 1617206404000L; - value = 4000000L; - for (int r = 0; r < rowNumPerSensor; r++) { - int row = tablet.rowSize++; - timestamps[row] = timestamp++; - for (int i = 0; i < sensorNum; i++) { - if (i == r) { - tablet.addValue(alignedSensorPrefix + i, r, null); - continue; - } - long[] sensor = (long[]) values[i]; - sensor[row] = value + i * 1000 + r; - } - // write Tablet to TsFile - if (tablet.rowSize == tablet.getMaxRowNumber()) { - tsFileWriter.writeAligned(tablet); - tablet.reset(); - } - } - // write Tablet to TsFile - if (tablet.rowSize != 0) { - tsFileWriter.writeAligned(tablet); - tablet.reset(); - } - } - } catch (Exception e) { - throw new Exception("meet error in TsFileWrite with tablet", e); - } - } - - @Test(timeout = 10_000L) - public void testOpBlockTimeAligned1() throws IOException { - TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", alignedTsFileName1, null, 0); - - int allDataCount = rowNumPerSensor * sensorNum * 4; - assertEquals("root", tsFileOpBlock.getStorageGroup()); - assertEquals(0, tsFileOpBlock.getBeginIndex()); - assertEquals(allDataCount, tsFileOpBlock.getDataCount()); - assertEquals(allDataCount, tsFileOpBlock.getNextIndex()); - - // == check setBeginIndex() - tsFileOpBlock.setBeginIndex(55); - assertEquals(55 + allDataCount, tsFileOpBlock.getNextIndex()); - - // == check result before and after calling tsFileOpBlock.getOperation() - assertNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertNull(tsFileOpBlock.getModificationList()); - Operation operation = tsFileOpBlock.getOperation(55, 1); - assertEquals(1, operation.getDataCount()); - - assertEquals(allDataCount, tsFileOpBlock.getDataCount()); - - // == check tsFileOpBlock.getOperation() - for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) { - operation = tsFileOpBlock.getOperation(i + 55, 1); - assertEquals("root", operation.getStorageGroup()); - assertEquals(1, operation.getDataCount()); - assertEquals(i + 55, operation.getStartIndex()); - assertEquals(i + 56, operation.getEndIndex()); - - assertEquals(true, operation instanceof InsertOperation); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - } - - // == check operation's DataCount - operation = tsFileOpBlock.getOperation(55, 45); - assertEquals(45, operation.getDataCount()); - operation = tsFileOpBlock.getOperation(55, 5000); - assertEquals(allDataCount, operation.getDataCount()); - - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(sensorNum * 4, insertOperation.getDataList().size()); - - // == test 1st ChunkGroup: NonAligned Chunks - for (int i = 0; i < sensorNum; i++) { - assertEquals( - nonAlignedDevice + "." + sensorPrefix + i, - insertOperation.getDataList().get(i).left.getFullPath()); - for (int j = 0; j < rowNumPerSensor; j++) { - assertEquals( - 1617206401000L + j, insertOperation.getDataList().get(i).right.get(j).getTimestamp()); - assertEquals( - Long.toString(1000000L + i * 1000 + j), - insertOperation.getDataList().get(i).right.get(j).getValue().toString()); - } - } - - // == test 2nd ChunkGroup: Aligned Chunks - for (int i = 0; i < sensorNum; i++) { - int listIndex = sensorNum + i; - assertEquals( - alignedDevice + "." + alignedSensorPrefix + i, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (int j = 0; j < rowNumPerSensor; j++) { - if ((i + j) == 4) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(j)); - continue; - } - assertEquals( - 1617206402000L + j, - insertOperation.getDataList().get(listIndex).right.get(j).getTimestamp()); - assertEquals( - Long.toString(2000000L + i * 1000 + j), - insertOperation.getDataList().get(listIndex).right.get(j).getValue().toString()); - } - } - - // == test 3rd ChunkGroup: NonAligned Chunks - for (int i = 0; i < sensorNum; i++) { - int listIndex = sensorNum * 2 + i; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + i, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (int j = 0; j < rowNumPerSensor; j++) { - assertEquals( - 1617206403000L + j, - insertOperation.getDataList().get(listIndex).right.get(j).getTimestamp()); - assertEquals( - Long.toString(3000000L + i * 1000 + j), - insertOperation.getDataList().get(listIndex).right.get(j).getValue().toString()); - } - } - - // == test 4th ChunkGroup: Aligned Chunks - for (int i = 0; i < sensorNum; i++) { - int listIndex = sensorNum * 3 + i; - assertEquals( - alignedDevice + "." + alignedSensorPrefix + i, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (int j = 0; j < rowNumPerSensor; j++) { - if (i == j) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(j)); - continue; - } - assertEquals( - 1617206404000L + j, - insertOperation.getDataList().get(listIndex).right.get(j).getTimestamp()); - assertEquals( - Long.toString(4000000L + i * 1000 + j), - insertOperation.getDataList().get(listIndex).right.get(j).getValue().toString()); - } - } - - tsFileOpBlock.close(); - } - - private void creatAlignedModsFile1(String modsFilePath) throws IllegalPathException { - Modification[] modifications = - new Modification[] { - new Deletion(new PartialPath("root.sg0.d0.sensor_0"), 1, Long.MAX_VALUE), - new Deletion(new PartialPath("root.sg0.d0.sensor_1"), 2, 1617206401001L), - new Deletion(new PartialPath("root.sg0.d1.alignedSensor_0"), 3, 1617206402002L), - new Deletion(new PartialPath("root.sg0.d1.alignedSensor_1"), 4, 1617206402003L), - }; - - try (ModificationFile mFile = new ModificationFile(modsFilePath)) { - for (Modification mod : modifications) { - mFile.write(mod); - } - } catch (IOException e) { - fail(e.getMessage()); - } finally {; - } - } - - @Test(timeout = 10_000L) - public void testOpBlockTimeAlignedMods1() throws IOException { - TsFileOpBlock tsFileOpBlock = - new TsFileOpBlock("root", alignedTsFileName1, alignedModsFileName1, 0); - - int allDataCount = rowNumPerSensor * sensorNum * 4; - assertEquals("root", tsFileOpBlock.getStorageGroup()); - assertEquals(0, tsFileOpBlock.getBeginIndex()); - assertEquals(allDataCount, tsFileOpBlock.getDataCount()); - assertEquals(allDataCount, tsFileOpBlock.getNextIndex()); - - // == check setBeginIndex() - tsFileOpBlock.setBeginIndex(55); - assertEquals(55 + allDataCount, tsFileOpBlock.getNextIndex()); - - // == check result before and after calling tsFileOpBlock.getOperation() - assertNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertNull(tsFileOpBlock.getModificationList()); - Operation operation = tsFileOpBlock.getOperation(55, 1); - assertEquals(1, operation.getDataCount()); - assertNotNull(tsFileOpBlock.getFullPathToDeletionMap()); - assertNotNull(tsFileOpBlock.getModificationList()); - - assertEquals(allDataCount, tsFileOpBlock.getDataCount()); - - // == check tsFileOpBlock.getOperation() - for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) { - operation = tsFileOpBlock.getOperation(i + 55, 1); - assertEquals("root", operation.getStorageGroup()); - assertEquals(1, operation.getDataCount()); - assertEquals(i + 55, operation.getStartIndex()); - assertEquals(i + 56, operation.getEndIndex()); - - assertEquals(true, operation instanceof InsertOperation); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - } - - // == check operation's DataCount - operation = tsFileOpBlock.getOperation(55, 45); - assertEquals(45, operation.getDataCount()); - operation = tsFileOpBlock.getOperation(55, 5000); - assertEquals(allDataCount, operation.getDataCount()); - - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(sensorNum * 4, insertOperation.getDataList().size()); - - // == test 1st ChunkGroup: NonAligned Chunks - int sensorIndex = 0, rowIndex = 0; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(sensorIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(sensorIndex).left.getFullPath(), sensorIndex, rowIndex)); - assertNull(insertOperation.getDataList().get(sensorIndex).right.get(rowIndex)); - } - - sensorIndex = 1; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(sensorIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(sensorIndex).left, sensorIndex, rowIndex)); - if (rowIndex <= 1) { - assertNull(insertOperation.getDataList().get(sensorIndex).right.get(rowIndex)); - } else { - assertEquals( - 1617206401000L + rowIndex, - insertOperation.getDataList().get(sensorIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(1000000L + sensorIndex * 1000 + rowIndex), - insertOperation - .getDataList() - .get(sensorIndex) - .right - .get(rowIndex) - .getValue() - .toString()); - } - } - - for (sensorIndex = 2; sensorIndex < sensorNum; sensorIndex++) { - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(sensorIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(sensorIndex).left, sensorIndex, rowIndex)); - assertEquals( - 1617206401000L + rowIndex, - insertOperation.getDataList().get(sensorIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(1000000L + sensorIndex * 1000 + rowIndex), - insertOperation - .getDataList() - .get(sensorIndex) - .right - .get(rowIndex) - .getValue() - .toString()); - } - } - - int listIndex = 0; - // == test 2nd ChunkGroup: Aligned Chunks - for (sensorIndex = 0; sensorIndex < 2; sensorIndex++) { - listIndex = sensorNum + sensorIndex; - assertEquals( - alignedDevice + "." + alignedSensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d. ", - // insertOperation.getDataList().get(listIndex).left, sensorIndex, rowIndex) - // + insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - if ((sensorIndex + rowIndex) == 4) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - continue; - } - - if (rowIndex <= 2) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - } else { - assertEquals( - 1617206402000L + rowIndex, - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(2000000L + sensorIndex * 1000 + rowIndex), - insertOperation - .getDataList() - .get(listIndex) - .right - .get(rowIndex) - .getValue() - .toString()); - } - } - } - - for (sensorIndex = 2; sensorIndex < sensorNum; sensorIndex++) { - listIndex = sensorNum + sensorIndex; - assertEquals( - alignedDevice + "." + alignedSensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(listIndex).left, sensorIndex, rowIndex)); - if ((sensorIndex + rowIndex) == 4) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - continue; - } - - if (rowIndex <= 2) { - assertEquals( - 1617206402000L + rowIndex, - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(2000000L + sensorIndex * 1000 + rowIndex), - insertOperation - .getDataList() - .get(listIndex) - .right - .get(rowIndex) - .getValue() - .toString()); - } - } - } - - // == test 3rd ChunkGroup: NonAligned Chunks - sensorIndex = 0; - listIndex = sensorNum * 2; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(listIndex).left.getFullPath(), sensorIndex, rowIndex)); - assertNull(insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - } - - sensorIndex = 1; - listIndex = sensorNum * 2 + sensorIndex; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(listIndex).left, sensorIndex, rowIndex)); - if (rowIndex <= 1) { - assertNull(insertOperation.getDataList().get(sensorIndex).right.get(rowIndex)); - } else { - assertEquals( - 1617206403000L + rowIndex, - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(3000000L + sensorIndex * 1000 + rowIndex), - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getValue().toString()); - } - } - - for (sensorIndex = 2; sensorIndex < sensorNum; sensorIndex++) { - listIndex = sensorNum * 2 + sensorIndex; - assertEquals( - nonAlignedDevice + "." + sensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(listIndex).left, sensorIndex, rowIndex)); - - assertEquals( - 1617206403000L + rowIndex, - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(3000000L + sensorIndex * 1000 + rowIndex), - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getValue().toString()); - } - } - - // == test 4th ChunkGroup: Aligned Chunks - for (sensorIndex = 0; sensorIndex < sensorNum; sensorIndex++) { - listIndex = sensorNum * 3 + sensorIndex; - assertEquals( - alignedDevice + "." + alignedSensorPrefix + sensorIndex, - insertOperation.getDataList().get(listIndex).left.getFullPath()); - for (rowIndex = 0; rowIndex < rowNumPerSensor; rowIndex++) { - // System.out.println(String.format("sensor=%s, sensorIndex=%d, rowIndex=%d.", - // insertOperation.getDataList().get(listIndex).left, sensorIndex, rowIndex)); - if (sensorIndex == rowIndex) { - assertNull(insertOperation.getDataList().get(listIndex).right.get(rowIndex)); - continue; - } - - assertEquals( - 1617206404000L + rowIndex, - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getTimestamp()); - assertEquals( - Long.toString(4000000L + sensorIndex * 1000 + rowIndex), - insertOperation.getDataList().get(listIndex).right.get(rowIndex).getValue().toString()); - } - } - - tsFileOpBlock.close(); - } - - @Test(timeout = 10_000L) - public void testOpBlockTimeAlignedModsRandomAccess() throws IOException { - TsFileOpBlock tsFileOpBlock = - new TsFileOpBlock("root", alignedTsFileName1, alignedModsFileName1, 0); - - int allDataCount = rowNumPerSensor * sensorNum * 4; - assertEquals(allDataCount, tsFileOpBlock.getDataCount()); - - // == check tsFileOpBlock.getOperation() - int beginIndex = 100; - tsFileOpBlock.setBeginIndex(beginIndex); - - // == directly access 2nd ChunkGroup: Aligned Chunks - int dataIndex = beginIndex + rowNumPerSensor * sensorNum + 3; - Operation operation = tsFileOpBlock.getOperation(dataIndex, 1); - assertEquals(1, operation.getDataCount()); - InsertOperation insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - assertEquals(1, insertOperation.getDataList().get(0).right.size()); - assertEquals( - 1617206402000L + 3, insertOperation.getDataList().get(0).right.get(0).getTimestamp()); - assertEquals( - Long.toString(2000000L + 3), - insertOperation.getDataList().get(0).right.get(0).getValue().toString()); - - dataIndex = beginIndex + rowNumPerSensor * sensorNum; - operation = tsFileOpBlock.getOperation(dataIndex, 3); - assertEquals(3, operation.getDataCount()); - insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - assertEquals(3, insertOperation.getDataList().get(0).right.size()); - assertNull(insertOperation.getDataList().get(0).right.get(0)); - assertNull(insertOperation.getDataList().get(0).right.get(1)); - assertNull(insertOperation.getDataList().get(0).right.get(2)); - - dataIndex = beginIndex + rowNumPerSensor * sensorNum * 3 + 3; - operation = tsFileOpBlock.getOperation(dataIndex, 1); - assertEquals(1, operation.getDataCount()); - insertOperation = (InsertOperation) operation; - assertEquals(1, insertOperation.getDataList().size()); - assertEquals(1, insertOperation.getDataList().get(0).right.size()); - assertEquals( - 1617206404000L + 3, insertOperation.getDataList().get(0).right.get(0).getTimestamp()); - assertEquals( - Long.toString(4000000L + 3), - insertOperation.getDataList().get(0).right.get(0).getValue().toString()); - - tsFileOpBlock.close(); - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java deleted file mode 100644 index 92350d049e6..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/persistence/LocalSyncInfoTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.sync.persistence; - -import org.apache.iotdb.commons.exception.sync.PipeException; -import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.SyncOperation; -import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement; -import org.apache.iotdb.db.sync.common.LocalSyncInfo; -import org.apache.iotdb.db.utils.EnvironmentUtils; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -public class LocalSyncInfoTest { - private static final String pipe1 = "pipe1"; - private static final String pipe2 = "pipe2"; - private static final long createdTime1 = System.currentTimeMillis(); - private static final long createdTime2 = System.currentTimeMillis() + 1; - - @Before - public void setUp() throws Exception { - EnvironmentUtils.envSetUp(); - } - - @After - public void tearDown() throws IOException, StorageEngineException { - EnvironmentUtils.cleanEnv(); - } - - @Test - public void testOperatePipe() throws Exception { - LocalSyncInfo localSyncInfo = new LocalSyncInfo(); - try { - CreatePipeSinkStatement createPipeSinkStatement = new CreatePipeSinkStatement(); - createPipeSinkStatement.setPipeSinkName("demo"); - createPipeSinkStatement.setPipeSinkType("IoTDB"); - Map<String, String> attributes = new HashMap<>(); - attributes.put("ip", "127.0.0.1"); - attributes.put("port", "6667"); - createPipeSinkStatement.setAttributes(attributes); - try { - localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true)); - Assert.fail(); - } catch (Exception e) { - Assert.assertTrue(e instanceof PipeSinkNotExistException); - // throw exception because can not find pipeSink - } - localSyncInfo.addPipeSink(createPipeSinkStatement); - localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true)); - localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true)); - try { - localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true)); - Assert.fail(); - } catch (PipeException e) { - // throw exception because pipe already exists - } - localSyncInfo.operatePipe(pipe2, SyncOperation.STOP_PIPE); - localSyncInfo.operatePipe(pipe2, SyncOperation.START_PIPE); - Assert.assertEquals(1, localSyncInfo.getAllPipeSink().size()); - Assert.assertEquals(2, localSyncInfo.getAllPipeInfos().size()); - localSyncInfo.changePipeMessage(pipe2, PipeMessage.PipeMessageType.WARN); - localSyncInfo.changePipeMessage(pipe2, PipeMessage.PipeMessageType.NORMAL); - PipeInfo pipeInfo1 = localSyncInfo.getPipeInfo(pipe2); - Assert.assertEquals(PipeMessage.PipeMessageType.WARN, pipeInfo1.getMessageType()); - localSyncInfo.changePipeMessage(pipe2, PipeMessage.PipeMessageType.ERROR); - PipeInfo pipeInfo2 = localSyncInfo.getPipeInfo(pipe2); - Assert.assertEquals(PipeMessage.PipeMessageType.ERROR, pipeInfo2.getMessageType()); - localSyncInfo.operatePipe(pipe1, SyncOperation.DROP_PIPE); - Assert.assertEquals(1, localSyncInfo.getAllPipeInfos().size()); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } finally { - localSyncInfo.close(); - } - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/persistence/SyncLogTest.java b/server/src/test/java/org/apache/iotdb/db/sync/persistence/SyncLogTest.java deleted file mode 100644 index 469de3b3381..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/persistence/SyncLogTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.sync.persistence; - -import org.apache.iotdb.commons.sync.persistence.SyncLogReader; -import org.apache.iotdb.commons.sync.persistence.SyncLogWriter; -import org.apache.iotdb.commons.sync.pipe.PipeInfo; -import org.apache.iotdb.commons.sync.pipe.PipeMessage; -import org.apache.iotdb.commons.sync.pipe.PipeStatus; -import org.apache.iotdb.commons.sync.pipe.SyncOperation; -import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo; -import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink; -import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.commons.sync.utils.SyncPathUtil; -import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.sync.SyncTestUtils; -import org.apache.iotdb.db.utils.EnvironmentUtils; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** This test is for ReceiverLog and ReceiverLogAnalyzer */ -public class SyncLogTest { - - private static final String pipe1 = "pipe1"; - private static final String pipe2 = "pipe2"; - private static final String pipe3 = "pipe3"; - private static final long createdTime1 = System.currentTimeMillis(); - private static final long createdTime2 = System.currentTimeMillis() + 1; - private static final long createdTime3 = System.currentTimeMillis() + 2; - - @Before - public void setUp() throws Exception { - EnvironmentUtils.envSetUp(); - } - - @After - public void tearDown() throws IOException, StorageEngineException { - EnvironmentUtils.cleanEnv(); - } - - @Test - public void testServiceLog() { - try { - SyncLogWriter log = new SyncLogWriter(new File(SyncPathUtil.getSysDir())); - PipeSink pipeSink = new IoTDBPipeSink("demo"); - Map<String, String> attributes = new HashMap<>(); - attributes.put("ip", "192.168.11.11"); - attributes.put("port", "7766"); - pipeSink.setAttribute(attributes); - log.addPipeSink(pipeSink); - PipeInfo pipeInfo1 = new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true); - PipeInfo pipeInfo2 = new TsFilePipeInfo(pipe2, "demo", createdTime2, 99, false); - PipeInfo pipeInfo3 = new TsFilePipeInfo(pipe3, "demo", createdTime3, 199, true); - log.addPipe(pipeInfo1); - log.operatePipe(pipe1, SyncOperation.DROP_PIPE); - - log.addPipe(pipeInfo2); - log.operatePipe(pipe2, SyncOperation.STOP_PIPE); - log.operatePipe(pipe2, SyncOperation.START_PIPE); - - log.addPipe(pipeInfo3); - log.close(); - SyncLogReader syncLogReader = new SyncLogReader(new File(SyncPathUtil.getSysDir())); - - syncLogReader.recover(); - - // check PipeSink - Map<String, PipeSink> allPipeSinks = syncLogReader.getAllPipeSinks(); - Assert.assertEquals(1, allPipeSinks.size()); - - // check Pipe - Map<String, PipeInfo> pipeInfoMap = syncLogReader.getPipes(); - Assert.assertEquals(2, pipeInfoMap.size()); - PipeInfo pipeInfoRecover1 = pipeInfoMap.get(pipe1); - Assert.assertNull(pipeInfoRecover1); - PipeInfo pipeInfoRecover2 = pipeInfoMap.get(pipe2); - SyncTestUtils.checkPipeInfo( - pipeInfoRecover2, - pipe2, - "demo", - PipeStatus.RUNNING, - createdTime2, - PipeMessage.PipeMessageType.NORMAL); - PipeInfo pipeInfoRecover3 = pipeInfoMap.get(pipe3); - SyncTestUtils.checkPipeInfo( - pipeInfoRecover3, - pipe3, - "demo", - PipeStatus.STOP, - createdTime3, - PipeMessage.PipeMessageType.NORMAL); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } -} diff --git a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java b/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java deleted file mode 100644 index 16746c1783a..00000000000 --- a/server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.sync.pipedata; - -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.modification.Deletion; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; - -public class PipeDataTest { - private static final Logger logger = LoggerFactory.getLogger(PipeDataTest.class); - private static final String pipeLogPath = "target/pipelog"; - - @Test - public void testSerializeAndDeserialize() { - try { - File f1 = new File(pipeLogPath); - File f2 = new File(pipeLogPath); - PipeData pipeData1 = new TsFilePipeData("1", 1); - Deletion deletion = new Deletion(new PartialPath("root.sg1.d1.s1"), 0, 1, 5); - PipeData pipeData2 = new DeletionPipeData(deletion, 3); - DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(f2)); - pipeData1.serialize(outputStream); - outputStream.flush(); - DataInputStream inputStream = new DataInputStream(new FileInputStream(f1)); - Assert.assertEquals(pipeData1, PipeData.createPipeData(inputStream)); - pipeData2.serialize(outputStream); - outputStream.flush(); - Assert.assertEquals(pipeData2, PipeData.createPipeData(inputStream)); - inputStream.close(); - outputStream.close(); - - Assert.assertEquals(pipeData1, PipeData.createPipeData(pipeData1.serialize())); - Assert.assertEquals(pipeData2, PipeData.createPipeData(pipeData2.serialize())); - } catch (Exception e) { - logger.error(e.getMessage()); - Assert.fail(); - } - } -}
