yyanyy commented on a change in pull request #1641:
URL: https://github.com/apache/iceberg/pull/1641#discussion_r516246224
##########
File path:
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -137,11 +141,20 @@ public void write(int repetitionLevel, T value) {
public void setColumnStore(ColumnWriteStore columnStore) {
this.column.setColumnStore(columnStore);
}
+
+ @Override
+ public Stream<FieldMetrics> metrics() {
+ if (id != null) {
+ return Stream.of(new FieldMetrics(id.intValue(), 0L, 0L, 0L, null,
null));
Review comment:
I originally implemented it as `Stream.empty` and later added these
because I wanted the set of ids reported by the NaN counter to match the set
reported by the value/null counters from `writer.metrics`. But since the NaN
count can never be non-zero for non-float/double fields, and ORC reports a
different set of ids than Parquet anyway, there's no reason the sets have to
match. I'll update `metrics()` to create a metrics object only for float and
double writers.
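A minimal sketch of that change, using hypothetical stand-in types (`NanMetric`,
`Writer`, `IntWriter`) rather than the PR's actual classes. Writers report
nothing by default, and only the float writer overrides `metrics()`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NanMetricsSketch {
  // Hypothetical stand-in for FieldMetrics; only the id and NaN count are modeled.
  static final class NanMetric {
    final int id;
    final long nanCount;

    NanMetric(int id, long nanCount) {
      this.id = id;
      this.nanCount = nanCount;
    }
  }

  // Hypothetical writer interface: by default a writer reports no field metrics.
  interface Writer<T> {
    void write(T value);

    default Stream<NanMetric> metrics() {
      return Stream.empty();
    }
  }

  // Non-floating-point writer: keeps the default, so its id is never reported.
  static final class IntWriter implements Writer<Integer> {
    @Override
    public void write(Integer value) {
      // nothing to count; an int can never be NaN
    }
  }

  // Float writer: counts NaN values and reports them under its field id.
  static final class FloatWriter implements Writer<Float> {
    private final int id;
    private long nanCount = 0;

    FloatWriter(int id) {
      this.id = id;
    }

    @Override
    public void write(Float value) {
      // assumes value is non-null; unboxing a null Float would throw
      if (Float.isNaN(value)) {
        nanCount++;
      }
    }

    @Override
    public Stream<NanMetric> metrics() {
      return Stream.of(new NanMetric(id, nanCount));
    }
  }

  // Collects the field ids for which any writer reported a metric.
  static List<Integer> reportedIds(List<Writer<?>> writers) {
    return writers.stream()
        .flatMap(w -> w.metrics())
        .map(m -> m.id)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    FloatWriter floatWriter = new FloatWriter(3);
    floatWriter.write(Float.NaN);
    List<Writer<?>> writers = Arrays.asList(new IntWriter(), floatWriter);
    System.out.println(reportedIds(writers));
  }
}
```

With this shape, the set of ids carrying a NaN count is simply the set of
float/double fields, independent of what the value/null counters report.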
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
##########
@@ -121,7 +123,7 @@ public void add(T value) {
@Override
public Metrics metrics() {
- return ParquetUtil.footerMetrics(writer.getFooter(), metricsConfig);
+ return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(),
inputSchema, metricsConfig);
Review comment:
Thank you for the confirmation!
##########
File path: api/src/main/java/org/apache/iceberg/FieldMetrics.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * Iceberg internally tracked field level metrics.
+ */
+public class FieldMetrics {
+ private final int id;
+ private final long valueCount;
Review comment:
Since I'll implement the same for ORC and Avro, and Avro currently
doesn't have metrics support, we will populate the other fields in Avro. Ryan
brought up a good idea of having a `ParquetFieldMetrics` to limit access to the
non-NaN fields: I'll extend this class to implement `ParquetFieldMetrics`,
and only instantiate `ParquetFieldMetrics` in Parquet-related writers.
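One way such a subclass could look, as a rough sketch only. The names
`BaseFieldMetrics` and `NanOnlyFieldMetrics` are hypothetical, the bounds are
left out, and throwing `UnsupportedOperationException` is an assumption about
how access would be limited, not what the PR does:

```java
class FieldMetricsSketch {
  // Hypothetical simplified base class: id plus the three counters.
  static class BaseFieldMetrics {
    private final int id;
    private final long valueCount;
    private final long nullValueCount;
    private final long nanValueCount;

    BaseFieldMetrics(int id, long valueCount, long nullValueCount, long nanValueCount) {
      this.id = id;
      this.valueCount = valueCount;
      this.nullValueCount = nullValueCount;
      this.nanValueCount = nanValueCount;
    }

    int id() {
      return id;
    }

    long valueCount() {
      return valueCount;
    }

    long nullValueCount() {
      return nullValueCount;
    }

    long nanValueCount() {
      return nanValueCount;
    }
  }

  // Hypothetical Parquet-side variant: only the NaN count is meaningful, so the
  // other counters refuse to answer instead of returning a misleading zero.
  static final class NanOnlyFieldMetrics extends BaseFieldMetrics {
    NanOnlyFieldMetrics(int id, long nanValueCount) {
      super(id, 0L, 0L, nanValueCount);
    }

    @Override
    long valueCount() {
      throw new UnsupportedOperationException("value count is not tracked here");
    }

    @Override
    long nullValueCount() {
      throw new UnsupportedOperationException("null value count is not tracked here");
    }
  }

  public static void main(String[] args) {
    BaseFieldMetrics metrics = new NanOnlyFieldMetrics(3, 2L);
    System.out.println(metrics.id() + " -> " + metrics.nanValueCount());
  }
}
```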
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
##########
@@ -149,9 +149,25 @@ public static Metrics footerMetrics(ParquetMetadata
metadata, MetricsConfig metr
}
return new Metrics(rowCount, columnSizes, valueCounts, nullValueCounts,
+ getNanValueCounts(fieldMetrics, metricsConfig, inputSchema),
toBufferMap(fileSchema, lowerBounds), toBufferMap(fileSchema,
upperBounds));
}
+ private static Map<Integer, Long> getNanValueCounts(
+ Stream<FieldMetrics> fieldMetrics, MetricsConfig metricsConfig, Schema
inputSchema) {
+ if (fieldMetrics == null || inputSchema == null) {
+ return Maps.newHashMap();
+ }
+
+ return fieldMetrics
+ .filter(metrics -> {
+ String alias = inputSchema.idToAlias(metrics.getId());
Review comment:
Thank you for the explanation!
##########
File path:
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value)
{
}
}
+ private static class FloatWriter extends UnboxedWriter<Float> {
+ private final ColumnDescriptor desc;
+ private long nanCount;
+
+ private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+ super(desc, id);
+ this.desc = desc;
+ this.nanCount = 0;
+ }
+
+ @Override
+ public void write(int repetitionLevel, Float value) {
+ super.write(repetitionLevel, value);
+ if (Float.compare(Float.NaN, value) == 0) {
+ nanCount++;
+ }
+ }
+
+ @Override
+ public Stream<FieldMetrics> metrics() {
+ if (id != null) {
+ return Stream.of(new FieldMetrics(id.intValue(), 0L, 0L, nanCount,
null, null));
Review comment:
That's a good idea! Will update
##########
File path:
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value)
{
}
}
+ private static class FloatWriter extends UnboxedWriter<Float> {
+ private final ColumnDescriptor desc;
+ private long nanCount;
+
+ private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+ super(desc, id);
+ this.desc = desc;
Review comment:
Not sure why I preserved it, thanks for pointing it out!
##########
File path:
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
##########
@@ -165,9 +178,65 @@ public void writeDouble(int repetitionLevel, double value)
{
}
}
+ private static class FloatWriter extends UnboxedWriter<Float> {
+ private final ColumnDescriptor desc;
+ private long nanCount;
+
+ private FloatWriter(ColumnDescriptor desc, Type.ID id) {
+ super(desc, id);
+ this.desc = desc;
+ this.nanCount = 0;
+ }
+
+ @Override
+ public void write(int repetitionLevel, Float value) {
+ super.write(repetitionLevel, value);
+ if (Float.compare(Float.NaN, value) == 0) {
Review comment:
Didn't notice that this method exists, thank you!
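The comment does not name the method. Assuming it refers to `Float.isNaN`, the
sketch below compares it with the `Float.compare(Float.NaN, value) == 0` check
from the diff and with a plain `==` comparison. The class and method names are
hypothetical:

```java
class NanCheckSketch {
  // The check used in the diff: Float.compare treats NaN as equal to NaN.
  static boolean viaCompare(float value) {
    return Float.compare(Float.NaN, value) == 0;
  }

  // The dedicated JDK method (assumed to be the one the reviewer suggested).
  static boolean viaIsNaN(float value) {
    return Float.isNaN(value);
  }

  // A plain == comparison is always false, because NaN is not equal to itself.
  static boolean viaEquals(float value) {
    return value == Float.NaN;
  }

  public static void main(String[] args) {
    float[] samples = {Float.NaN, 0.0f, 1.5f, Float.POSITIVE_INFINITY};
    for (float sample : samples) {
      System.out.println(sample + ": compare=" + viaCompare(sample)
          + " isNaN=" + viaIsNaN(sample) + " equals=" + viaEquals(sample));
    }
  }
}
```

The first two checks agree on every input; `Float.isNaN` just states the intent
more directly.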
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
##########
@@ -65,28 +68,25 @@
private ParquetUtil() {
}
- // Access modifier is package-private, to only allow use from existing tests
- static Metrics fileMetrics(InputFile file) {
- return fileMetrics(file, MetricsConfig.getDefault());
- }
-
public static Metrics fileMetrics(InputFile file, MetricsConfig
metricsConfig) {
return fileMetrics(file, metricsConfig, null);
}
public static Metrics fileMetrics(InputFile file, MetricsConfig
metricsConfig, NameMapping nameMapping) {
try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIO.file(file))) {
- return footerMetrics(reader.getFooter(), metricsConfig, nameMapping);
+ return footerMetrics(reader.getFooter(), Stream.empty(), null,
metricsConfig, nameMapping);
Review comment:
Thank you! I'll leave this as `Stream.empty` for now.
Regarding relying on recent Parquet versions to not produce min or max
values that are NaN: it sounds like that also answers my question in the PR
description (i.e. we won't follow this approach to populate upper and lower
bounds). In my current code base it looks like Parquet still gives us NaN as
max; do you happen to have a reference to the Parquet version that handles NaN
properly? From a quick search I wasn't able to find it; I noticed
[this](https://github.com/apache/parquet-mr/pull/515) but I suspect it's for
something else. I'll look into it more deeply if you don't have it handy.
> so it should be safe to use these as long as we check that they are not
NaN.
Just to make sure I understand this correctly: it sounds like only the
following three cases will be valid:
1) v1 table, no NaN counter, min/max could be NaN - use the existing
logic; we can't do much about min/max == NaN
2) v2 table, NaN counter exists, min/max will not be NaN - in this case
metrics are produced by the Iceberg writer
3) v2 table, no NaN counter, min/max will not be NaN - in this case the file
is imported or the metrics come from this `fileMetrics`
Then to accommodate (3), evaluators will have to remember that the absence of
a NaN counter doesn't necessarily mean there's no NaN value in the column; but
that might be fine, since we need this logic to accommodate (1) as well
(unless we implement evaluators in a way that can differentiate v1 and v2
tables; not sure if we want that).
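The rule for cases (1) and (3) could be sketched as follows. This is only an
illustration: `mayContainNaN` and the map layout are hypothetical, not the
evaluator API. Only an explicit zero count rules NaN out; a missing counter
has to be treated as "unknown":

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class NanCounterSketch {
  // Returns true when the column might contain NaN values.
  // nanValueCounts maps field id to NaN count and may be null or incomplete.
  static boolean mayContainNaN(Map<Integer, Long> nanValueCounts, int fieldId) {
    if (nanValueCounts == null) {
      return true; // no NaN counters at all (cases 1 and 3): cannot rule NaN out
    }
    Long count = nanValueCounts.get(fieldId);
    // a missing entry is unknown; only an explicit zero proves there is no NaN
    return count == null || count > 0;
  }

  public static void main(String[] args) {
    Map<Integer, Long> counts = new HashMap<>();
    counts.put(3, 0L);
    counts.put(4, 2L);
    System.out.println(mayContainNaN(counts, 3));
    System.out.println(mayContainNaN(counts, 4));
    System.out.println(mayContainNaN(Collections.emptyMap(), 3));
  }
}
```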
##########
File path: data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class TestMergingMetrics<T> {
+
+ // all supported fields, except for UUID which is on deprecation path: see
https://github.com/apache/iceberg/pull/1611
+ // as well as Types.TimeType and Types.TimestampType.withoutZone as both are
not supported by Spark
+ protected static final Types.NestedField ID_FIELD = required(1, "id",
Types.IntegerType.get());
+ protected static final Types.NestedField DATA_FIELD = optional(2, "data",
Types.StringType.get());
+ protected static final Types.NestedField FLOAT_FIELD = required(3, "float",
Types.FloatType.get());
+ protected static final Types.NestedField DOUBLE_FIELD = optional(4,
"double", Types.DoubleType.get());
+ protected static final Types.NestedField DECIMAL_FIELD = optional(5,
"decimal", Types.DecimalType.of(5, 3));
+ protected static final Types.NestedField FIXED_FIELD = optional(7, "fixed",
Types.FixedType.ofLength(4));
+ protected static final Types.NestedField BINARY_FIELD = optional(8,
"binary", Types.BinaryType.get());
+ protected static final Types.NestedField FLOAT_LIST = optional(9,
"floatlist",
+ Types.ListType.ofRequired(10, Types.FloatType.get()));
+ protected static final Types.NestedField LONG_FIELD = optional(11, "long",
Types.LongType.get());
+
+ protected static final Types.NestedField MAP_FIELD_1 = optional(17, "map1",
+ Types.MapType.ofOptional(18, 19, Types.FloatType.get(),
Types.StringType.get())
+ );
+ protected static final Types.NestedField MAP_FIELD_2 = optional(20, "map2",
+ Types.MapType.ofOptional(21, 22, Types.IntegerType.get(),
Types.DoubleType.get())
+ );
+ protected static final Types.NestedField STRUCT_FIELD = optional(23,
"structField", Types.StructType.of(
+ required(24, "booleanField", Types.BooleanType.get()),
+ optional(25, "date", Types.DateType.get()),
+ optional(27, "timestamp", Types.TimestampType.withZone())
+ ));
+
+ private static final Set<Integer> IDS_WITH_ZERO_NAN_COUNT =
ImmutableSet.of(1, 2, 5, 7, 8, 11, 24, 25, 27);
+
+ private static final Map<Types.NestedField, Integer>
FIELDS_WITH_NAN_COUNT_TO_ID = ImmutableMap.of(
+ FLOAT_FIELD, 3, DOUBLE_FIELD, 4, FLOAT_LIST, 10, MAP_FIELD_1, 18,
MAP_FIELD_2, 22
+ );
Review comment:
Thank you for the explanation!
##########
File path:
spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
##########
@@ -146,8 +146,13 @@ public UTF8String getUTF8String(int ordinal) {
Review comment:
Thanks for the information!
For 1, I wasn't aware of the contract.
I'm not sure we want to add `isNullAt` for this specific case though: I
guess the problem comes from a difference in behavior between `InternalRow`
and `StructInternalRow`, and adding `isNullAt` might carry a larger performance
penalty in production.
The behavior difference comes from the fact that `internalRow.get()` can
return null. The NPE eventually comes from
[`struct.get(index,
types[index])`](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java#L420)
in `SparkParquetWriters`. While an actual `InternalRow` can return null for a
null column, `StructInternalRow` assumes the underlying `get()` returns
non-null and sometimes operates on the result in ways that can lead to an NPE,
e.g. [calling `toString` to
convert](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java#L144),
or [returning it directly as a primitive
type](https://github.com/apache/iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java#L132).
Thus `SparkParquetWriters` can call `get` fine under normal circumstances, but
not with the specific usage of `StructInternalRow` in this test.
For 2, that's a better idea, will do this instead.
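To illustrate the failure mode described above, here is a small sketch with a
hypothetical row wrapper (`NullSafeGetterSketch` is not Spark's or Iceberg's
class). The unguarded getter converts with `toString` and fails on a null
column, while the guarded getter passes the null through:

```java
import java.util.Arrays;
import java.util.List;

class NullSafeGetterSketch {
  private final List<Object> values;

  NullSafeGetterSketch(Object... values) {
    this.values = Arrays.asList(values);
  }

  // Mirrors the problematic pattern: assumes the underlying value is non-null.
  String getStringUnsafe(int ordinal) {
    return values.get(ordinal).toString();
  }

  // Guarded variant: a null column is returned as null instead of throwing.
  String getStringGuarded(int ordinal) {
    Object value = values.get(ordinal);
    return value == null ? null : value.toString();
  }

  // Callers that follow an isNullAt-style contract check this first.
  boolean isNullAt(int ordinal) {
    return values.get(ordinal) == null;
  }

  public static void main(String[] args) {
    NullSafeGetterSketch row = new NullSafeGetterSketch("a", null);
    System.out.println(row.getStringGuarded(0));
    System.out.println(row.isNullAt(1));
  }
}
```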
##########
File path:
parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
##########
@@ -28,4 +30,7 @@
List<TripleWriter<?>> columns();
void setColumnStore(ColumnWriteStore columnStore);
+
+ Stream<FieldMetrics> metrics();
Review comment:
I'll add documentation. For the name, I think `ParquetValueWriter`
already implies that it's field-specific, so I guess `metrics` by itself
conveys that these are field-specific metrics. Do you have a better
suggestion?
##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -257,31 +238,27 @@ public void testMetricsForDecimals() throws IOException {
record.setField("decimalAsInt64", new BigDecimal("4.75"));
record.setField("decimalAsFixed", new BigDecimal("5.80"));
- InputFile recordsFile = writeRecords(schema, record);
-
- Metrics metrics = getMetrics(recordsFile);
+ Metrics metrics = getMetrics(schema, record);
Assert.assertEquals(1L, (long) metrics.recordCount());
- assertCounts(1, 1L, 0L, metrics);
+ assertCounts(1, 1L, 0L, 0L, metrics);
Review comment:
I was thinking of making the NaN count explicitly required so that people
adding new tests remember to take it into consideration, but I guess that's
not needed, at least until NaN is fully supported. Will update.
##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -257,31 +238,27 @@ public void testMetricsForDecimals() throws IOException {
record.setField("decimalAsInt64", new BigDecimal("4.75"));
record.setField("decimalAsFixed", new BigDecimal("5.80"));
- InputFile recordsFile = writeRecords(schema, record);
-
- Metrics metrics = getMetrics(recordsFile);
+ Metrics metrics = getMetrics(schema, record);
Assert.assertEquals(1L, (long) metrics.recordCount());
- assertCounts(1, 1L, 0L, metrics);
+ assertCounts(1, 1L, 0L, 0L, metrics);
Review comment:
I was thinking of making the NaN count explicitly required so that people
adding new tests remember to take it into consideration, but I guess that's
not needed, at least until NaN is fully supported, especially now that we don't
want a NaN count for non-double/float fields. Will update.
##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -352,14 +327,34 @@ public void testMetricsForNullColumns() throws
IOException {
Record secondRecord = GenericRecord.create(schema);
secondRecord.setField("intCol", null);
- InputFile recordsFile = writeRecords(schema, firstRecord, secondRecord);
-
- Metrics metrics = getMetrics(recordsFile);
+ Metrics metrics = getMetrics(schema, firstRecord, secondRecord);
Assert.assertEquals(2L, (long) metrics.recordCount());
- assertCounts(1, 2L, 2L, metrics);
+ assertCounts(1, 2L, 2L, 0L, metrics);
assertBounds(1, IntegerType.get(), null, null, metrics);
}
+ @Test
+ public void testMetricsForNaNColumns() throws IOException {
Review comment:
Thanks for pointing this out! I guess this tests the bounds more than
the NaN count, but it will be very helpful when we start to exclude NaN
from upper/lower bounds. Will update.