This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 09d5a4a4ccb640992b5d0c6a79e54f780948923b
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Nov 5 17:48:18 2020 -0800

    Core: Fix NullPointerException in ManifestReader (#1730)

    This fixes a NullPointerException that is thrown by a ManifestReader for
    delete files when there is a query filter. The DeleteFileIndex projects
    all fields of a delete manifest, so it doesn't call select to select
    specific columns, unlike ManifestGroup, which selects * by default. When
    select is not called, methods that check whether to add stats columns
    fail, but only if there is a row filter because stats columns are not
    needed if there is no row filter. Existing tests either called select to
    configure the reader, or didn't pass a row filter and projected all rows.

    This adds a test that uses DeleteFileIndex and a test for ManifestReader.
    This also fixes dropStats in addition to requireStatsProjection.

    Co-authored-by: 钟保罗 <[email protected]>
---
 .../java/org/apache/iceberg/ManifestReader.java   |  2 +
 .../org/apache/iceberg/TestManifestReader.java    | 20 ++++++++++
 .../spark/source/TestSparkReaderDeletes.java      | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+)
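For readers following along, here is a minimal sketch, not part of the commit,
of the call path that failed: DeleteFileIndex opens delete-manifest readers
with a row filter but without calling select, which is the combination that
triggered the NullPointerException. The names manifest, io, and specsById
below are hypothetical stand-ins for a delete ManifestFile, a FileIO, and the
table's partition specs by id.

    // Sketch only: manifest, io, and specsById are hypothetical stand-ins.
    // Before this fix, iterating the reader threw NullPointerException:
    // select() was never called, so the projected columns were null, and
    // requireStatsProjection called columns.containsAll(...) on that null.
    try (ManifestReader<DeleteFile> reader =
        ManifestFiles.readDeleteManifest(manifest, io, specsById)
            .filterRows(Expressions.equal("id", 0))) {
      for (DeleteFile file : reader) {
        System.out.println(file.path());
      }
    }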
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 8311c19..939de8d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -271,6 +271,7 @@ public class ManifestReader<F extends ContentFile<F>>
   private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
     // Make sure we have all stats columns for metrics evaluator
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         !columns.containsAll(STATS_COLUMNS);
   }
@@ -279,6 +280,7 @@ public class ManifestReader<F extends ContentFile<F>>
     // Make sure we only drop all stats if we had projected all stats
     // We do not drop stats even if we had partially added some stats columns
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 1706404..c1c2423 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -21,8 +21,12 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -52,6 +56,22 @@ public class TestManifestReader extends TableTestBase {
   }
 
   @Test
+  public void testReaderWithFilterWithoutSelect() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .filterRows(Expressions.equal("id", 0))) {
+      List<String> files = Streams.stream(reader)
+          .map(file -> file.path().toString())
+          .collect(Collectors.toList());
+
+      // note that all files are returned because the reader returns data files that may match, and the partition is
+      // bucketing by data, which doesn't help filter files
+      Assert.assertEquals("Should read the expected files",
+          Lists.newArrayList(FILE_A.path(), FILE_B.path(), FILE_C.path()), files);
+    }
+  }
+
+  @Test
   public void testInvalidUsage() throws IOException {
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
     AssertHelpers.assertThrows(
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 229a361..68787a2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -19,19 +19,28 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
@@ -40,7 +49,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
@@ -113,4 +124,39 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
     return set;
   }
+
+  @Test
+  public void testEqualityDeleteWithFilter() throws IOException {
+    String tableName = "test_with_filter";
+    Table table = createTable(tableName, SCHEMA, SPEC);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    Types.StructType projection = table.schema().select("*").asStruct();
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .filter("data = 'a'") // select a deleted row
+        .selectExpr("*");
+
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    Assert.assertEquals("Table should contain no rows", 0, actual.size());
+  }
 }
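At the query level, the same NullPointerException could surface during scan
planning on a table with delete files, since DeleteFileIndex applies the
scan's row filter while reading delete manifests. A rough sketch of such a
trigger, assuming a format v2 table with committed delete files bound to a
variable named table:

    // Sketch only: table is an assumed org.apache.iceberg.Table that has
    // delete files. Before this fix, a filtered planFiles() call could hit
    // the NullPointerException while delete manifests were being read.
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
        .filter(Expressions.equal("data", "a"))
        .planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }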
