yyanyy commented on a change in pull request #1641:
URL: https://github.com/apache/iceberg/pull/1641#discussion_r516246224



##########
File path: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -137,11 +141,20 @@ public void write(int repetitionLevel, T value) {
     public void setColumnStore(ColumnWriteStore columnStore) {
       this.column.setColumnStore(columnStore);
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (id != null) {
+        return Stream.of(new FieldMetrics(id.intValue(), 0L, 0L, 0L, null, 
null));

Review comment:
       I originally implemented it as `Stream.empty` and later added these 
since I wanted the set of ids reported by the NaN counter to match the 
set reported by the value/null counters from `writer.metrics`. But since the NaN 
count will never be non-zero for non-float/double fields, and ORC reports a 
different set of ids than Parquet anyway, there's no reason the sets have to 
match. I'll update `metrics()` to create a metrics object only for float and 
double writers. 
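   
   A minimal sketch of that change, under assumptions: `NanMetricSketch`, 
`WriterSketch` and `FloatWriterSketch` are hypothetical stand-ins for 
`FieldMetrics` and the Parquet writers, not the classes in this PR, and only the 
field id and NaN count are modeled. 

```java
import java.util.stream.Stream;

// Hypothetical stand-in for FieldMetrics: only the field id and NaN count are modeled.
final class NanMetricSketch {
  final int id;
  final long nanCount;

  NanMetricSketch(int id, long nanCount) {
    this.id = id;
    this.nanCount = nanCount;
  }
}

// Hypothetical base writer: writers for non-floating-point types report no field metrics.
class WriterSketch<T> {
  final int id;

  WriterSketch(int id) {
    this.id = id;
  }

  void write(T value) {
    // A real writer would hand the value to the column store here.
  }

  Stream<NanMetricSketch> metrics() {
    return Stream.empty();
  }
}

// Float writer: counts NaN values as they are written and reports one metrics object.
class FloatWriterSketch extends WriterSketch<Float> {
  private long nanCount = 0;

  FloatWriterSketch(int id) {
    super(id);
  }

  @Override
  void write(Float value) {
    super.write(value);
    if (Float.isNaN(value)) {
      nanCount++;
    }
  }

  @Override
  Stream<NanMetricSketch> metrics() {
    return Stream.of(new NanMetricSketch(id, nanCount));
  }
}
```

   With this shape, the set of ids that carry a NaN count is exactly the set of 
float and double fields, and every other writer contributes nothing to the stream. 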

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
##########
@@ -121,7 +123,7 @@ public void add(T value) {
 
   @Override
   public Metrics metrics() {
-    return ParquetUtil.footerMetrics(writer.getFooter(), metricsConfig);
+    return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), 
inputSchema, metricsConfig);

Review comment:
       Thank you for the confirmation! 

##########
File path: api/src/main/java/org/apache/iceberg/FieldMetrics.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * Iceberg internally tracked field level metrics.
+ */
+public class FieldMetrics {
+  private final int id;
+  private final long valueCount;

Review comment:
       Since I'll implement the same for ORC and Avro, and Avro currently 
doesn't have metrics support, in Avro we will populate the other fields. Ryan 
brought up a good idea of having a `ParquetFieldMetrics` to limit access to the 
non-NaN fields; I'll extend this class to implement `ParquetFieldMetrics`, 
and only instantiate `ParquetFieldMetrics` in Parquet-related writers. 
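   
   One way this could look, as a rough sketch only: the class and accessor names 
below are hypothetical (the real `FieldMetrics` also carries bounds, omitted 
here), and throwing `UnsupportedOperationException` is an assumption about how 
access might be limited, not something settled in this thread. 

```java
// Hypothetical reduction of FieldMetrics to its counters; bounds are omitted.
class FieldMetricsSketch {
  private final int id;
  private final long valueCount;
  private final long nullValueCount;
  private final long nanValueCount;

  FieldMetricsSketch(int id, long valueCount, long nullValueCount, long nanValueCount) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
    this.nanValueCount = nanValueCount;
  }

  int id() {
    return id;
  }

  long valueCount() {
    return valueCount;
  }

  long nullValueCount() {
    return nullValueCount;
  }

  long nanValueCount() {
    return nanValueCount;
  }
}

// Hypothetical Parquet variant: only the id and NaN count are meaningful, because
// value and null counts already come from the Parquet footer.
class ParquetFieldMetricsSketch extends FieldMetricsSketch {
  ParquetFieldMetricsSketch(int id, long nanValueCount) {
    super(id, 0L, 0L, nanValueCount);
  }

  @Override
  long valueCount() {
    throw new UnsupportedOperationException("Parquet writers track only NaN counts here");
  }

  @Override
  long nullValueCount() {
    throw new UnsupportedOperationException("Parquet writers track only NaN counts here");
  }
}
```

   Failing loudly on the unused accessors keeps a caller from mistaking the 
placeholder `0L` values for real counts. 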

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
##########
@@ -149,9 +149,25 @@ public static Metrics footerMetrics(ParquetMetadata 
metadata, MetricsConfig metr
     }
 
     return new Metrics(rowCount, columnSizes, valueCounts, nullValueCounts,
+        getNanValueCounts(fieldMetrics, metricsConfig, inputSchema),
         toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema, 
upperBounds));
   }
 
+  private static Map<Integer, Long> getNanValueCounts(
+      Stream<FieldMetrics> fieldMetrics, MetricsConfig metricsConfig, Schema 
inputSchema) {
+    if (fieldMetrics == null || inputSchema == null) {
+      return Maps.newHashMap();
+    }
+
+    return fieldMetrics
+        .filter(metrics -> {
+          String alias = inputSchema.idToAlias(metrics.getId());

Review comment:
       Thank you for the explanation! 

##########
File path: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value) 
{
     }
   }
 
+  private static class FloatWriter extends UnboxedWriter<Float> {
+    private final ColumnDescriptor desc;
+    private long nanCount;
+
+    private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+      super(desc, id);
+      this.desc = desc;
+      this.nanCount = 0;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Float value) {
+      super.write(repetitionLevel, value);
+      if (Float.compare(Float.NaN, value) == 0) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (id != null) {
+        return Stream.of(new FieldMetrics(id.intValue(), 0L, 0L, nanCount, 
null, null));

Review comment:
       That's a good idea! Will update

##########
File path: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value) 
{
     }
   }
 
+  private static class FloatWriter extends UnboxedWriter<Float> {
+    private final ColumnDescriptor desc;
+    private long nanCount;
+
+    private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+      super(desc, id);
+      this.desc = desc;

Review comment:
       Not sure why I preserved it, thanks for pointing it out! 

##########
File path: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value) 
{
     }
   }
 
+  private static class FloatWriter extends UnboxedWriter<Float> {
+    private final ColumnDescriptor desc;
+    private long nanCount;
+
+    private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+      super(desc, id);
+      this.desc = desc;
+      this.nanCount = 0;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Float value) {
+      super.write(repetitionLevel, value);
+      if (Float.compare(Float.NaN, value) == 0) {

Review comment:
       Didn't notice that this method exists, thank you! 
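   
   For reference, a small sketch comparing the check in the diff with 
`Float.isNaN`. That `Float.isNaN` is the method being referred to is an 
assumption, since it isn't named in this comment; `NanCheckSketch` and its 
helpers are hypothetical names. 

```java
// Compares the NaN check from the diff with Float.isNaN on a few sample values.
final class NanCheckSketch {
  // The form used in the diff: Float.compare treats every NaN as equal to Float.NaN.
  static boolean viaCompare(float value) {
    return Float.compare(Float.NaN, value) == 0;
  }

  // The more direct standard-library check.
  static boolean viaIsNaN(float value) {
    return Float.isNaN(value);
  }

  public static void main(String[] args) {
    float[] samples = {
        0.0f, -0.0f, 1.5f, Float.NaN, Float.POSITIVE_INFINITY,
        Float.intBitsToFloat(0x7fc00001) // a NaN with a non-canonical bit pattern
    };
    for (float value : samples) {
      System.out.println(value + " compare=" + viaCompare(value) + " isNaN=" + viaIsNaN(value));
    }
  }
}
```

   Both forms agree on these inputs; `Float.isNaN` simply states the intent more 
directly. 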

##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
##########
@@ -65,28 +68,25 @@
   private ParquetUtil() {
   }
 
-  // Access modifier is package-private, to only allow use from existing tests
-  static Metrics fileMetrics(InputFile file) {
-    return fileMetrics(file, MetricsConfig.getDefault());
-  }
-
   public static Metrics fileMetrics(InputFile file, MetricsConfig 
metricsConfig) {
     return fileMetrics(file, metricsConfig, null);
   }
 
   public static Metrics fileMetrics(InputFile file, MetricsConfig 
metricsConfig, NameMapping nameMapping) {
     try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(file))) {
-      return footerMetrics(reader.getFooter(), metricsConfig, nameMapping);
+      return footerMetrics(reader.getFooter(), Stream.empty(), null, 
metricsConfig, nameMapping);

Review comment:
       Thank you! I'll leave this as `Stream.empty` for now.
   
   Regarding relying on recent Parquet versions to not produce min or max 
values that are NaN: it sounds like that also answers my question in the PR 
description (i.e. we won't follow this approach to populate upper and lower 
bounds). In my current code base it looks like Parquet still gives us NaN as 
max; do you happen to have a reference to the Parquet version that supports NaN 
properly? From a quick search I wasn't able to find it; I noticed 
[this](https://github.com/apache/parquet-mr/pull/515) but I suspect it's for 
something else. I'll look into it more deeply if you don't have it handy. 
   
   > so it should be safe to use these as long as we check that they are not 
NaN.
   
   Sorry, just to make sure I understand this correctly: it sounds like only the 
following three cases will be valid:
   1) v1 table, no NaN counter, min/max could be NaN - use the existing 
logic; we can't do much about min/max==NaN
   2) v2 table, NaN counter exists, min/max will not be NaN - in this case 
metrics are produced by the Iceberg writer
   3) v2 table, no NaN counter, min/max will not be NaN - in this case the file 
is imported or its metrics come from this `fileMetrics`
   
   Then to accommodate (3), we will have to remember in evaluators that the 
absence of a NaN counter doesn't necessarily mean there's no NaN value in the 
column; but that might be fine since we will need this logic to accommodate (1) 
as well (unless we implement evaluators in a way that can differentiate v1 and 
v2 tables; not sure if we want that). 
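   
   The evaluator rule implied by the three cases above can be sketched as 
follows. This is only an illustration for float/double fields: 
`NanEvaluatorSketch` and `mayContainNaN` are hypothetical names, and the plain 
`Map<Integer, Long>` stands in for whatever the metrics expose as NaN counts. 

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a conservative check for float/double fields: a missing NaN counter
// is treated as "unknown", never as "no NaN values".
final class NanEvaluatorSketch {
  static boolean mayContainNaN(Map<Integer, Long> nanValueCounts, int fieldId) {
    if (nanValueCounts == null) {
      return true; // cases (1) and (3): no counter at all
    }
    Long count = nanValueCounts.get(fieldId);
    // No entry for this field is also "unknown"; only an explicit 0 rules NaN out.
    return count == null || count > 0;
  }

  public static void main(String[] args) {
    Map<Integer, Long> counts = new HashMap<>();
    counts.put(3, 0L); // field 3: counter present, no NaN written
    counts.put(4, 2L); // field 4: counter present, two NaN values written

    System.out.println(mayContainNaN(counts, 3));
    System.out.println(mayContainNaN(counts, 4));
    System.out.println(mayContainNaN(counts, 10)); // no counter for this field
    System.out.println(mayContainNaN(null, 3));    // file without any NaN counters
  }
}
```

   Only case (2) with an explicit zero lets an evaluator conclude that a column 
holds no NaN; everything else has to be treated as possibly containing NaN. 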

##########
File path: data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class TestMergingMetrics<T> {
+
+  // all supported fields, except for UUID which is on deprecation path: see 
https://github.com/apache/iceberg/pull/1611
+  // as well as Types.TimeType and Types.TimestampType.withoutZone as both are 
not supported by Spark
+  protected static final Types.NestedField ID_FIELD = required(1, "id", 
Types.IntegerType.get());
+  protected static final Types.NestedField DATA_FIELD = optional(2, "data", 
Types.StringType.get());
+  protected static final Types.NestedField FLOAT_FIELD = required(3, "float", 
Types.FloatType.get());
+  protected static final Types.NestedField DOUBLE_FIELD = optional(4, 
"double", Types.DoubleType.get());
+  protected static final Types.NestedField DECIMAL_FIELD = optional(5, 
"decimal", Types.DecimalType.of(5, 3));
+  protected static final Types.NestedField FIXED_FIELD = optional(7, "fixed", 
Types.FixedType.ofLength(4));
+  protected static final Types.NestedField BINARY_FIELD = optional(8, 
"binary", Types.BinaryType.get());
+  protected static final Types.NestedField FLOAT_LIST = optional(9, 
"floatlist",
+      Types.ListType.ofRequired(10, Types.FloatType.get()));
+  protected static final Types.NestedField LONG_FIELD = optional(11, "long", 
Types.LongType.get());
+
+  protected static final Types.NestedField MAP_FIELD_1 = optional(17, "map1",
+      Types.MapType.ofOptional(18, 19, Types.FloatType.get(), 
Types.StringType.get())
+  );
+  protected static final Types.NestedField MAP_FIELD_2 = optional(20, "map2",
+      Types.MapType.ofOptional(21, 22, Types.IntegerType.get(), 
Types.DoubleType.get())
+  );
+  protected static final Types.NestedField STRUCT_FIELD = optional(23, 
"structField", Types.StructType.of(
+      required(24, "booleanField", Types.BooleanType.get()),
+      optional(25, "date", Types.DateType.get()),
+      optional(27, "timestamp", Types.TimestampType.withZone())
+  ));
+
+  private static final Set<Integer> IDS_WITH_ZERO_NAN_COUNT = 
ImmutableSet.of(1, 2, 5, 7, 8, 11, 24, 25, 27);
+
+  private static final Map<Types.NestedField, Integer> 
FIELDS_WITH_NAN_COUNT_TO_ID = ImmutableMap.of(
+      FLOAT_FIELD, 3, DOUBLE_FIELD, 4, FLOAT_LIST, 10, MAP_FIELD_1, 18, 
MAP_FIELD_2, 22
+  );

Review comment:
       Thank you for the explanation!

##########
File path: 
spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
##########
@@ -146,8 +146,13 @@ public UTF8String getUTF8String(int ordinal) {
 

Review comment:
       Thanks for the information!
   
   For 1, thanks for the info! I wasn't aware of the contract. 
   
   I wasn't sure whether we want to add `isNullAt` for this specific case 
though, as I think the problem comes from a difference in behavior between 
`InternalRow` and `StructInternalRow`, and adding `isNullAt` might incur a 
larger performance penalty in production.  
   
   The behavior difference comes from the fact that `internalRow.get()` can 
return null. The NPE eventually comes from 
[`struct.get(index, 
types[index])`](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java#L420)
 in `SparkParquetWriters`. While an actual `InternalRow` can return null for a 
null column, `StructInternalRow` assumes the underlying `get()` returns 
non-null and sometimes performs operations on the result that can lead to an 
NPE, e.g. [calling `toString` for 
conversion](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java#L144),
 or [returning it as a primitive type 
directly](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java#L132).
 Thus `SparkParquetWriters` was able to call `get` fine under normal 
circumstances, but it couldn't for the specific usage of 
`StructInternalRow` in this test. 
   
   For 2, that's a better idea, will do this instead. 
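   
   To illustrate the failure mode described above (not the fix adopted here), a 
minimal sketch with hypothetical names: `NullSafeGetSketch` models a row as a 
plain `Object[]`, which is an assumption and not how `StructInternalRow` stores 
its values. 

```java
// Contrasts a getter that assumes non-null values with one that tolerates null columns.
final class NullSafeGetSketch {
  // Mirrors the problematic pattern: converting without a null check throws
  // NullPointerException when the column is null.
  static String unsafeToString(Object[] row, int ordinal) {
    return row[ordinal].toString();
  }

  // Null-tolerant variant: a null column is passed through as null.
  static String nullSafeToString(Object[] row, int ordinal) {
    Object value = row[ordinal];
    return value == null ? null : value.toString();
  }

  public static void main(String[] args) {
    Object[] row = {"a", null};
    System.out.println(nullSafeToString(row, 0));
    System.out.println(nullSafeToString(row, 1));
    try {
      unsafeToString(row, 1);
    } catch (NullPointerException e) {
      System.out.println("unsafe getter threw NullPointerException");
    }
  }
}
```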

##########
File path: 
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
##########
@@ -28,4 +30,7 @@
   List<TripleWriter<?>> columns();
 
   void setColumnStore(ColumnWriteStore columnStore);
+
+  Stream<FieldMetrics> metrics();

Review comment:
       I'll add documentation. For the name, I think `ParquetValueWriter` 
already implies that it's field specific, so I guess `metrics` by itself 
conveys that these are field-specific metrics. Do you have a better 
suggestion? 

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -257,31 +238,27 @@ public void testMetricsForDecimals() throws IOException {
     record.setField("decimalAsInt64", new BigDecimal("4.75"));
     record.setField("decimalAsFixed", new BigDecimal("5.80"));
 
-    InputFile recordsFile = writeRecords(schema, record);
-
-    Metrics metrics = getMetrics(recordsFile);
+    Metrics metrics = getMetrics(schema, record);
     Assert.assertEquals(1L, (long) metrics.recordCount());
-    assertCounts(1, 1L, 0L, metrics);
+    assertCounts(1, 1L, 0L, 0L, metrics);

Review comment:
       I was thinking of making the NaN count explicitly required so that people 
adding new tests will remember to take it into consideration, but I guess it's 
not needed, at least until NaN is fully supported. Will update. 

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -257,31 +238,27 @@ public void testMetricsForDecimals() throws IOException {
     record.setField("decimalAsInt64", new BigDecimal("4.75"));
     record.setField("decimalAsFixed", new BigDecimal("5.80"));
 
-    InputFile recordsFile = writeRecords(schema, record);
-
-    Metrics metrics = getMetrics(recordsFile);
+    Metrics metrics = getMetrics(schema, record);
     Assert.assertEquals(1L, (long) metrics.recordCount());
-    assertCounts(1, 1L, 0L, metrics);
+    assertCounts(1, 1L, 0L, 0L, metrics);

Review comment:
       I was thinking of making the NaN count explicitly required so that people 
adding new tests will remember to take it into consideration, but I guess it's 
not needed, at least until NaN is fully supported, especially now that we don't 
want a NaN count for non-double/float fields. Will update.
   
   

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -352,14 +327,34 @@ public void testMetricsForNullColumns() throws 
IOException {
     Record secondRecord = GenericRecord.create(schema);
     secondRecord.setField("intCol", null);
 
-    InputFile recordsFile = writeRecords(schema, firstRecord, secondRecord);
-
-    Metrics metrics = getMetrics(recordsFile);
+    Metrics metrics = getMetrics(schema, firstRecord, secondRecord);
     Assert.assertEquals(2L, (long) metrics.recordCount());
-    assertCounts(1, 2L, 2L, metrics);
+    assertCounts(1, 2L, 2L, 0L, metrics);
     assertBounds(1, IntegerType.get(), null, null, metrics);
   }
 
+  @Test
+  public void testMetricsForNaNColumns() throws IOException {

Review comment:
       Thanks for pointing this out! I guess this tests the bounds more than 
the NaN count, but it will be very helpful when we start to exclude NaN 
from upper/lower bounds. Will update. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


