This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 69607c44b26 [IOTDB-5969] [To rel/1.1] Add tsfile validate after compaction (#10049)
69607c44b26 is described below
commit 69607c44b26034d416a84fee20eebcaf05bde4b3
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Jun 5 13:06:55 2023 +0800
[IOTDB-5969] [To rel/1.1] Add tsfile validate after compaction (#10049)
---
.../resources/conf/iotdb-common.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../execute/task/CrossSpaceCompactionTask.java | 5 +-
.../execute/task/InnerSpaceCompactionTask.java | 5 +-
.../compaction/execute/utils/CompactionUtils.java | 118 +++++++++++
.../compaction/CompactionValidationTest.java | 224 +++++++++++++++++++++
6 files changed, 350 insertions(+), 6 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 76eba43576c..3b61243b129 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -661,7 +661,7 @@ cluster_name=defaultCluster
# Enable the check of sequence tsfile time range after compaction
# Datatype: boolean
-# enable_compaction_validation=true
+# enable_compaction_validation=false
####################
### Write Ahead Log Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9943d007363..617289e4dca 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -499,7 +499,7 @@ public class IoTDBConfig {
*/
private int subCompactionTaskNum = 4;
- private boolean enableCompactionValidation = true;
+ private boolean enableCompactionValidation = false;
/** The size of candidate compaction task queue. */
private int candidateCompactionTaskQueueSize = 50;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 705942b12f5..86e72620cf1 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -176,8 +176,9 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
- && !CompactionUtils.validateTsFileResources(
- tsFileManager, storageGroupName, timePartition)) {
+ && (!CompactionUtils.validateTsFileResources(
+ tsFileManager, storageGroupName, timePartition)
+ ||
!CompactionUtils.validateTsFiles(targetTsfileResourceList))) {
LOGGER.error(
"Failed to pass compaction validation, source sequence files is:
{}, unsequence files is {}, target files is {}",
selectedSequenceFiles,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index f04c389b280..870f501f3b3 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -187,8 +187,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
}
if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
- && !CompactionUtils.validateTsFileResources(
- tsFileManager, storageGroupName, timePartition)) {
+ && (!CompactionUtils.validateTsFileResources(
+ tsFileManager, storageGroupName, timePartition)
+ || !CompactionUtils.validateTsFiles(targetTsFileList))) {
LOGGER.error(
"Failed to pass compaction validation, source files is: {},
target files is {}",
selectedTsFileResourceList,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
index 5e873bea69f..c79d032acf0 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
@@ -25,11 +25,27 @@ import
org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -37,6 +53,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -297,4 +315,104 @@ public class CompactionUtils {
}
return true;
}
+
+  /**
+   * Checks every given TsFile by scanning it sequentially from head to tail, delegating each file
+   * to {@link #validateSingleTsFiles(TsFileResource)}. Because every read is sequential, the check
+   * should be cheap.
+   *
+   * <p>Checking stops at the first file that fails. Errors met while reading a file are logged by
+   * the per-file check and reported as {@code false}; they are not propagated to the caller.
+   *
+   * @param tsFileResourceList the TsFiles to check; an empty list is considered valid
+   * @return true if every TsFile passes validation, false as soon as one of them fails
+   * @throws IOException kept in the signature for compatibility; read errors are reported through
+   *     the {@code false} return value instead
+   */
+  public static boolean validateTsFiles(List<TsFileResource> tsFileResourceList)
+      throws IOException {
+    return tsFileResourceList.stream().allMatch(CompactionUtils::validateSingleTsFiles);
+  }
+
+ public static boolean validateSingleTsFiles(TsFileResource resource) {
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(resource.getTsFilePath())) {
+ reader.readHeadMagic();
+ reader.readTailMagic();
+
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+ List<long[]> timeBatch = new ArrayList<>();
+ int pageIndex = 0;
+ byte marker;
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+
CompactionMetricsManager.getInstance().recordReadInfo(header.getDataSize());
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
+ int dataSize = header.getDataSize();
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatch.clear();
+ }
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
+ if ((header.getChunkType() & (byte)
TsFileConstant.TIME_COLUMN_MASK)
+ == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+ TimePageReader timePageReader =
+ new TimePageReader(pageHeader, pageData,
defaultTimeDecoder);
+ timeBatch.add(timePageReader.getNextTimeBatch());
+ } else if ((header.getChunkType() & (byte)
TsFileConstant.VALUE_COLUMN_MASK)
+ == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData,
header.getDataType(), valueDecoder);
+ TsPrimitiveType[] valueBatch =
+ valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+ } else { // NonAligned Chunk
+ PageReader pageReader =
+ new PageReader(
+ pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex();
+ break;
+ default:
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+ for (String device : reader.getAllDevices()) {
+ Map<String, List<ChunkMetadata>> seriesMetaData =
reader.readChunkMetadataInDevice(device);
+ }
+ } catch (Exception e) {
+ logger.error("Meets error when validating TsFile {}, ",
resource.getTsFilePath(), e);
+ return false;
+ }
+ return true;
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java
new file mode 100644
index 00000000000..f6e940a21fb
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionValidationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class CompactionValidationTest {
+  static final String SENSOR_1 = "sensor_1";
+  static final String SENSOR_2 = "sensor_2";
+  static final String SENSOR_3 = "sensor_3";
+
+  static final String DEVICE_1 = "root.sg.device_1";
+
+  /** Number of rows written to every generated test file. */
+  private static final long ROW_NUM = 10000;
+
+  /** Number of files used by the multi-file tests. */
+  private static final int FILE_NUM = 10;
+
+  final String dir = TestConstant.OUTPUT_DATA_DIR + "test-validation";
+
+  @Before
+  public void setUp() throws IOException {
+    FileUtils.forceMkdir(new File(dir));
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // forceDelete removes the directory recursively, including every generated test file.
+    FileUtils.forceDelete(new File(dir));
+  }
+
+  /**
+   * Writes a complete TsFile at {@code path} holding {@link #ROW_NUM} rows of one non-aligned
+   * device with three TEXT sensors, replacing any file that already exists there.
+   */
+  private void writeOneFile(String path) throws IOException, WriteProcessException {
+    File f = FSFactoryProducer.getFSFactory().getFile(path);
+    if (f.exists() && !f.delete()) {
+      throw new IOException("can not delete " + f.getAbsolutePath());
+    }
+
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+      measurementSchemas.add(new MeasurementSchema(SENSOR_1, TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema(SENSOR_2, TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new MeasurementSchema(SENSOR_3, TSDataType.TEXT, TSEncoding.PLAIN));
+
+      // register nonAligned timeseries
+      tsFileWriter.registerTimeseries(new Path(DEVICE_1), measurementSchemas);
+      writeWithTablet(tsFileWriter, DEVICE_1, measurementSchemas, ROW_NUM, 0);
+    }
+  }
+
+  /**
+   * Writes {@code rowNum} rows with consecutive timestamps starting at {@code startTime}; every
+   * (TEXT) sensor gets the same constant string value.
+   */
+  private void writeWithTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<MeasurementSchema> schemas,
+      long rowNum,
+      long startTime)
+      throws IOException, WriteProcessException {
+    Tablet tablet = new Tablet(deviceId, schemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    int sensorNum = schemas.size();
+
+    for (long r = 0; r < rowNum; r++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = startTime + r;
+      for (int i = 0; i < sensorNum; i++) {
+        Binary[] textSensor = (Binary[]) values[i];
+        textSensor[row] = new Binary("testString.........");
+      }
+      // flush the tablet as soon as it is full
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.write(tablet);
+        tablet.reset();
+      }
+    }
+    // flush the remaining rows
+    if (tablet.rowSize != 0) {
+      tsFileWriter.write(tablet);
+      tablet.reset();
+    }
+  }
+
+  /**
+   * Overwrites 8 bytes of the file at {@code path}, starting at {@code offset}. The file handle is
+   * closed even if the seek or the write fails.
+   */
+  private static void corruptFile(String path, long offset) throws IOException {
+    try (RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw")) {
+      randomAccessFile.seek(offset);
+      randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+    }
+  }
+
+  /** Returns a mocked {@link TsFileResource} whose TsFile path is {@code path}. */
+  private static TsFileResource mockResource(String path) {
+    TsFileResource mockTsFile = Mockito.mock(TsFileResource.class);
+    Mockito.when(mockTsFile.getTsFilePath()).thenReturn(path);
+    return mockTsFile;
+  }
+
+  private String filePath(int index) {
+    return dir + File.separator + "test" + index + ".tsfile";
+  }
+
+  @Test
+  public void testSingleCompleteFile() throws Exception {
+    String path = dir + File.separator + "test.tsfile";
+    writeOneFile(path);
+    Assert.assertTrue(
+        CompactionUtils.validateTsFiles(Collections.singletonList(mockResource(path))));
+  }
+
+  @Test
+  public void testMultiCompleteFile() throws Exception {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < FILE_NUM; i++) {
+      String path = filePath(i);
+      writeOneFile(path);
+      resources.add(mockResource(path));
+    }
+    Assert.assertTrue(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test
+  public void testOneUncompletedFile() throws Exception {
+    String path = dir + File.separator + "test.tsfile";
+    writeOneFile(path);
+    corruptFile(path, 1024);
+    Assert.assertFalse(
+        CompactionUtils.validateTsFiles(Collections.singletonList(mockResource(path))));
+  }
+
+  @Test
+  public void testMultiUncompletedFiles() throws Exception {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < FILE_NUM; i++) {
+      String path = filePath(i);
+      writeOneFile(path);
+      corruptFile(path, 1024);
+      resources.add(mockResource(path));
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test // broken in chunk
+  public void testOneUncompletedInMultiCompletedFiles1() throws Exception {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < FILE_NUM; i++) {
+      String path = filePath(i);
+      writeOneFile(path);
+      if (i == 5) {
+        corruptFile(path, 1024);
+      }
+      resources.add(mockResource(path));
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+
+  @Test // broken in metadata
+  public void testOneUncompletedInMultiCompletedFiles2() throws Exception {
+    List<TsFileResource> resources = new ArrayList<>();
+    for (int i = 0; i < FILE_NUM; i++) {
+      String path = filePath(i);
+      writeOneFile(path);
+      if (i == 5) {
+        // 100 bytes before the end of the file, i.e. inside the metadata section
+        corruptFile(path, new File(path).length() - 100);
+      }
+      resources.add(mockResource(path));
+    }
+    Assert.assertFalse(CompactionUtils.validateTsFiles(resources));
+  }
+}