yyanyy commented on a change in pull request #2464:
URL: https://github.com/apache/iceberg/pull/2464#discussion_r611953265
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
##########
@@ -85,6 +89,7 @@ public static Metrics footerMetrics(ParquetMetadata metadata,
Stream<FieldMetric
return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
}
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
Review comment:
Decided not to refactor this method (at least not in this PR) since in
this method we are essentially updating all column stats, and by moving logic
directly out of the method we would need to pass in those 6 stats maps with
some extra info, which could easily make the helper method exceed 10 arguments.
##########
File path: data/src/test/java/org/apache/iceberg/TestMergingMetrics.java
##########
@@ -105,74 +117,152 @@ public TestMergingMetrics(FileFormat fileFormat) {
@Test
public void verifySingleRecordMetric() throws Exception {
Record record = GenericRecord.create(SCHEMA);
- record.setField("id", 3);
- record.setField("float", Float.NaN); // FLOAT_FIELD - 1
- record.setField("double", Double.NaN); // DOUBLE_FIELD - 1
- record.setField("floatlist", ImmutableList.of(3.3F, 2.8F, Float.NaN,
-25.1F, Float.NaN)); // FLOAT_LIST - 2
- record.setField("map1", ImmutableMap.of(Float.NaN, "a", 0F, "b")); //
MAP_FIELD_1 - 1
- record.setField("map2", ImmutableMap.of(
+ record.setField(ID_FIELD.name(), 3);
+ record.setField(FLOAT_FIELD.name(), Float.NaN); // FLOAT_FIELD - 1
+ record.setField(DOUBLE_FIELD.name(), Double.NaN); // DOUBLE_FIELD - 1
+ record.setField(FLOAT_LIST.name(), ImmutableList.of(3.3F, 2.8F, Float.NaN,
-25.1F, Float.NaN)); // FLOAT_LIST - 2
+ record.setField(MAP_FIELD_1.name(), ImmutableMap.of(Float.NaN, "a", 0F,
"b")); // MAP_FIELD_1 - 1
+ record.setField(MAP_FIELD_2.name(), ImmutableMap.of(
0, 0D, 1, Double.NaN, 2, 2D, 3, Double.NaN, 4, Double.NaN)); //
MAP_FIELD_2 - 3
FileAppender<T> appender = writeAndGetAppender(ImmutableList.of(record));
- Map<Integer, Long> nanValueCount = appender.metrics().nanValueCounts();
+ Metrics metrics = appender.metrics();
+ Map<Integer, Long> nanValueCount = metrics.nanValueCounts();
+ Map<Integer, ByteBuffer> upperBounds = metrics.upperBounds();
+ Map<Integer, ByteBuffer> lowerBounds = metrics.lowerBounds();
assertNaNCountMatch(1L, nanValueCount, FLOAT_FIELD);
assertNaNCountMatch(1L, nanValueCount, DOUBLE_FIELD);
assertNaNCountMatch(2L, nanValueCount, FLOAT_LIST);
assertNaNCountMatch(1L, nanValueCount, MAP_FIELD_1);
assertNaNCountMatch(3L, nanValueCount, MAP_FIELD_2);
- }
- private void assertNaNCountMatch(Long expected, Map<Integer, Long>
nanValueCount, Types.NestedField field) {
- Assert.assertEquals(
- String.format("NaN count for field %s does not match expected",
field.name()),
- expected, nanValueCount.get(FIELDS_WITH_NAN_COUNT_TO_ID.get(field)));
+ assertBoundValueMatch(null, upperBounds, FLOAT_FIELD);
+ assertBoundValueMatch(null, upperBounds, DOUBLE_FIELD);
+ assertBoundValueMatch(3.3F, upperBounds, FLOAT_LIST);
+ assertBoundValueMatch(0F, upperBounds, MAP_FIELD_1);
+ assertBoundValueMatch(2D, upperBounds, MAP_FIELD_2);
+
+ assertBoundValueMatch(null, lowerBounds, FLOAT_FIELD);
+ assertBoundValueMatch(null, lowerBounds, DOUBLE_FIELD);
+ assertBoundValueMatch(-25.1F, lowerBounds, FLOAT_LIST);
+ assertBoundValueMatch(0F, lowerBounds, MAP_FIELD_1);
+ assertBoundValueMatch(0D, lowerBounds, MAP_FIELD_2);
}
@Test
public void verifyRandomlyGeneratedRecordsMetric() throws Exception {
- List<Record> recordList = RandomGenericData.generate(SCHEMA, 50, 250L);
-
+ // too big of the record count will more likely to make all upper/lower
bounds +/-infinity,
+ // which makes the tests easier to pass
+ List<Record> recordList = RandomGenericData.generate(SCHEMA, 5, 250L);
FileAppender<T> appender = writeAndGetAppender(recordList);
- Map<Integer, Long> nanValueCount = appender.metrics().nanValueCounts();
- FIELDS_WITH_NAN_COUNT_TO_ID.forEach((key, value) -> Assert.assertEquals(
- String.format("NaN count for field %s does not match expected",
key.name()),
- getExpectedNaNCount(recordList, key),
- nanValueCount.get(value)));
+ Map<Types.NestedField, AtomicReference<Number>> expectedUpperBounds = new
HashMap<>();
+ Map<Types.NestedField, AtomicReference<Number>> expectedLowerBounds = new
HashMap<>();
+ Map<Types.NestedField, AtomicLong> expectedNaNCount = new HashMap<>();
+
+ populateExpectedValues(recordList, expectedUpperBounds,
expectedLowerBounds, expectedNaNCount);
+
+ Metrics metrics = appender.metrics();
+ expectedUpperBounds.forEach((key, value) ->
assertBoundValueMatch(value.get(), metrics.upperBounds(), key));
+ expectedLowerBounds.forEach((key, value) ->
assertBoundValueMatch(value.get(), metrics.lowerBounds(), key));
+ expectedNaNCount.forEach((key, value) -> assertNaNCountMatch(value.get(),
metrics.nanValueCounts(), key));
SCHEMA.columns().stream()
.filter(column -> !FIELDS_WITH_NAN_COUNT_TO_ID.containsKey(column))
.map(Types.NestedField::fieldId)
- .forEach(id -> Assert.assertNull("NaN count for field %s should be
null", nanValueCount.get(id)));
+ .forEach(id -> Assert.assertNull("NaN count for field %s should be
null",
+ metrics.nanValueCounts().get(id)));
+ }
+
+ private void assertNaNCountMatch(Long expected, Map<Integer, Long>
nanValueCount, Types.NestedField field) {
+ Assert.assertEquals(
+ String.format("NaN count for field %s does not match expected",
field.name()),
+ expected, nanValueCount.get(FIELDS_WITH_NAN_COUNT_TO_ID.get(field)));
+ }
+
+ private void assertBoundValueMatch(Number expected, Map<Integer, ByteBuffer>
boundMap, Types.NestedField field) {
+ if (field.type().isNestedType() && fileFormat == FileFormat.ORC) {
+ // we don't update floating column bounds values within ORC nested
columns
Review comment:
We actually could make it work for ORC, but I'm not sure it's worth it
given that metrics in nested types are not normally recorded/used anyway. The
reason for the different behavior between Parquet and ORC is the following:
Parquet tracks column stats per block, and when computing metrics we need to
update min/max while iterating through all blocks; to avoid doing so for
floating point columns whose min/max we now track ourselves, we update min/max
for these columns after all blocks have been processed.
However, ORC tracks stats per column, so we can directly inject the new
bound value when we evaluate each column's stats. And since ORC doesn't track
nested column stats, we never look at columns within nested fields.
##########
File path: core/src/main/java/org/apache/iceberg/FieldMetrics.java
##########
@@ -20,25 +20,23 @@
package org.apache.iceberg;
-import java.nio.ByteBuffer;
-
/**
* Iceberg internally tracked field level metrics.
*/
-public class FieldMetrics {
+public class FieldMetrics<T> {
Review comment:
Change in this class overlaps with what #1963 contains. For now this
class is not directly used/created by Parquet/ORC, but it will be for Avro,
so it should be safe to change. Same goes for `FloatFieldMetrics`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]