This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ca468d5  Fix NPE when counting entries (#1077)
ca468d5 is described below

commit ca468d569231e2cd5d502003586d5a49f53de470
Author: Xiang Li <[email protected]>
AuthorDate: Fri May 29 22:36:46 2020 +0800

    Fix NPE when counting entries (#1077)
    
    Closes #1077
---
 api/src/main/java/org/apache/iceberg/Schema.java   | 14 ++++++++++++-
 .../java/org/apache/iceberg/AllEntriesTable.java   |  4 +++-
 .../org/apache/iceberg/ManifestEntriesTable.java   |  4 +++-
 .../spark/source/TestIcebergSourceTablesBase.java  | 24 ++++++++++++++++++++++
 4 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index 86a89c0..0ecbfd4 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -124,9 +124,21 @@ public class Schema implements Serializable {
     return struct.fields();
   }
 
+  /**
+   * Returns the {@link Type} of a sub-field identified by the field name.
+   *
+   * @param name a field name
+   * @return a Type for the sub-field or null if it is not found
+   */
   public Type findType(String name) {
     Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: (empty)");
-    return findType(lazyNameToId().get(name));
+    Integer id = lazyNameToId().get(name);
+    if (id != null) {  // name is found
+      return findType(id);
+    }
+
+    // name could not be found
+    return null;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index bb19c41..1ab918d 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
@@ -100,7 +101,8 @@ public class AllEntriesTable extends BaseMetadataTable {
         TableOperations ops, Snapshot snapshot, Expression rowFilter,
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
-      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+      Type fileProjection = schema().findType("data_file");
+      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index e4288c0..3c43fc4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 
 /**
@@ -97,7 +98,8 @@ public class ManifestEntriesTable extends BaseMetadataTable {
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       // return entries from both data and delete manifests
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
-      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+      Type fileProjection = schema().findType("data_file");
+      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c8a72b5..def72d0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -198,6 +198,30 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
+  public void testCountEntriesTable() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "count_entries_test");
+    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    // init load
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    final int expectedEntryCount = 1;
+
+    // count entries
+    Assert.assertEquals("Count should return " + expectedEntryCount,
+        expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count());
+
+    // count all_entries
+    Assert.assertEquals("Count should return " + expectedEntryCount,
+        expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count());
+  }
+
+  @Test
   public void testFilesTable() throws Exception {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());

Reply via email to