This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 03fd1f0b8c DRILL-8458: Use correct size of definition level bytes slice when reading Parquet v2 data page (#2838) 03fd1f0b8c is described below commit 03fd1f0b8c5d01fa4befc5df122714b51f8d9ce8 Author: Peter Franzen <pe...@myire.org> AuthorDate: Tue Oct 31 15:53:34 2023 +0100 DRILL-8458: Use correct size of definition level bytes slice when reading Parquet v2 data page (#2838) --- .../parquet/hadoop/ColumnChunkIncReadStore.java | 4 +-- .../parquet/ParquetSimpleTestFileGenerator.java | 39 ++++++++++++++++++++- .../exec/store/parquet/TestParquetComplex.java | 12 +++++++ .../parquet/parquet_v2_repeated_int.parquet | Bin 0 -> 602 bytes 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 773a861213..7834eaa816 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -217,13 +217,13 @@ public class ColumnChunkIncReadStore implements PageReadStore { int pageBufOffset = 0; ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset); BytesInput repLevelBytes = BytesInput.from( - (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize) + (ByteBuffer) bb.slice().limit(repLevelSize) ); pageBufOffset += repLevelSize; bb = (ByteBuffer) pageBuf.position(pageBufOffset); final BytesInput defLevelBytes = BytesInput.from( - (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize) + (ByteBuffer) bb.slice().limit(defLevelSize) ); pageBufOffset += defLevelSize; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java index 232aec9a6a..efd1b4fd17 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java @@ -206,11 +206,17 @@ public class ParquetSimpleTestFileGenerator { " } \n" + " } \n" + "} \n"; + public static String repeatedIntSchemaMsg = + "message ParquetRepeated { \n" + + " required int32 rowKey; \n" + + " repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" + + "} \n"; public static MessageType simpleSchema = MessageTypeParser.parseMessageType(simpleSchemaMsg); public static MessageType complexSchema = MessageTypeParser.parseMessageType(complexSchemaMsg); public static MessageType simpleNullableSchema = MessageTypeParser.parseMessageType(simpleNullableSchemaMsg); public static MessageType complexNullableSchema = MessageTypeParser.parseMessageType(complexNullableSchemaMsg); + public static MessageType repeatedIntSchema = MessageTypeParser.parseMessageType(repeatedIntSchemaMsg); public static Path initFile(String fileName) { @@ -218,6 +224,14 @@ public class ParquetSimpleTestFileGenerator { } public static ParquetWriter<Group> initWriter(MessageType schema, String fileName, boolean dictEncoding) throws IOException { + return initWriter(schema, fileName, ParquetProperties.WriterVersion.PARQUET_1_0, dictEncoding); + } + + public static ParquetWriter<Group> initWriter( + MessageType schema, + String fileName, + ParquetProperties.WriterVersion version, + boolean dictEncoding) throws IOException { GroupWriteSupport.setSchema(schema, conf); @@ -228,7 +242,7 @@ public class ParquetSimpleTestFileGenerator { .withPageSize(1024) .withDictionaryPageSize(512) .withValidation(false) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) + .withWriterVersion(version) .withConf(conf) .build(); } @@ -455,12 +469,32 @@ public class ParquetSimpleTestFileGenerator { } } + public static void writeRepeatedIntValues( + SimpleGroupFactory groupFactory, + ParquetWriter<Group> writer, + int numRows) throws IOException { + + int[] repeatedValues = {666, 1492, 4711}; + + for (int i = 0; i< numRows; i++) { + + Group g = groupFactory.newGroup(); + g.append("rowKey", i+1); + for (int r :repeatedValues) { + g.append("repeatedInt", r); + } + + writer.write(g); + } + } + public static void main(String[] args) throws IOException { SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema); GroupFactory gf = new SimpleGroupFactory(complexSchema); SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema); GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema); + SimpleGroupFactory repeatedIntGroupFactory = new SimpleGroupFactory(repeatedIntSchema); // Generate files with dictionary encoding enabled and disabled ParquetWriter<Group> simpleWriter = initWriter(simpleSchema, "drill/parquet_test_file_simple", true); @@ -471,6 +505,7 @@ public class ParquetSimpleTestFileGenerator { ParquetWriter<Group> complexNoDictWriter = initWriter(complexSchema, "drill/parquet_test_file_complex_nodict", false); ParquetWriter<Group> simpleNullableNoDictWriter = initWriter(simpleNullableSchema, "drill/parquet_test_file_simple_nullable_nodict", false); ParquetWriter<Group> complexNullableNoDictWriter = initWriter(complexNullableSchema, "drill/parquet_test_file_complex_nullable_nodict", false); + ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema, "drill/parquet_v2_repeated_int.parquet", ParquetProperties.WriterVersion.PARQUET_2_0, true); ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false); ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableWriter, true); @@ -480,6 +515,7 @@ public class ParquetSimpleTestFileGenerator { ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableNoDictWriter, true); ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter, false); ParquetSimpleTestFileGenerator.writeComplexValues(ngf, complexNullableNoDictWriter, true); + ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory, repeatedIntV2Writer, 100); simpleWriter.close(); complexWriter.close(); @@ -489,6 +525,7 @@ public class ParquetSimpleTestFileGenerator { complexNoDictWriter.close(); simpleNullableNoDictWriter.close(); complexNullableNoDictWriter.close(); + repeatedIntV2Writer.close(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java index e03af04e2f..579f3ff2ad 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java @@ -901,4 +901,16 @@ public class TestParquetComplex extends BaseTestQuery { .baselineValues(firstValue, null, secondValue) .go(); } + + + @Test + public void testSelectRepeatedInt() throws Exception { + // DRILL-8458 + String query = "select repeatedInt as r from %s"; + testBuilder() + .sqlQuery(query, "cp.`parquet/parquet_v2_repeated_int.parquet`") + .unOrdered() + .expectsNumRecords(100) + .go(); + } } diff --git a/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet new file mode 100644 index 0000000000..91fed9b844 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet differ