This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 09d5a4a4ccb640992b5d0c6a79e54f780948923b
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Nov 5 17:48:18 2020 -0800

    Core: Fix NullPointerException in ManifestReader (#1730)
    
    This fixes a NullPointerException thrown by a ManifestReader for delete
    files when there is a query filter. DeleteFileIndex projects all fields
    of a delete manifest, so it does not call select to choose specific
    columns, unlike ManifestGroup, which selects * by default. When select
    is never called, the reader's column projection stays null and the
    methods that check whether to add stats columns fail. This happens only
    with a row filter, because stats columns are not needed when there is
    no row filter.
    
    Existing tests either called select to configure the reader, or did
    not pass a row filter and projected all columns, so neither path hit
    the bug. This adds a test that uses DeleteFileIndex and a test for
    ManifestReader. This also fixes dropStats in addition to
    requireStatsProjection.
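    
    For illustration, a minimal sketch of the call pattern that triggered
    the NPE (it mirrors the new TestManifestReader case below; the manifest
    and FILE_IO setup is assumed):
    
        // Row filter present, but select() never called: the column
        // projection stays null, and requireStatsProjection used to NPE.
        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
            .filterRows(Expressions.equal("id", 0))) {
          reader.forEach(file -> System.out.println(file.path()));
        }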
    
    Co-authored-by: 钟保罗 <[email protected]>
---
 .../java/org/apache/iceberg/ManifestReader.java    |  2 +
 .../org/apache/iceberg/TestManifestReader.java     | 20 ++++++++++
 .../spark/source/TestSparkReaderDeletes.java       | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 8311c19..939de8d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -271,6 +271,7 @@ public class ManifestReader<F extends ContentFile<F>>
   private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
     // Make sure we have all stats columns for metrics evaluator
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         !columns.containsAll(STATS_COLUMNS);
   }
@@ -279,6 +280,7 @@ public class ManifestReader<F extends ContentFile<F>>
     // Make sure we only drop all stats if we had projected all stats
     // We do not drop stats even if we had partially added some stats columns
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
   }
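
(For context, a hedged sketch of why columns can be null in the two methods
above: ManifestReader assigns the projection only when select() is called, so
a reader configured with filterRows() alone reaches these checks with a null
collection. The field and method below are illustrative, not the exact
source.)

    // Illustrative approximation of the reader's projection state.
    private Collection<String> columns = null; // stays null until select()

    public ManifestReader<F> select(Collection<String> newColumns) {
      this.columns = Sets.newHashSet(newColumns);
      return this;
    }
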
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 1706404..c1c2423 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -21,8 +21,12 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -52,6 +56,22 @@ public class TestManifestReader extends TableTestBase {
   }
 
   @Test
+  public void testReaderWithFilterWithoutSelect() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .filterRows(Expressions.equal("id", 0))) {
+      List<String> files = Streams.stream(reader)
+          .map(file -> file.path().toString())
+          .collect(Collectors.toList());
+
+      // note that all files are returned because the reader returns data files that may match, and the partition is
+      // bucketing by data, which doesn't help filter files
+      Assert.assertEquals("Should read the expected files",
+          Lists.newArrayList(FILE_A.path(), FILE_B.path(), FILE_C.path()), files);
+    }
+  }
+
+  @Test
   public void testInvalidUsage() throws IOException {
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
     AssertHelpers.assertThrows(
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 229a361..68787a2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -19,19 +19,28 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
@@ -40,7 +49,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
@@ -113,4 +124,39 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
     return set;
   }
+
+  @Test
+  public void testEqualityDeleteWithFilter() throws IOException {
+    String tableName = "test_with_filter";
+    Table table = createTable(tableName, SCHEMA, SPEC);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    Types.StructType projection = table.schema().select("*").asStruct();
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .filter("data = 'a'") // select a deleted row
+        .selectExpr("*");
+
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    Assert.assertEquals("Table should contain no rows", 0, actual.size());
+  }
 }
