This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 136eadc  [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
     new 46c649a  Merge pull request #15789 from aromanenko-dev/BEAM-13104-ParquetIO-filter
136eadc is described below

commit 136eadc121e136e25aafc2b65f130526e7f20142
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Mon Oct 25 17:21:47 2021 +0200

    [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
---
 .../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java     | 6 +++---
 .../src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 81f5978..c733f67 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -888,12 +888,12 @@ public class ParquetIO {
                 continue;
               }
               if (record == null) {
-                // only happens with FilteredRecordReader at end of block
+                // it happens when a record is filtered out in this block
                 LOG.debug(
-                    "filtered record reader reached end of block in block {} in file {}",
+                    "record is filtered out by reader in block {} in file {}",
                     currentBlock,
                     file.toString());
-                break;
+                continue;
               }
               if (recordReader.shouldSkipCurrentRecord()) {
                 // this record is being filtered via the filter2 package
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index d2609b4..261abd9 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -574,7 +574,8 @@ public class ParquetIOTest implements Serializable {
         readPipeline.apply(
             ParquetIO.read(SCHEMA)
                 .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
-                .withConfiguration(configuration));
+                .withConfiguration(configuration)
+                .withSplit());
     PAssert.that(readBack).containsInAnyOrder(expectedRecords);
     readPipeline.run().waitUntilFinish();
   }