This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_id_table_flush_time in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3355a9ecd2dc68871bd2e76f71daee9c10b7a314 Author: 151250176 <[email protected]> AuthorDate: Wed Jan 5 10:14:52 2022 +0800 init --- .../storagegroup/IDTableFlushTimeManager.java | 212 +++++++++++++++++++ .../storagegroup/VirtualStorageGroupProcessor.java | 78 +++---- .../apache/iotdb/db/metadata/idtable/IDTable.java | 16 ++ .../db/metadata/idtable/IDTableHashmapImpl.java | 39 +++- .../db/metadata/idtable/entry/DeviceEntry.java | 65 ++++++ .../db/metadata/idtable/entry/DeviceIDFactory.java | 16 +- .../db/metadata/idtable/IDTableFlushTimeTest.java | 224 +++++++++++++++++++++ 7 files changed, 605 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java new file mode 100644 index 0000000..b7ff81b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.storagegroup; + +import org.apache.iotdb.db.metadata.idtable.IDTable; +import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * This class manages last time and flush time for sequence and unsequence determination This class + * This class is NOT thread safe, caller should ensure synchronization This class not support + * upgrade + */ +public class IDTableFlushTimeManager implements ILastFlushTimeManager { + private static final Logger logger = LoggerFactory.getLogger(LastFlushTimeManager.class); + + IDTable idTable; + + public IDTableFlushTimeManager(IDTable idTable) { + this.idTable = idTable; + } + + // region set + @Override + public void setLastTimeAll(long timePartitionId, Map<String, Long> lastTimeMap) { + for (Map.Entry<String, Long> entry : lastTimeMap.entrySet()) { + idTable.getDeviceEntry(entry.getKey()).putLastTimeMap(timePartitionId, entry.getValue()); + } + } + + @Override + public void setLastTime(long timePartitionId, String path, long time) { + idTable.getDeviceEntry(path).putLastTimeMap(timePartitionId, time); + } + + @Override + public void setFlushedTimeAll(long timePartitionId, Map<String, Long> flushedTimeMap) { + for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) { + idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue()); + } + } + + @Override + public void setFlushedTime(long timePartitionId, String path, long time) { + idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time); + } + + @Override + public void setGlobalFlushedTimeAll(Map<String, Long> globalFlushedTimeMap) { + for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) { + idTable.getDeviceEntry(entry.getKey()).setGlobalFlushTime(entry.getValue()); + } + } + + @Override + public void setGlobalFlushedTime(String path, long time) { + idTable.getDeviceEntry(path).setGlobalFlushTime(time); + } + + // endregion + + // region update + + @Override + public void updateLastTime(long timePartitionId, String path, long time) { + idTable.getDeviceEntry(path).updateLastTimeMap(timePartitionId, time); + } + + @Override + public void updateFlushedTime(long timePartitionId, String path, long time) { + idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time); + } + + @Override + public void updateGlobalFlushedTime(String path, long time) { + idTable.getDeviceEntry(path).updateGlobalFlushTime(time); + } + + @Override + public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice( + long partitionId, String deviceId, long time) { + throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade"); + } + + // endregion + + // region ensure + + @Override + public void ensureLastTimePartition(long timePartitionId) { + // do nothing is correct + } + + @Override + public void ensureFlushedTimePartition(long timePartitionId) { + // do nothing is correct + } + + @Override + public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) { + return idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, initTime); + } + + // endregion + + // region upgrade support methods + + @Override + public void applyNewlyFlushedTimeToFlushedTime() { + throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade"); + } + + /** + * update latest flush time for partition id + * + * @param partitionId partition id + * @param latestFlushTime lastest flush time + * @return true if update latest flush time success + */ + @Override + public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) { + for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) { + deviceEntry.putLastTimeMap(partitionId, latestFlushTime); + deviceEntry.putFlushTimeMap(partitionId, latestFlushTime); + deviceEntry.updateGlobalFlushTime(latestFlushTime); + } + + return true; + } + + @Override + public boolean updateLatestFlushTime(long partitionId) { + boolean updated = false; + + for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) { + Long lastTime = deviceEntry.getLastTime(partitionId); + if (lastTime == null) { + continue; + } + + updated = true; + deviceEntry.putFlushTimeMap(partitionId, lastTime); + deviceEntry.updateGlobalFlushTime(lastTime); + } + + return updated; + } + + // endregion + + // region query + @Override + public long getFlushedTime(long timePartitionId, String path) { + return idTable.getDeviceEntry(path).getFLushTimeWithDefaultValue(timePartitionId); + } + + @Override + public long getLastTime(long timePartitionId, String path) { + return idTable.getDeviceEntry(path).getLastTimeWithDefaultValue(timePartitionId); + } + + @Override + public long getGlobalFlushedTime(String path) { + return idTable.getDeviceEntry(path).getGlobalFlushTime(); + } + + // endregion + + // region clear + @Override + public void clearLastTime() { + for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) { + deviceEntry.clearLastTime(); + } + } + + @Override + public void clearFlushedTime() { + for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) { + deviceEntry.clearFlushTime(); + } + } + + @Override + public void clearGlobalFlushedTime() { + for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) { + deviceEntry.setGlobalFlushTime(Long.MIN_VALUE); + } + } + // endregion +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java index 12cd817..db351f1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java @@ -18,6 +18,39 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; +import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -89,44 +122,9 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; - -import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; -import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one * TsFileProcessor in the working status. <br> @@ -262,7 +260,7 @@ public class VirtualStorageGroupProcessor { // DEFAULT_POOL_TRIM_INTERVAL_MILLIS private long timeWhenPoolNotEmpty = Long.MAX_VALUE; - private LastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager(); + private ILastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager(); /** * record the insertWriteLock in SG is being hold by which method, it will be empty string if on @@ -396,9 +394,12 @@ public class VirtualStorageGroupProcessor { if (config.isEnableIDTable()) { try { idTable = IDTableManager.getInstance().getIDTable(new PartialPath(logicalStorageGroupName)); + lastFlushTimeManager = new IDTableFlushTimeManager(idTable); } catch (IllegalPathException e) { logger.error("failed to create id table"); } + } else { + lastFlushTimeManager = new LastFlushTimeManager(); } recover(); if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) { @@ -3233,4 +3234,9 @@ public class VirtualStorageGroupProcessor { public IDTable getIdTable() { return idTable; } + + @TestOnly + public ILastFlushTimeManager getLastFlushTimeManager() { + return lastFlushTimeManager; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java index 07f4df7..c04e063 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; import java.util.Map; public interface IDTable { @@ -139,6 +140,21 @@ public interface IDTable { void clear() throws IOException; /** + * get device entry from device path + * + * @param deviceName device name of the time series + * @return device entry of the timeseries + */ + public DeviceEntry getDeviceEntry(String deviceName); + + /** + * get all device entries + * + * @return all device entries + */ + public List<DeviceEntry> getAllDeviceEntry(); + + /** * translate query path's device path to device id * * @param fullPath full query path diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java index a4faf6a..fd3a99f 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java @@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** id table belongs to a storage group and mapping timeseries path to it's schema */ @@ -88,7 +90,7 @@ public class IDTableHashmapImpl implements IDTable { */ public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan) throws MetadataException { - DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true); + DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true); for (int i = 0; i < plan.getMeasurements().size(); i++) { PartialPath fullPath = @@ -112,7 +114,7 @@ public class IDTableHashmapImpl implements IDTable { * @throws MetadataException if the device is aligned, throw it */ public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException { - DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false); + DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false); SchemaEntry schemaEntry = new SchemaEntry( plan.getDataType(), @@ -137,7 +139,7 @@ public class IDTableHashmapImpl implements IDTable { IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); // 1. get device entry and check align - DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned()); + DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned()); // 2. get schema of each measurement for (int i = 0; i < measurementList.length; i++) { @@ -222,7 +224,7 @@ public class IDTableHashmapImpl implements IDTable { public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException { boolean isAligned = measurementMNode.getParent().isAligned(); - DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned); deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger(); } @@ -237,7 +239,7 @@ public class IDTableHashmapImpl implements IDTable { public synchronized void deregisterTrigger( PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException { boolean isAligned = measurementMNode.getParent().isAligned(); - DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned); + DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned); deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger(); } @@ -279,6 +281,31 @@ public class IDTableHashmapImpl implements IDTable { } /** + * get device entry from device path + * + * @param deviceName device name of the time series + * @return device entry of the timeseries + */ + @Override + public DeviceEntry getDeviceEntry(String deviceName) { + IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName); + int slot = calculateSlot(deviceID); + + // reuse device entry in map + return idTables[slot].get(deviceID); + } + + @Override + public List<DeviceEntry> getAllDeviceEntry() { + List<DeviceEntry> res = new ArrayList<>(); + for (int i = 0; i < NUM_OF_SLOTS; i++) { + res.addAll(idTables[i].values()); + } + + return res; + } + + /** * check whether a time series is exist if exist, check the type consistency if not exist, call * MManager to create it * @@ -345,7 +372,7 @@ public class IDTableHashmapImpl implements IDTable { * @param isAligned whether the insert plan is aligned * @return device entry of the timeseries */ - private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned) + private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned) throws MetadataException { IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName); int slot = calculateSlot(deviceID); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java index eb7041c..c4f0369 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java @@ -32,9 +32,18 @@ public class DeviceEntry { boolean isAligned; + // for manages flush time + Map<Long, Long> lastTimeMap; + + Map<Long, Long> flushTimeMap; + + long globalFlushTime = Long.MIN_VALUE; + public DeviceEntry(IDeviceID deviceID) { this.deviceID = deviceID; measurementMap = new HashMap<>(); + lastTimeMap = new HashMap<>(); + flushTimeMap = new HashMap<>(); } /** @@ -78,4 +87,60 @@ public class DeviceEntry { public IDeviceID getDeviceID() { return deviceID; } + + // region support flush time + public void putLastTimeMap(long timePartition, long lastTime) { + lastTimeMap.put(timePartition, lastTime); + } + + public void putFlushTimeMap(long timePartition, long flushTime) { + flushTimeMap.put(timePartition, flushTime); + } + + public long updateLastTimeMap(long timePartition, long lastTime) { + return lastTimeMap.compute( + timePartition, (k, v) -> v == null ? lastTime : Math.max(v, lastTime)); + } + + public long updateFlushTimeMap(long timePartition, long flushTime) { + return flushTimeMap.compute( + timePartition, (k, v) -> v == null ? flushTime : Math.max(v, flushTime)); + } + + public void updateGlobalFlushTime(long flushTime) { + globalFlushTime = Math.max(globalFlushTime, flushTime); + } + + public void setGlobalFlushTime(long globalFlushTime) { + this.globalFlushTime = globalFlushTime; + } + + public Long getLastTime(long timePartition) { + return lastTimeMap.get(timePartition); + } + + public Long getFlushTime(long timePartition) { + return flushTimeMap.get(timePartition); + } + + public Long getLastTimeWithDefaultValue(long timePartition) { + return lastTimeMap.getOrDefault(timePartition, Long.MIN_VALUE); + } + + public Long getFLushTimeWithDefaultValue(long timePartition) { + return flushTimeMap.getOrDefault(timePartition, Long.MIN_VALUE); + } + + public long getGlobalFlushTime() { + return globalFlushTime; + } + + public void clearLastTime() { + lastTimeMap.clear(); + } + + public void clearFlushTime() { + flushTimeMap.clear(); + } + // endregion } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java index ba47c35..c4a9676 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java @@ -27,7 +27,7 @@ import java.util.function.Function; /** factory to build device id according to configured algorithm */ public class DeviceIDFactory { - Function<PartialPath, IDeviceID> getDeviceIDFunction; + Function<String, IDeviceID> getDeviceIDFunction; // region DeviceIDFactory Singleton private static class DeviceIDFactoryHolder { @@ -54,9 +54,9 @@ public class DeviceIDFactory { .getConfig() .getDeviceIDTransformationMethod() .equals("SHA256")) { - getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString()); + getDeviceIDFunction = SHA256DeviceID::new; } else { - getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString()); + getDeviceIDFunction = PlainDeviceID::new; } } // endregion @@ -68,6 +68,16 @@ public class DeviceIDFactory { * @return device id of the timeseries */ public IDeviceID getDeviceID(PartialPath devicePath) { + return getDeviceIDFunction.apply(devicePath.toString()); + } + + /** + * get device id by full path + * + * @param devicePath device path of the timeseries + * @return device id of the timeseries + */ + public IDeviceID getDeviceID(String devicePath) { return getDeviceIDFunction.apply(devicePath); } diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java new file mode 100644 index 0000000..32590f6 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java @@ -0,0 +1,224 @@ +/* + *Licensed to the Apache Software Foundation(ASF)under one + *or more contributor license agreements.See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership.The ASF licenses this file + *to you under the Apache License,Version2.0(the + *"License");you may not use this file except in compliance + *with the License.You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS"BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY + *KIND,either express or implied.See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +package org.apache.iotdb.db.metadata.idtable; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.qp.executor.PlanExecutor; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class IDTableFlushTimeTest { + private PlanExecutor executor = new PlanExecutor(); + + private final Planner processor = new Planner(); + + private boolean isEnableIDTable = false; + + private String originalDeviceIDTransformationMethod = null; + + private boolean isEnableIDTableLogFile = false; + + public IDTableFlushTimeTest() throws QueryProcessException {} + + @Before + public void before() { + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true); + isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable(); + originalDeviceIDTransformationMethod = + IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod(); + isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile(); + + IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true); + IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256"); + IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true); + EnvironmentUtils.envSetUp(); + } + + @After + public void clean() throws IOException, StorageEngineException { + IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable); + IoTDBDescriptor.getInstance() + .getConfig() + .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod); + IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testSequenceInsert() + throws MetadataException, QueryProcessException, StorageEngineException { + insertData(0); + insertData(10); + PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush"); + executor.processNonQuery(flushPlan); + + insertData(20); + + VirtualStorageGroupProcessor storageGroupProcessor = + StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1")); + assertEquals(2, storageGroupProcessor.getSequenceFileTreeSet().size()); + assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size()); + } + + @Test + public void testUnSequenceInsert() + throws MetadataException, QueryProcessException, StorageEngineException { + insertData(100); + PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush"); + executor.processNonQuery(flushPlan); + + insertData(20); + + VirtualStorageGroupProcessor storageGroupProcessor = + StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1")); + assertEquals(1, storageGroupProcessor.getSequenceFileTreeSet().size()); + assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size()); + } + + @Test + public void testSequenceAndUnSequenceInsert() + throws MetadataException, QueryProcessException, StorageEngineException { + // sequence + insertData(100); + PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush"); + executor.processNonQuery(flushPlan); + + // sequence + insertData(120); + executor.processNonQuery(flushPlan); + + // unsequence + insertData(20); + // sequence + insertData(130); + executor.processNonQuery(flushPlan); + + // sequence + insertData(150); + // unsequence + insertData(90); + + VirtualStorageGroupProcessor storageGroupProcessor = + StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1")); + assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size()); + assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size()); + assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size()); + assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size()); + } + + @Test + public void testDeletePartition() + throws MetadataException, QueryProcessException, StorageEngineException { + insertData(100); + PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush"); + executor.processNonQuery(flushPlan); + insertData(20); + insertData(120); + + VirtualStorageGroupProcessor storageGroupProcessor = + StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1")); + + assertEquals( + 103L, storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1")); + assertEquals( + 123L, storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1")); + assertEquals( + 103L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1")); + + // delete time partition + Set<Long> deletedPartition = new HashSet<>(); + deletedPartition.add(0L); + DeletePartitionPlan deletePartitionPlan = + new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition); + executor.processNonQuery(deletePartitionPlan); + + assertEquals( + Long.MIN_VALUE, + storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1")); + assertEquals( + Long.MIN_VALUE, + storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1")); + assertEquals( + 123L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1")); + } + + private void insertData(long initTime) throws IllegalPathException, QueryProcessException { + + long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3}; + List<Integer> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.DOUBLE.ordinal()); + dataTypes.add(TSDataType.FLOAT.ordinal()); + dataTypes.add(TSDataType.INT64.ordinal()); + dataTypes.add(TSDataType.INT32.ordinal()); + dataTypes.add(TSDataType.BOOLEAN.ordinal()); + dataTypes.add(TSDataType.TEXT.ordinal()); + + Object[] columns = new Object[6]; + columns[0] = new double[4]; + columns[1] = new float[4]; + columns[2] = new long[4]; + columns[3] = new int[4]; + columns[4] = new boolean[4]; + columns[5] = new Binary[4]; + + for (int r = 0; r < 4; r++) { + ((double[]) columns[0])[r] = 10.0 + r; + ((float[]) columns[1])[r] = 20 + r; + ((long[]) columns[2])[r] = 100000 + r; + ((int[]) columns[3])[r] = 1000 + r; + ((boolean[]) columns[4])[r] = false; + ((Binary[]) columns[5])[r] = new Binary("mm" + r); + } + + InsertTabletPlan tabletPlan = + new InsertTabletPlan( + new PartialPath("root.isp.d1"), + new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}, + dataTypes); + tabletPlan.setTimes(times); + tabletPlan.setColumns(columns); + tabletPlan.setRowCount(times.length); + + PlanExecutor executor = new PlanExecutor(); + executor.insertTablet(tabletPlan); + } +}
