This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.1
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/rc/1.1 by this push:
new 9a0a8bc6 Fix the problem that ChunkMeta of empty Chunk is not processed (#284)
9a0a8bc6 is described below
commit 9a0a8bc63d9e9b6e57bc36e459d6ca7a95f26910
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Nov 6 15:02:09 2024 +0800
Fix the problem that ChunkMeta of empty Chunk is not processed (#284)
* Fix the problem that ChunkMeta of empty Chunk is not processed
* Add TsFileReaderEmptyChunkTest
* Fix
* Add license header
* Apply code formatting
(cherry picked from commit 8310640337ce2e3fe73a246f07ce7dbda068256e)
---
cpp/pom.xml | 4 +-
.../read/reader/series/FileSeriesReader.java | 10 +-
.../read/reader/TsFileReaderEmptyChunkTest.java | 183 +++++++++++++++++++++
pom.xml | 6 +-
4 files changed, 197 insertions(+), 6 deletions(-)
diff --git a/cpp/pom.xml b/cpp/pom.xml
index bc8a355a..fce86db8 100644
--- a/cpp/pom.xml
+++ b/cpp/pom.xml
@@ -72,8 +72,8 @@
<option>-DBUILD_PHASE=test-compile</option>
<option>${cmake.addition.option}</option>
</options>
- <sourcePath />
- <targetPath />
+ <sourcePath/>
+ <targetPath/>
</configuration>
</execution>
<!-- Compile the test code -->
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
index 1704d553..c131c975 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/series/FileSeriesReader.java
@@ -45,16 +45,24 @@ public class FileSeriesReader extends AbstractFileSeriesReader {
@Override
protected void initChunkReader(IChunkMetadata chunkMetaData) throws
IOException {
+ currentChunkMeasurementNames.clear();
if (chunkMetaData instanceof ChunkMetadata) {
Chunk chunk = chunkLoader.loadChunk((ChunkMetadata) chunkMetaData);
this.chunkReader = new ChunkReader(chunk, filter);
+ currentChunkMeasurementNames.add(chunkMetaData.getMeasurementUid());
} else {
AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata)
chunkMetaData;
Chunk timeChunk =
chunkLoader.loadChunk((ChunkMetadata)
(alignedChunkMetadata.getTimeChunkMetadata()));
List<Chunk> valueChunkList = new ArrayList<>();
for (IChunkMetadata metadata :
alignedChunkMetadata.getValueChunkMetadataList()) {
- valueChunkList.add(chunkLoader.loadChunk((ChunkMetadata) metadata));
+ if (metadata != null) {
+ valueChunkList.add(chunkLoader.loadChunk((ChunkMetadata) metadata));
+ currentChunkMeasurementNames.add(metadata.getMeasurementUid());
+ continue;
+ }
+ valueChunkList.add(null);
+ currentChunkMeasurementNames.add(null);
}
this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList,
filter);
}
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileReaderEmptyChunkTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileReaderEmptyChunkTest.java
new file mode 100644
index 00000000..6c77a7fe
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileReaderEmptyChunkTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.LongStatistics;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.controller.CachedChunkLoaderImpl;
+import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor;
+import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Verifies that a table query tolerates an aligned chunk group with an empty value chunk. */
+public class TsFileReaderEmptyChunkTest {
+
+  private static final String FILE_PATH =
+      TestConstant.BASE_OUTPUT_PATH.concat("TsFileReaderEmptyChunkTest.tsfile");
+
+  private static final String TABLE_NAME = "table";
+  private static final List<String> MEASUREMENT_NAMES = Arrays.asList("s1", "s2", "s3", "s4");
+  private static final int MIN_TIME = 1;
+  private static final int MAX_TIME = 10;
+
+  @After
+  public void teardown() {
+    new File(FILE_PATH).delete();
+  }
+
+  /** Writes each column in turn as an empty value chunk and expects one null per written row. */
+  @Test
+  public void testReadEmptyChunk() throws Exception {
+    // Cover every column position instead of a random one so that failures are reproducible.
+    for (int emptyChunkIndex = 0; emptyChunkIndex < MEASUREMENT_NAMES.size(); emptyChunkIndex++) {
+      new File(FILE_PATH).delete();
+      try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) {
+        registerTableSchema(writer, TABLE_NAME);
+        generateDevice(writer, TABLE_NAME, 1, MIN_TIME, MAX_TIME, emptyChunkIndex);
+        writer.endFile();
+      }
+
+      final int expectedNullCount = MAX_TIME - MIN_TIME + 1;
+      Assert.assertEquals(
+          "empty chunk at index " + emptyChunkIndex,
+          expectedNullCount,
+          countNullValues(TABLE_NAME, MEASUREMENT_NAMES));
+    }
+  }
+
+  /** Queries the given measurements of the table and counts the null cells in the result. */
+  private static int countNullValues(final String tableName, final List<String> measurementNames)
+      throws Exception {
+    final TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH, true, true);
+    try {
+      final TableQueryExecutor tableQueryExecutor =
+          new TableQueryExecutor(
+              new MetadataQuerierByFileImpl(reader),
+              new CachedChunkLoaderImpl(reader),
+              TableQueryExecutor.TableQueryOrdering.DEVICE);
+      final TsBlockReader tsBlockReader =
+          tableQueryExecutor.query(tableName, measurementNames, null, null, null);
+
+      int nullValueCount = 0;
+      while (tsBlockReader.hasNext()) {
+        final TsBlock.TsBlockRowIterator iterator = tsBlockReader.next().getTsBlockRowIterator();
+        while (iterator.hasNext()) {
+          for (final Object value : iterator.next()) {
+            if (value == null) {
+              nullValueCount++;
+            }
+          }
+        }
+      }
+      return nullValueCount;
+    } finally {
+      reader.close();
+    }
+  }
+
+  /** Registers the table schema: an ID column followed by INT64 measurement columns. */
+  private void registerTableSchema(final TsFileIOWriter writer, final String tableName) {
+    final List<IMeasurementSchema> schemas = new ArrayList<>();
+    final List<Tablet.ColumnType> columnTypes = new ArrayList<>();
+    schemas.add(
+        new MeasurementSchema(
+            "id", TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED));
+    columnTypes.add(Tablet.ColumnType.ID);
+    // Derive the measurement columns from the same list the data generator writes.
+    for (final String measurementName : MEASUREMENT_NAMES) {
+      schemas.add(new MeasurementSchema(measurementName, TSDataType.INT64));
+      columnTypes.add(Tablet.ColumnType.MEASUREMENT);
+    }
+    writer.getSchema().registerTableSchema(new TableSchema(tableName, schemas, columnTypes));
+  }
+
+  /** Writes {@code deviceNum} devices whose aligned series cover [minTime, maxTime]. */
+  private void generateDevice(
+      final TsFileIOWriter writer,
+      final String tableName,
+      final int deviceNum,
+      final int minTime,
+      final int maxTime,
+      final int emptyChunkIndex)
+      throws IOException {
+    for (int i = 0; i < deviceNum; i++) {
+      final IDeviceID deviceID =
+          IDeviceID.Factory.DEFAULT_FACTORY.create(new String[] {tableName, "d" + i});
+      writer.startChunkGroup(deviceID);
+      generateSimpleAlignedSeriesToCurrentDevice(
+          writer,
+          MEASUREMENT_NAMES,
+          new TimeRange[] {new TimeRange(minTime, maxTime)},
+          emptyChunkIndex);
+      writer.endChunkGroup();
+    }
+  }
+
+  /** Writes one aligned chunk per time range, leaving column {@code emptyChunkIndex} empty. */
+  public void generateSimpleAlignedSeriesToCurrentDevice(
+      final TsFileIOWriter writer,
+      final List<String> measurementNames,
+      final TimeRange[] toGenerateChunkTimeRanges,
+      final int emptyChunkIndex)
+      throws IOException {
+    final List<IMeasurementSchema> measurementSchemas = new ArrayList<>(measurementNames.size());
+    for (final String measurementName : measurementNames) {
+      measurementSchemas.add(
+          new MeasurementSchema(
+              measurementName, TSDataType.INT64, TSEncoding.RLE, CompressionType.LZ4));
+    }
+
+    for (final TimeRange toGenerateChunk : toGenerateChunkTimeRanges) {
+      final AlignedChunkWriterImpl alignedChunkWriter =
+          new AlignedChunkWriterImpl(measurementSchemas);
+      for (long time = toGenerateChunk.getMin(); time <= toGenerateChunk.getMax(); time++) {
+        alignedChunkWriter.getTimeChunkWriter().write(time);
+        for (int i = 0; i < measurementNames.size(); i++) {
+          // The chosen column receives no values, so its value chunk stays empty.
+          if (i == emptyChunkIndex) {
+            continue;
+          }
+          alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
+        }
+      }
+      alignedChunkWriter.writeToFileWriter(writer);
+      // Explicitly write an empty value chunk for the skipped column.
+      writer.writeEmptyValueChunk(
+          measurementNames.get(emptyChunkIndex),
+          CompressionType.LZ4,
+          TSDataType.INT64,
+          TSEncoding.PLAIN,
+          new LongStatistics());
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 4be4ea61..2aa2580b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <argLine />
+ <argLine/>
<spotless.skip>false</spotless.skip>
<cmake.version>3.29.3-b2</cmake.version>
<spotless.version>2.43.0</spotless.version>
@@ -144,7 +144,7 @@
<importOrder>
<order>org.apache.tsfile,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<lineEndings>UNIX</lineEndings>
</configuration>
@@ -259,7 +259,7 @@
<phase>validate</phase>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
</execution>