This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch support-compaction-validte
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c19864869eba63447238c14a2f9927475adc3f01
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jun 4 19:59:49 2023 +0800

    add tsfile validate after compaction
---
 .../execute/task/CrossSpaceCompactionTask.java     |   5 +-
 .../execute/task/InnerSpaceCompactionTask.java     |   5 +-
 .../compaction/execute/utils/CompactionUtils.java  | 116 +++++++++++
 .../compaction/CompactionValidationTest.java       | 224 +++++++++++++++++++++
 4 files changed, 346 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 705942b12f5..86e72620cf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -176,8 +176,9 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
         }
 
         if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
-            && !CompactionUtils.validateTsFileResources(
-                tsFileManager, storageGroupName, timePartition)) {
+            && (!CompactionUtils.validateTsFileResources(
+                    tsFileManager, storageGroupName, timePartition)
+                || !CompactionUtils.validateTsFiles(targetTsfileResourceList))) {
           LOGGER.error(
               "Failed to pass compaction validation, source sequence files is: {}, unsequence files is {}, target files is {}",
               selectedSequenceFiles,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index f04c389b280..870f501f3b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -187,8 +187,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
         }
 
         if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
-            && !CompactionUtils.validateTsFileResources(
-                tsFileManager, storageGroupName, timePartition)) {
+            && (!CompactionUtils.validateTsFileResources(
+                    tsFileManager, storageGroupName, timePartition)
+                || !CompactionUtils.validateTsFiles(targetTsFileList))) {
           LOGGER.error(
               "Failed to pass compaction validation, source files is: {}, target files is {}",
               selectedTsFileResourceList,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
index 5e873bea69f..a9ea6492c69 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
@@ -25,11 +25,26 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
@@ -37,6 +52,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -297,4 +314,103 @@ public class CompactionUtils {
     }
     return true;
   }
+
+  /**
+   * Validate TsFiles by reading them sequentially. This method should be fast because the read is
+   * sequential.
+   *
+   * @param tsFileResourceList the tsfiles to be checked
+   * @return true if all tsfiles are valid, false if any of the tsfiles is invalid
+   * @throws IOException if an I/O error occurs
+   */
+  public static boolean validateTsFiles(List<TsFileResource> tsFileResourceList)
+      throws IOException {
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      if (!validateSingleTsFiles(tsFileResource)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean validateSingleTsFiles(TsFileResource resource) throws IOException {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) {
+      reader.readHeadMagic();
+      reader.readTailMagic();
+
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
+      byte marker;
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                TimePageReader timePageReader =
+                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+                timeBatch.add(timePageReader.getNextTimeBatch());
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                ValuePageReader valuePageReader =
+                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+                TsPrimitiveType[] valueBatch =
+                    valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+              } else { // NonAligned Chunk
+                PageReader pageReader =
+                    new PageReader(
+                        pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+                BatchData batchData = pageReader.getAllSatisfiedPageData();
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+      for (String device : reader.getAllDevices()) {
+        Map<String, List<ChunkMetadata>> seriesMetaData = reader.readChunkMetadataInDevice(device);
+      }
+    } catch (Exception e) {
+      logger.error("Meets error when validating TsFile {}, ", resource.getTsFilePath(), e);
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java
new file mode 100644
index 00000000000..f6e940a21fb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class CompactionValidationTest {
+  static final String SENSOR_1 = "sensor_1";
+  static final String SENSOR_2 = "sensor_2";
+  static final String SENSOR_3 = "sensor_3";
+
+  static final String DEVICE_1 = "root.sg.device_1";
+  final String dir = TestConstant.OUTPUT_DATA_DIR + "test-validation";
+
+  @Before
+  public void setUp() throws IOException {
+    FileUtils.forceMkdir(new File(dir));
+  }
+
+  private void writeOneFile(String path) {
+    try {
+      File f = FSFactoryProducer.getFSFactory().getFile(path);
+      if (f.exists() && !f.delete()) {
+        throw new RuntimeException("can not delete " + f.getAbsolutePath());
+      }
+
+      try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+        List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+        measurementSchemas.add(new MeasurementSchema(SENSOR_1, TSDataType.TEXT, TSEncoding.PLAIN));
+        measurementSchemas.add(new MeasurementSchema(SENSOR_2, TSDataType.TEXT, TSEncoding.PLAIN));
+        measurementSchemas.add(new MeasurementSchema(SENSOR_3, TSDataType.TEXT, TSEncoding.PLAIN));
+
+        // register nonAligned timeseries
+        tsFileWriter.registerTimeseries(new Path(DEVICE_1), measurementSchemas);
+
+        List<MeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+        // example 1
+        writeMeasurementScheams.add(measurementSchemas.get(0));
+        writeMeasurementScheams.add(measurementSchemas.get(1));
+        writeMeasurementScheams.add(measurementSchemas.get(2));
+        writeWithTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 10000, 0, 0);
+      }
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  private void writeWithTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<MeasurementSchema> schemas,
+      long rowNum,
+      long startTime,
+      long startValue)
+      throws IOException, WriteProcessException {
+    Tablet tablet = new Tablet(deviceId, schemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    long sensorNum = schemas.size();
+
+    for (long r = 0; r < rowNum; r++, startValue++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = startTime++;
+      for (int i = 0; i < sensorNum; i++) {
+        Binary[] textSensor = (Binary[]) values[i];
+        textSensor[row] = new Binary("testString.........");
+      }
+      // write
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.write(tablet);
+        tablet.reset();
+      }
+    }
+    // write
+    if (tablet.rowSize != 0) {
+      tsFileWriter.write(tablet);
+      tablet.reset();
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    File[] files = new File(dir).listFiles();
+    if (files != null) {
+      for (File f : files) {
+        FileUtils.delete(f);
+      }
+    }
+
+    FileUtils.forceDelete(new File(dir));
+  }
+
+  @Test
+  public void testSingleCompleteFile() throws IOException {
+    String path = dir + File.separator + "test.tsfile";
+    writeOneFile(path);
+    TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+    Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+    Assert.assertTrue(CompactionUtils.validateTsFiles(Collections.singletonList(mockTsFile)));
+  }
+
+  @Test
+  public void testMultiCompleteFile() throws IOException {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String path = dir + File.separator + "test" + i + ".tsfile";
+      writeOneFile(path);
+      TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+      Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+      resources.add(mockTsFile);
+    }
+    Assert.assertTrue(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test
+  public void testOneUncompletedFile() throws IOException {
+    String path = dir + File.separator + "test.tsfile";
+    writeOneFile(path);
+    RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw");
+    randomAccessFile.seek(1024);
+    randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+    randomAccessFile.close();
+    TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+    Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+    Assert.assertFalse(CompactionUtils.validateTsFiles(Collections.singletonList(mockTsFile)));
+  }
+
+  @Test
+  public void testMultiUncompletedFiles() throws IOException {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String path = dir + File.separator + "test" + i + ".tsfile";
+      writeOneFile(path);
+      RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw");
+      randomAccessFile.seek(1024);
+      randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+      randomAccessFile.close();
+      TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+      Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+      resources.add(mockTsFile);
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test // broken in chunk
+  public void testOneUncompletedInMultiCompletedFiles1() throws IOException {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String path = dir + File.separator + "test" + i + ".tsfile";
+      writeOneFile(path);
+      if (i == 5) {
+        RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw");
+        randomAccessFile.seek(1024);
+        randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+        randomAccessFile.close();
+      }
+      TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+      Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+      resources.add(mockTsFile);
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test // broken in metadata
+  public void testOneUncompletedInMultiCompletedFiles2() throws IOException {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String path = dir + File.separator + "test" + i + ".tsfile";
+      writeOneFile(path);
+      if (i == 5) {
+        RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw");
+        randomAccessFile.seek(randomAccessFile.length() - 100);
+        randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+        randomAccessFile.close();
+      }
+      TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+      Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+      resources.add(mockTsFile);
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+}

Reply via email to