This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch deletion_expr_plus in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ad2551d8b2b3de84c2de5a2629418f7de32501d Author: Tian Jiang <[email protected]> AuthorDate: Fri Sep 27 19:04:11 2024 +0800 add expr --- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 24 +- .../org/apache/iotdb/db/expr/DeletionExprMain.java | 237 +++++++++------ .../iotdb/db/expr/conf/SimulationConfig.java | 6 +- .../iotdb/db/expr/entity/SimpleModAllocator.java | 7 +- .../db/expr/event/ExecuteLastPointQueryEvent.java | 12 +- .../db/expr/event/ExecuteRangeQueryEvent.java | 3 +- .../iotdb/db/expr/event/GenerateDeletionEvent.java | 2 +- .../iotdb/db/expr/executor/SimpleExecutor.java | 15 +- .../iotdb/db/expr/simulator/SimpleSimulator.java | 21 +- .../task/RepairUnsortedFileCompactionTask.java | 3 +- .../compaction/execute/utils/CompactionUtils.java | 3 +- .../execute/utils/log/TsFileIdentifier.java | 3 +- .../dataregion/modification/DeletionPredicate.java | 11 +- .../dataregion/modification/ModEntry.java | 12 +- .../dataregion/modification/ModFileManager.java | 53 ++-- .../dataregion/modification/ModificationFile.java | 14 +- .../modification/TableDeletionEntry.java | 3 +- .../dataregion/modification/TreeDeletionEntry.java | 8 +- .../dataregion/tsfile/TsFileResource.java | 14 +- .../apache/iotdb/db/tools/TsFileSketchTool.java | 323 ++++++++++++++++++++- .../iotdb/db/tools/TsFileSplitByPartitionTool.java | 3 +- .../java/org/apache/iotdb/db/utils/IOUtils.java | 2 + 22 files changed, 598 insertions(+), 181 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3ccf61c672e..dcb4043d1d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -438,8 +438,16 @@ public class IoTDBDescriptor { conf.setWALCompressionAlgorithm( enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED); - conf.setLevelModFileCntThreshold(Integer.parseInt(properties.getProperty("level_mod_file_cnt_threshold", Integer.toString(conf.getLevelModFileCntThreshold())))); - conf.setSingleModFileSizeThreshold(Long.parseLong(properties.getProperty("single_mod_file_size_threshold", Integer.toString(conf.getLevelModFileCntThreshold())))); + conf.setLevelModFileCntThreshold( + Integer.parseInt( + properties.getProperty( + "level_mod_file_cnt_threshold", + Integer.toString(conf.getLevelModFileCntThreshold())))); + conf.setSingleModFileSizeThreshold( + Long.parseLong( + properties.getProperty( + "single_mod_file_size_threshold", + Integer.toString(conf.getLevelModFileCntThreshold())))); conf.setCompactionScheduleIntervalInMs( Long.parseLong( @@ -1966,8 +1974,16 @@ public class IoTDBDescriptor { conf.setWALCompressionAlgorithm( enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED); - conf.setLevelModFileCntThreshold(Integer.parseInt(properties.getProperty("level_mod_file_cnt_threshold", Integer.toString(conf.getLevelModFileCntThreshold())))); - conf.setSingleModFileSizeThreshold(Long.parseLong(properties.getProperty("single_mod_file_size_threshold", Integer.toString(conf.getLevelModFileCntThreshold())))); + conf.setLevelModFileCntThreshold( + Integer.parseInt( + properties.getProperty( + "level_mod_file_cnt_threshold", + Integer.toString(conf.getLevelModFileCntThreshold())))); + conf.setSingleModFileSizeThreshold( + Long.parseLong( + properties.getProperty( + "single_mod_file_size_threshold", + Integer.toString(conf.getLevelModFileCntThreshold())))); // update Consensus config reloadConsensusProps(properties); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java index bda0122e1f7..74541657c03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/DeletionExprMain.java @@ -19,15 +19,6 @@ package org.apache.iotdb.db.expr; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.iotdb.db.expr.conf.SimulationConfig; import org.apache.iotdb.db.expr.distribution.FixedIntervalGenerator; import org.apache.iotdb.db.expr.entity.SimDeletion; @@ -42,6 +33,15 @@ import org.apache.iotdb.db.expr.simulator.SimpleSimulator; import org.apache.tsfile.read.common.TimeRange; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; +import java.util.stream.Collectors; + public class DeletionExprMain { private SimulationConfig config; @@ -51,6 +51,7 @@ public class DeletionExprMain { private long maxTimestamp; private ExprReport report; private static int maxFileCntThreshold = 30; + private static int cntThresholdStep = 1; public DeletionExprMain() { init(); @@ -60,8 +61,7 @@ public class DeletionExprMain { public void init() { config = new SimulationConfig(); simulator = new SimpleSimulator(config); - simpleModAllocator = new SimpleModAllocator(config, - simulator.getSimulationContext()); + simpleModAllocator = new SimpleModAllocator(config, simulator.getSimulationContext()); maxStep = 10000; maxTimestamp = Long.MAX_VALUE; } @@ -86,8 +86,10 @@ public class DeletionExprMain { GenerateDeletionEvent generatePartialDeletionEvent = new GenerateDeletionEvent( config, - new SimDeletion(new TimeRange(config.partialDeletionOffset, - config.partialDeletionOffset + config.partialDeletionRange)), + new SimDeletion( + new TimeRange( + config.partialDeletionOffset, + config.partialDeletionOffset + config.partialDeletionRange)), config.partialDeletionStep, new FixedIntervalGenerator(config.generatePartialDeletionInterval)); generatePartialDeletionEvent.generateTimestamp = config.deletionStartTime; @@ -104,15 +106,19 @@ public class DeletionExprMain { ExecuteRangeQueryEvent executeRangeQueryEvent = new ExecuteRangeQueryEvent( config, - new TimeRange(config.rangeQueryOffset, - config.rangeQueryRange + config.rangeQueryOffset), + new TimeRange( + config.rangeQueryOffset, config.rangeQueryRange + config.rangeQueryOffset), config.rangeQueryStep, new FixedIntervalGenerator(config.rangeQueryInterval)); - ExecuteLastPointQueryEvent executeLastPointQueryEvent = new ExecuteLastPointQueryEvent(config, - new TimeRange(0, 1), new FixedIntervalGenerator(config.pointQueryInterval)); - ExecuteRangeQueryEvent executeFullQueryEvent = new ExecuteRangeQueryEvent(config, - new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE), 0, - new FixedIntervalGenerator(config.fullQueryInterval)); + ExecuteLastPointQueryEvent executeLastPointQueryEvent = + new ExecuteLastPointQueryEvent( + config, new TimeRange(0, 1), new FixedIntervalGenerator(config.pointQueryInterval)); + ExecuteRangeQueryEvent executeFullQueryEvent = + new ExecuteRangeQueryEvent( + config, + new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE), + 0, + new FixedIntervalGenerator(config.fullQueryInterval)); events.add(executeRangeQueryEvent); events.add(executeLastPointQueryEvent); events.add(executeFullQueryEvent); @@ -134,18 +140,21 @@ public class DeletionExprMain { if (printState) { System.out.println(simulator); System.out.println(simulator.getStatistics()); - System.out.println(simulator.getSimulationContext().modFileManager.modFileList.stream() - .map(s -> s.mods.size()).collect( - Collectors.toList())); - System.out.println(simulator.getSimulationContext().modFileManager.modFileList.stream() - .map(s -> s.tsfileReferences.size()).collect( - Collectors.toList())); + System.out.println( + simulator.getSimulationContext().modFileManager.modFileList.stream() + .map(s -> s.mods.size()) + .collect(Collectors.toList())); + System.out.println( + simulator.getSimulationContext().modFileManager.modFileList.stream() + .map(s -> s.tsfileReferences.size()) + .collect(Collectors.toList())); } } private void writeReport() { - report.deletionWriteTime = simulator.getStatistics().partialDeletionExecutedTime - + simulator.getStatistics().fullDeletionExecutedTime; + report.deletionWriteTime = + simulator.getStatistics().partialDeletionExecutedTime + + simulator.getStatistics().fullDeletionExecutedTime; report.deletionTimeList.add(report.deletionWriteTime); report.deletionReadTime = simulator.getStatistics().queryReadDeletionTime; report.queryTimeList.add(report.deletionReadTime); @@ -188,7 +197,7 @@ public class DeletionExprMain { } // use modFileCntThreshold as the x-axis - for (int i = 1; i < maxFileCntThreshold; i++) { + for (int i = 1; i < maxFileCntThreshold; i+=cntThresholdStep) { initExpr(expr); configurer.configure(expr, exprNum); expr.config.modFileCntThreshold = i; @@ -210,30 +219,40 @@ public class DeletionExprMain { expr.config.generatePartialDeletionInterval = 2_000_000; expr.config.partialDeletionRange = expr.config.tsfileRange * 3; expr.config.partialDeletionOffset = -expr.config.partialDeletionRange; - expr.config.partialDeletionStep = (long) (expr.config.tsfileRange / ( - 1.0 * expr.config.generateTsFileInterval / expr.config.generatePartialDeletionInterval)); + expr.config.partialDeletionStep = + (long) + (expr.config.tsfileRange + / (1.0 + * expr.config.generateTsFileInterval + / expr.config.generatePartialDeletionInterval)); expr.config.generateTsFileInterval = 10_000_000; expr.config.modFileSizeThreshold = 64 * 1024; expr.config.deletionStartTime = 1000 * expr.config.generateTsFileInterval; -// expr.config.queryRange = maxTimestamp; -// expr.config.queryStep = 0; + // expr.config.queryRange = maxTimestamp; + // expr.config.queryStep = 0; expr.config.rangeQueryRange = expr.config.tsfileRange * 1000; - expr.config.rangeQueryStep = expr.config.tsfileRange / (expr.config.generateTsFileInterval - / expr.config.rangeQueryInterval); - expr.config.rangeQueryOffset = -expr.config.rangeQueryRange - + expr.config.deletionStartTime / expr.config.generateTsFileInterval - * expr.config.tsfileRange; + expr.config.rangeQueryStep = + expr.config.tsfileRange + / (expr.config.generateTsFileInterval / expr.config.rangeQueryInterval); + expr.config.rangeQueryOffset = + -expr.config.rangeQueryRange + + expr.config.deletionStartTime + / expr.config.generateTsFileInterval + * expr.config.tsfileRange; } - private static void parallelExpr(Configurer configurer, int exprNum, Function<Integer, String> argsToString, boolean runBaseline) + private static void parallelExpr( + Configurer configurer, + int exprNum, + Function<Integer, String> argsToString, + boolean runBaseline) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); List<Future<ExprReport>> asyncReports = new ArrayList<>(); for (int i = 0; i < exprNum; i++) { int finalI = i; - asyncReports.add(service.submit(() -> oneExpr(configurer, - finalI, runBaseline))); + asyncReports.add(service.submit(() -> oneExpr(configurer, finalI, runBaseline))); } for (Future<ExprReport> asyncReport : asyncReports) { @@ -249,81 +268,109 @@ public class DeletionExprMain { private static void testSizeThreshold() throws ExecutionException, InterruptedException { String argName = "sizeThreshold"; - long[] exprArgs = new long[]{ - 16 * 1024, - 32 * 1024, - 64 * 1024, - 128 * 1024, - 256 * 1024, - }; - Configurer configurer = (expr, j) -> { - expr.config.modFileSizeThreshold = exprArgs[j]; - }; + long[] exprArgs = + new long[]{ + 16 * 1024, 32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, + }; + Configurer configurer = + (expr, j) -> { + expr.config.modFileSizeThreshold = exprArgs[j]; + }; parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true); } private static void testQueryInterval() throws ExecutionException, InterruptedException { String argName = "queryInterval"; - long[] exprArgs = new long[]{ - 500_000, - 1000_000, - 1500_000, - 2000_000, - 2500_000 - }; - Configurer configurer = (expr, j) -> { - expr.config.pointQueryInterval = exprArgs[j]; - expr.config.rangeQueryInterval = exprArgs[j]; - expr.config.fullQueryInterval = exprArgs[j]; - }; + long[] exprArgs = new long[]{500_000, 1000_000, 1500_000, 2000_000, 2500_000}; + Configurer configurer = + (expr, j) -> { + expr.config.pointQueryInterval = exprArgs[j]; + expr.config.rangeQueryInterval = exprArgs[j]; + expr.config.fullQueryInterval = exprArgs[j]; + }; parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true); } private static void testSimulationTime() throws ExecutionException, InterruptedException { String argName = "simulationTime"; - long[] exprArgs = new long[]{ - 24 * 60 * 60 * 1000 * 1000L, - 2 * 24 * 60 * 60 * 1000 * 1000L, - 3 * 24 * 60 * 60 * 1000 * 1000L, - 4 * 24 * 60 * 60 * 1000 * 1000L, - 5 * 24 * 60 * 60 * 1000 * 1000L - }; - Configurer configurer = (expr, j) -> { - expr.maxTimestamp = exprArgs[j]; - }; + long[] exprArgs = + new long[]{ + 24 * 60 * 60 * 1000 * 1000L, + 2 * 24 * 60 * 60 * 1000 * 1000L, + 3 * 24 * 60 * 60 * 1000 * 1000L, + 4 * 24 * 60 * 60 * 1000 * 1000L, + 5 * 24 * 60 * 60 * 1000 * 1000L + }; + Configurer configurer = + (expr, j) -> { + expr.maxTimestamp = exprArgs[j]; + }; parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], false); } private static void testDeletionRatio() throws ExecutionException, InterruptedException { String argName1 = "fullDeletionInterval"; - long[] exprArgs1 = new long[]{ - 200_000_000, - 20_000_000, - 2_000_000, - 2_000_000, - 2_000_000 - }; + long[] exprArgs1; + exprArgs1 = new long[]{200_000_000, 20_000_000, 2_000_000, 2_000_000, 2_000_000}; +// exprArgs1 = new long[]{2_000_000, 2_000_000}; String argName2 = "partialDeletionInterval"; - long[] exprArgs2 = new long[]{ - 2_000_000, - 2_000_000, - 2_000_000, - 20_000_000, - 200_000_000, + long[] exprArgs2; + exprArgs2 = new long[]{ + 2_000_000, 2_000_000, 2_000_000, 20_000_000, 200_000_000, + }; +// exprArgs2 = +// new long[]{ +// 20_000_000, 200_000_000, +// }; + Configurer configurer = + (expr, j) -> { + expr.config.generateFullDeletionInterval = exprArgs1[j]; + expr.config.generatePartialDeletionInterval = exprArgs2[j]; + expr.config.partialDeletionStep = + (long) + (expr.config.tsfileRange + / (1.0 + * expr.config.generateTsFileInterval + / expr.config.generatePartialDeletionInterval)); + }; + parallelExpr( + configurer, + exprArgs1.length, + (i) -> argName1 + ":" + exprArgs1[i] + ";" + argName2 + ":" + exprArgs2[i], + true); + } + + private static void testTsFileGenInterval() throws ExecutionException, InterruptedException { + String argName = "tsFileGenerationInterval"; + long[] exprArgs = new long[]{ + 10_000_000L, + 20_000_000L, + 40_000_000L, + 80_000_000L, + 160_000_000L, }; Configurer configurer = (expr, j) -> { - expr.config.generateFullDeletionInterval = exprArgs1[j]; - expr.config.generatePartialDeletionInterval = exprArgs2[j]; + expr.config.generateTsFileInterval = exprArgs[j]; + expr.config.partialDeletionStep = (long) (expr.config.tsfileRange / ( + 1.0 * expr.config.generateTsFileInterval / expr.config.generatePartialDeletionInterval)); + expr.config.rangeQueryStep = expr.config.tsfileRange / (expr.config.generateTsFileInterval + / expr.config.rangeQueryInterval); + expr.config.rangeQueryOffset = -expr.config.rangeQueryRange + + expr.config.deletionStartTime / expr.config.generateTsFileInterval + * expr.config.tsfileRange; }; - parallelExpr(configurer, exprArgs1.length, (i) -> argName1 + ":" + exprArgs1[i] +";" + argName2 + ":" + exprArgs2[i], true); + parallelExpr(configurer, exprArgs.length, (i) -> argName + ":" + exprArgs[i], true); } - public static void main(String[] args) throws ExecutionException, InterruptedException { - maxFileCntThreshold = 30; -// testSizeThreshold(); -// testQueryInterval(); -// testSimulationTime(); - testDeletionRatio(); + public static void main(String[] args) throws ExecutionException, InterruptedException { + maxFileCntThreshold = 101; + cntThresholdStep = 5; + + testSizeThreshold(); + // testQueryInterval(); + // testSimulationTime(); +// testDeletionRatio(); +// testTsFileGenInterval(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java index 83ea23cc578..35fd6d634d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/conf/SimulationConfig.java @@ -21,8 +21,10 @@ public class SimulationConfig { public long generatePartialDeletionInterval = 20_000_000L; public long generateFullDeletionInterval = 20_000_000L; - // the first deletion ranges from [partialDeletionOffset, partialDeletionRange + partialDeletionOffset], - // and the next one ranges from [partialDeletionOffset + partialDeletionStep, partialDeletionRange + partialDeletionOffset + partialDeletionStep], + // the first deletion ranges from [partialDeletionOffset, partialDeletionRange + + // partialDeletionOffset], + // and the next one ranges from [partialDeletionOffset + partialDeletionStep, partialDeletionRange + // + partialDeletionOffset + partialDeletionStep], // and so on public long partialDeletionRange = tsfileRange * 3; public long partialDeletionStep = tsfileRange / 2; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java index 7825140d156..795b9027ef1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/entity/SimpleModAllocator.java @@ -39,9 +39,10 @@ public class SimpleModAllocator { // can allocate more mod file long totalSize = prevModFile.mods.size() * config.deletionSizeInByte; if (totalSize > config.modFileSizeThreshold) { -// System.out.printf( -// "When allocating new Mod File, there are %d partial deletion and %d full deletion%n", -// prevModFile.partialDeletionCnt, prevModFile.fullDeletionCnt); + // System.out.printf( + // "When allocating new Mod File, there are %d partial deletion and %d full + // deletion%n", + // prevModFile.partialDeletionCnt, prevModFile.fullDeletionCnt); // the previous one is already large enough, allocate a new one return allocateNew(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java index 99853004d70..d67db56dd9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteLastPointQueryEvent.java @@ -19,15 +19,15 @@ package org.apache.iotdb.db.expr.event; -import java.util.Collections; -import java.util.List; -import java.util.function.Supplier; import org.apache.iotdb.db.expr.conf.SimulationConfig; -import org.apache.iotdb.db.expr.entity.SimDeletion; -import org.apache.iotdb.db.expr.entity.SimTsFile; import org.apache.iotdb.db.expr.simulator.SimulationContext; + import org.apache.tsfile.read.common.TimeRange; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + public class ExecuteLastPointQueryEvent extends ExecuteRangeQueryEvent { public ExecuteLastPointQueryEvent( @@ -39,7 +39,7 @@ public class ExecuteLastPointQueryEvent extends ExecuteRangeQueryEvent { long currentTimestamp = context.getSimulator().currentTimestamp; long lastTsFileVersion = currentTimestamp / context.getConfig().generateTsFileInterval; long lastTsFileTime = lastTsFileVersion * context.getConfig().tsfileRange; - return new TimeRange(lastTsFileTime,lastTsFileTime + 1); + return new TimeRange(lastTsFileTime, lastTsFileTime + 1); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java index 0e547e7ccb5..f4621640ded 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/ExecuteRangeQueryEvent.java @@ -78,8 +78,7 @@ public class ExecuteRangeQueryEvent extends Event { 1.0 * deletions.size() * config.deletionSizeInByte / config.IoBandwidthBytesPerTimestamp; readDeletionTransTimeSum += transTime; readDeletionSeekTimeSum += config.IoSeekTimestamp; - return transTime - + config.IoSeekTimestamp; + return transTime + config.IoSeekTimestamp; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java index 4b1ac0b1767..72c06a2b134 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/event/GenerateDeletionEvent.java @@ -26,7 +26,7 @@ public class GenerateDeletionEvent extends Event { long step, Supplier<Long> intervalGenerator) { super(config); - + this.currentDeletion = currentDeletion; this.step = step; this.intervalGenerator = intervalGenerator; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java index 50c826cb216..bca6d78d6bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/executor/SimpleExecutor.java @@ -102,12 +102,13 @@ public class SimpleExecutor implements EventExecutor<SimpleContext> { simpleContext.getStatistics().queryExecutedCnt++; simpleContext.getStatistics().queryExecutedTime += event.getTimeConsumption(); simpleContext.getStatistics().queryReadDeletionTime += Math.round(event.readDeletionTimeSum); - simpleContext.getStatistics().queryReadDeletionSeekTime += Math.round( - event.readDeletionSeekTimeSum); - simpleContext.getStatistics().queryReadDeletionTransTime += Math.round( - event.readDeletionTransTimeSum); -// System.out.println( -// simpleContext.getSimulator().currentStep + " " + simpleContext.getSimulator().currentTimestamp -// + " " + event.readDeletionSeekTimeSum + " " + event.readDeletionTransTimeSum); + simpleContext.getStatistics().queryReadDeletionSeekTime += + Math.round(event.readDeletionSeekTimeSum); + simpleContext.getStatistics().queryReadDeletionTransTime += + Math.round(event.readDeletionTransTimeSum); + // System.out.println( + // simpleContext.getSimulator().currentStep + " " + + // simpleContext.getSimulator().currentTimestamp + // + " " + event.readDeletionSeekTimeSum + " " + event.readDeletionTransTimeSum); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java index 58aacbfcdb9..8bafae3d262 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/expr/simulator/SimpleSimulator.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.expr.simulator; -import java.util.List; import org.apache.iotdb.db.expr.conf.SimulationConfig; import org.apache.iotdb.db.expr.entity.SimModFileManager; import org.apache.iotdb.db.expr.entity.SimTsFileManager; import org.apache.iotdb.db.expr.event.Event; import org.apache.iotdb.db.expr.executor.SimpleExecutor; +import java.util.List; import java.util.PriorityQueue; public class SimpleSimulator { @@ -94,13 +94,18 @@ public class SimpleSimulator { @Override public String toString() { - return "SimpleSimulator{" + - "currentTimestamp=" + currentTimestamp + - "\n, currentStep=" + currentStep + - "\n, eventQueue=" + eventQueue + - "\n, maxStep=" + maxStep + - "\n, maxTimestamp=" + maxTimestamp + - '}'; + return "SimpleSimulator{" + + "currentTimestamp=" + + currentTimestamp + + "\n, currentStep=" + + currentStep + + "\n, eventQueue=" + + eventQueue + + "\n, maxStep=" + + maxStep + + "\n, maxTimestamp=" + + maxTimestamp + + '}'; } public class SimpleContext implements SimulationContext { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index 08c81808f84..21b3b70cc70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -207,7 +207,8 @@ public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask { } else { if (sourceFile.modFileExists()) { Files.createLink( - new File(filesView.targetFilesInPerformer.get(0).getOldModFile().getFilePath()).toPath(), + new File(filesView.targetFilesInPerformer.get(0).getOldModFile().getFilePath()) + .toPath(), new File(sourceFile.getOldModFile().getFilePath()).toPath()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 8cb6f06319d..0fe87391a5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -141,7 +141,8 @@ public class CompactionUtils { continue; } Set<Modification> seqModifications = - new HashSet<>(ModificationFileV1.getCompactionMods(seqResources.get(i)).getModifications()); + new HashSet<>( + ModificationFileV1.getCompactionMods(seqResources.get(i)).getModifications()); modifications.addAll(seqModifications); updateOneTargetMods(targetResource, modifications); if (!modifications.isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java index 9ab77d7fe49..d4e9076c8a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/log/TsFileIdentifier.java @@ -195,7 +195,8 @@ public class TsFileIdentifier { if (file.exists() || new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() || new File(file.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).exists() - || new File(file.getAbsolutePath() + ModificationFileV1.COMPACTION_FILE_SUFFIX).exists()) { + || new File(file.getAbsolutePath() + ModificationFileV1.COMPACTION_FILE_SUFFIX) + .exists()) { return file; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java index 1918d479040..c9a858929e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java @@ -18,15 +18,17 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; +import org.apache.iotdb.db.utils.IOUtils.StreamSerializable; + +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.iotdb.db.utils.IOUtils.StreamSerializable; -import org.apache.tsfile.utils.ReadWriteForEncodingUtils; -import org.apache.tsfile.utils.ReadWriteIOUtils; public class DeletionPredicate implements StreamSerializable { @@ -57,11 +59,10 @@ public class DeletionPredicate implements StreamSerializable { measurementNames.add(ReadWriteIOUtils.readVarIntString(stream)); } } else { - measurementNames = Collections.emptyList(); + measurementNames = Collections.emptyList(); } } - public static class IDPredicate implements StreamSerializable { @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java index e3c1fd6aa47..12262a7d66b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java @@ -18,13 +18,15 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; +import org.apache.iotdb.db.utils.IOUtils.StreamSerializable; + +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.iotdb.db.utils.IOUtils.StreamSerializable; -import org.apache.tsfile.read.common.TimeRange; -import org.apache.tsfile.utils.ReadWriteIOUtils; public abstract class ModEntry implements StreamSerializable { protected ModType modType; @@ -43,8 +45,8 @@ public abstract class ModEntry implements StreamSerializable { @Override public void deserialize(InputStream stream) throws IOException { - this.timeRange = new TimeRange(ReadWriteIOUtils.readLong(stream), - ReadWriteIOUtils.readLong(stream)); + this.timeRange = + new TimeRange(ReadWriteIOUtils.readLong(stream), ReadWriteIOUtils.readLong(stream)); } public static ModEntry createFrom(InputStream stream) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java index 79feb967128..db9f55b3ac5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModFileManager.java @@ -19,6 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion.modification; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -26,21 +33,15 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import org.apache.iotdb.commons.utils.FileUtils; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - * A ModFileManager manages the ModificationFiles of a Time Partition. - */ +/** A ModFileManager manages the ModificationFiles of a Time Partition. */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) public class ModFileManager { private static final Logger LOGGER = LoggerFactory.getLogger(ModFileManager.class); // levelNum -> modFileNum -> modFile - private final Map<Long, TreeMap<Long, ModificationFile>> allLevelModsFileMap = new ConcurrentHashMap<>(); + private final Map<Long, TreeMap<Long, ModificationFile>> allLevelModsFileMap = + new ConcurrentHashMap<>(); private final int levelModFileCntThreshold; private final long singleModFileSizeThreshold; @@ -55,15 +56,17 @@ public class ModFileManager { String name = file.getName(); long[] levelNumAndModNum = ModificationFile.parseFileName(name); - ModificationFile modificationFile = allLevelModsFileMap.computeIfAbsent(levelNumAndModNum[0], - k -> new TreeMap<>()).computeIfAbsent(levelNumAndModNum[1], k -> new ModificationFile(file, resource)); + ModificationFile modificationFile = + allLevelModsFileMap + .computeIfAbsent(levelNumAndModNum[0], k -> new TreeMap<>()) + .computeIfAbsent(levelNumAndModNum[1], k -> new ModificationFile(file, resource)); modificationFile.addReference(resource); return modificationFile; } private long maxModNum(long levelNum) { - TreeMap<Long, ModificationFile> levelModFileMap = allLevelModsFileMap.computeIfAbsent( - levelNum, k -> new TreeMap<>()); + TreeMap<Long, ModificationFile> levelModFileMap = + allLevelModsFileMap.computeIfAbsent(levelNum, k -> new TreeMap<>()); if (levelModFileMap.isEmpty()) { return -1; } else { @@ -126,12 +129,15 @@ public class ModFileManager { TsFileID tsFileID = resource.getTsFileID(); long levelNum = tsFileID.getInnerCompactionCount(); long nextModNum = maxModNum(levelNum) + 1; - File file = new File(resource.getTsFile().getParentFile(), ModificationFile.composeFileName(levelNum, nextModNum)); - TreeMap<Long, ModificationFile> levelModsFileMap = this.allLevelModsFileMap.computeIfAbsent( - levelNum, - k -> new TreeMap<>()); + File file = + new File( + resource.getTsFile().getParentFile(), + ModificationFile.composeFileName(levelNum, nextModNum)); + TreeMap<Long, ModificationFile> levelModsFileMap = + this.allLevelModsFileMap.computeIfAbsent(levelNum, k -> new TreeMap<>()); synchronized (levelModsFileMap) { - return levelModsFileMap.computeIfAbsent(nextModNum, k -> new ModificationFile(file, resource)); + return levelModsFileMap.computeIfAbsent( + nextModNum, k -> new ModificationFile(file, resource)); } } @@ -139,11 +145,12 @@ public class ModFileManager { for (TreeMap<Long, ModificationFile> levelModFileMap : allLevelModsFileMap.values()) { List<Long> modFilesToRemove = new ArrayList<>(); synchronized (levelModFileMap) { - levelModFileMap.forEach((modNum, modFile) -> { - if (!modFile.hasReference()) { - modFilesToRemove.add(modNum); - } - }); + levelModFileMap.forEach( + (modNum, modFile) -> { + if (!modFile.hasReference()) { + modFilesToRemove.add(modNum); + } + }); } synchronized (levelModFileMap) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java index 6ee348047fd..d8a5e8c11c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java @@ -19,6 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.modification; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.File; @@ -34,9 +39,6 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ModificationFile implements AutoCloseable { @@ -47,8 +49,8 @@ public class ModificationFile implements AutoCloseable { private FileChannel channel; private OutputStream fileOutputStream; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final Set<TsFileResource> tsFileRefs = new ConcurrentSkipListSet<>(Comparator.comparing( - TsFileResource::getTsFilePath)); + private final Set<TsFileResource> tsFileRefs = + new ConcurrentSkipListSet<>(Comparator.comparing(TsFileResource::getTsFilePath)); public ModificationFile(File file, TsFileResource firstResource) { this.file = file; @@ -82,6 +84,7 @@ public class ModificationFile implements AutoCloseable { /** * Add a TsFile to the reference set only if the set is not empty. + * * @param tsFile TsFile to be added * @return true if the TsFile is successfully added, false if the reference set is empty. */ @@ -101,6 +104,7 @@ public class ModificationFile implements AutoCloseable { /** * Remove the references of the given TsFiles. + * * @param tsFiles references to remove * @return true if the ref set is empty after removal, false otherwise */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java index 2f09b9252d7..44c0d9d7bab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java @@ -18,10 +18,11 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; +import org.apache.tsfile.read.common.TimeRange; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.tsfile.read.common.TimeRange; public class TableDeletionEntry extends ModEntry { private DeletionPredicate predicate; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java index a2607f49940..692e780309b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TreeDeletionEntry.java @@ -18,15 +18,17 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion; + import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + public class TreeDeletionEntry extends ModEntry { private PartialPath pathPattern; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 4ae4e4a748a..4ecb97addf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; -import java.nio.channels.FileChannel; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; @@ -65,6 +64,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -111,11 +111,13 @@ public class TsFileResource { private ModificationFile modFile; private long modFileOffset; + @SuppressWarnings("squid:S3077") private volatile ModificationFileV1 oldModFile; @SuppressWarnings("squid:S3077") private volatile ModificationFileV1 compactionModFile; + private ModFileManager modFileManager; // the start pos of mod file path in this TsFileResource private long modFilePathOffset = -1; @@ -248,7 +250,8 @@ public class TsFileResource { fsFactory.moveFile(src, dest); } - private void serializeTo(BufferedOutputStream outputStream, FileOutputStream fileOutputStream) throws IOException { + private void serializeTo(BufferedOutputStream outputStream, FileOutputStream fileOutputStream) + throws IOException { ReadWriteIOUtils.write(VERSION_NUMBER, outputStream); timeIndex.serialize(outputStream); @@ -310,9 +313,7 @@ public class TsFileResource { modFilePathDeserialized = true; } - /** - * deserialize only the mod file related fields from the tail of the file. - */ + /** deserialize only the mod file related fields from the tail of the file. */ private void deserializeModFilePath() throws IOException { if (modFilePathDeserialized) { return; @@ -351,7 +352,8 @@ public class TsFileResource { try (FileChannel fileChannel = FileChannel.open(resFile.toPath())) { fileChannel.truncate(modFilePathOffset); } - FileOutputStream fileOutputStream = new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX, true); + FileOutputStream fileOutputStream = + new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX, true); BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream); try { ReadWriteIOUtils.writeVar(modFile.getFile().getAbsolutePath(), outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index 00416063558..7950ba8fc08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -19,10 +19,18 @@ package org.apache.iotdb.db.tools; +import java.io.File; +import java.io.Serializable; +import java.util.HashMap; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.IMetadataIndexEntry; import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkGroupHeader; +import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; import org.apache.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -34,11 +42,17 @@ import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.TsFileMetadata; import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.read.TsFileCheckStatus; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.reader.page.PageReader; +import org.apache.tsfile.read.reader.page.TimePageReader; +import org.apache.tsfile.read.reader.page.ValuePageReader; import org.apache.tsfile.utils.BloomFilter; import org.apache.tsfile.utils.Pair; @@ -53,9 +67,15 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TsFileSketchTool { + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSketchTool.class); private String filename; private PrintWriter pw; private TsFileSketchToolReader reader; @@ -82,7 +102,308 @@ public class TsFileSketchTool { try { this.filename = filename; pw = new PrintWriter(new FileWriter(outFile)); - reader = new TsFileSketchToolReader(filename); + reader = new TsFileSketchToolReader(filename) { + @Override + public long selfCheck(Map<Path, IMeasurementSchema> newSchema, + List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish) + throws IOException { + File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file); + long fileSize; + if (!checkFile.exists()) { + return TsFileCheckStatus.FILE_NOT_FOUND; + } else { + fileSize = checkFile.length(); + } + ChunkMetadata currentChunk; + String measurementID; + TSDataType dataType; + long fileOffsetOfChunk; + + // ChunkMetadata of current ChunkGroup + List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); + + int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; + if (fileSize < headerLength) { + return TsFileCheckStatus.INCOMPATIBLE_FILE; + } + if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic()) + || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) { + return TsFileCheckStatus.INCOMPATIBLE_FILE; + } + + tsFileInput.position(headerLength); + boolean isComplete = isComplete(); + if (fileSize == headerLength) { + return headerLength; + } else if (isComplete) { + loadMetadataSize(); + if (fastFinish) { + return TsFileCheckStatus.COMPLETE_FILE; + } + } + // if not a complete file, we will recover it... + long truncatedSize = headerLength; + byte marker; + List<long[]> timeBatch = new ArrayList<>(); + IDeviceID lastDeviceId = null; + List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); + Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>(); + try { + while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + case MetaMarker.TIME_CHUNK_HEADER: + case MetaMarker.VALUE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + fileOffsetOfChunk = this.position() - 1; + if (fileOffsetOfChunk == 350064120) { + tsFileInput.position(350249561); + break; + } + // if there is something wrong with a chunk, we will drop the whole ChunkGroup + // as different chunks may be created by the same insertions(sqls), and partial + // insertion is not tolerable + ChunkHeader chunkHeader = this.readChunkHeader(marker); + measurementID = chunkHeader.getMeasurementID(); + IMeasurementSchema measurementSchema = + new MeasurementSchema( + measurementID, + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType()); + measurementSchemaList.add(measurementSchema); + dataType = chunkHeader.getDataType(); + + Statistics<? extends Serializable> chunkStatistics = + Statistics.getStatsByType(dataType); + int dataSize = chunkHeader.getDataSize(); + + if (dataSize > 0) { + if (marker == MetaMarker.TIME_CHUNK_HEADER) { + timeBatch.add(null); + } + if (((byte) (chunkHeader.getChunkType() & 0x3F)) + == MetaMarker + .CHUNK_HEADER) { // more than one page, we could use page statistics to + if (marker == MetaMarker.VALUE_CHUNK_HEADER) { + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put( + chunkHeader.getMeasurementID(), timeBatchIndex + 1); + } + // generate chunk statistic + while (dataSize > 0) { + // a new Page + PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true); + if (pageHeader.getUncompressedSize() != 0) { + // not empty page + chunkStatistics.mergeStatistics(pageHeader.getStatistics()); + } + this.skipPageData(pageHeader); + dataSize -= pageHeader.getSerializedPageSize(); + chunkHeader.increasePageNums(1); + } + } else { // only one page without statistic, we need to iterate each point to generate + // chunk statistic + PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false); + Decoder valueDecoder = + Decoder.getDecoderByType( + chunkHeader.getEncodingType(), chunkHeader.getDataType()); + ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType()); + Decoder timeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf( + TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + + if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page + + TimePageReader timePageReader = + new TimePageReader(pageHeader, pageData, timeDecoder); + long[] currentTimeBatch = timePageReader.getNextTimeBatch(); + timeBatch.add(currentTimeBatch); + for (long currentTime : currentTimeBatch) { + chunkStatistics.update(currentTime); + } + } else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK) + == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page + + ValuePageReader valuePageReader = + new ValuePageReader( + pageHeader, pageData, chunkHeader.getDataType(), valueDecoder); + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put( + chunkHeader.getMeasurementID(), timeBatchIndex + 1); + TsPrimitiveType[] valueBatch = + valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex)); + + if (valueBatch != null && valueBatch.length != 0) { + for (int i = 0; i < valueBatch.length; i++) { + TsPrimitiveType value = valueBatch[i]; + if (value == null) { + continue; + } + long timeStamp = timeBatch.get(timeBatchIndex)[i]; + switch (dataType) { + case INT32: + case DATE: + chunkStatistics.update(timeStamp, value.getInt()); + break; + case INT64: + case TIMESTAMP: + chunkStatistics.update(timeStamp, value.getLong()); + break; + case FLOAT: + chunkStatistics.update(timeStamp, value.getFloat()); + break; + case DOUBLE: + chunkStatistics.update(timeStamp, value.getDouble()); + break; + case BOOLEAN: + chunkStatistics.update(timeStamp, value.getBoolean()); + break; + case TEXT: + case BLOB: + case STRING: + chunkStatistics.update(timeStamp, value.getBinary()); + break; + default: + throw new IOException("Unexpected type " + dataType); + } + } + } + + } else { // NonAligned Chunk with only one page + PageReader reader = + new PageReader( + pageHeader, + pageData, + chunkHeader.getDataType(), + valueDecoder, + timeDecoder); + BatchData batchData = reader.getAllSatisfiedPageData(); + while (batchData.hasCurrent()) { + switch (dataType) { + case INT32: + case DATE: + chunkStatistics.update(batchData.currentTime(), batchData.getInt()); + break; + case INT64: + case TIMESTAMP: + chunkStatistics.update(batchData.currentTime(), batchData.getLong()); + break; + case FLOAT: + chunkStatistics.update(batchData.currentTime(), batchData.getFloat()); + break; + case DOUBLE: + chunkStatistics.update(batchData.currentTime(), batchData.getDouble()); + break; + case BOOLEAN: + chunkStatistics.update(batchData.currentTime(), batchData.getBoolean()); + break; + case TEXT: + case BLOB: + case STRING: + chunkStatistics.update(batchData.currentTime(), batchData.getBinary()); + break; + default: + throw new IOException("Unexpected type " + dataType); + } + batchData.next(); + } + } + chunkHeader.increasePageNums(1); + } + } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER + || marker == MetaMarker.VALUE_CHUNK_HEADER) { + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(), timeBatchIndex + 1); + } + currentChunk = + new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics); + chunkMetadataList.add(currentChunk); + break; + case MetaMarker.CHUNK_GROUP_HEADER: + // if there is something wrong with the ChunkGroup Header, we will drop this ChunkGroup + // because we can not guarantee the correctness of the deviceId. + truncatedSize = this.position() - 1; + if (lastDeviceId != null) { + // schema of last chunk group + if (newSchema != null) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { + newSchema.putIfAbsent( + new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema); + } + } + measurementSchemaList = new ArrayList<>(); + // last chunk group Metadata + chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); + } + // this is a chunk group + chunkMetadataList = new ArrayList<>(); + ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader(); + lastDeviceId = chunkGroupHeader.getDeviceID(); + timeBatch.clear(); + valueColumn2TimeBatchIndex.clear(); + break; + case MetaMarker.OPERATION_INDEX_RANGE: + truncatedSize = this.position() - 1; + if (lastDeviceId != null) { + // schema of last chunk group + if (newSchema != null) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { + newSchema.putIfAbsent( + new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema); + } + } + measurementSchemaList = new ArrayList<>(); + // last chunk group Metadata + chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); + lastDeviceId = null; + } + readPlanIndex(); + truncatedSize = this.position(); + break; + default: + // the disk file is corrupted, using this file may be dangerous + throw new IOException("Unexpected marker " + marker); + } + } + // now we read the tail of the data section, so we are sure that the last + // ChunkGroupFooter is complete. + if (lastDeviceId != null) { + // schema of last chunk group + if (newSchema != null) { + for (IMeasurementSchema tsSchema : measurementSchemaList) { + newSchema.putIfAbsent( + new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema); + } + } + // last chunk group Metadata + chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList)); + } + if (isComplete) { + truncatedSize = TsFileCheckStatus.COMPLETE_FILE; + } else { + truncatedSize = this.position() - 1; + } + } catch (Exception e) { + LOGGER.warn( + "TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}", + file, + this.position(), + e.getMessage()); + } + // Despite the completeness of the data section, we will discard current FileMetadata + // so that we can continue to write data into this tsfile. + return truncatedSize; + } + }; StringBuilder str1 = new StringBuilder(); for (int i = 0; i < 21; i++) { str1.append("|"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java index 62c4ba3cae3..a3b1396660d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java @@ -106,7 +106,8 @@ public class TsFileSplitByPartitionTool implements AutoCloseable { reader = new TsFileSequenceReader(file); partitionWriterMap = new HashMap<>(); if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFileV1.FILE_SUFFIX).exists()) { - oldModification = (List<Modification>) resourceToBeRewritten.getOldModFile().getModifications(); + oldModification = + (List<Modification>) resourceToBeRewritten.getOldModFile().getModifications(); modsIterator = oldModification.iterator(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java index 73aa2562a43..df51f21a63a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/IOUtils.java @@ -8,11 +8,13 @@ import java.nio.ByteBuffer; public class IOUtils { public interface BufferSerializable { void serialize(ByteBuffer buffer); + void deserialize(ByteBuffer buffer); } public interface StreamSerializable { void serialize(OutputStream stream) throws IOException; + void deserialize(InputStream stream) throws IOException; } }
