rdblue commented on a change in pull request #1060:
URL: https://github.com/apache/iceberg/pull/1060#discussion_r434205788
##########
File path: data/src/main/java/org/apache/iceberg/MetricsAppender.java
##########
@@ -0,0 +1,839 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+public class MetricsAppender<D> implements FileAppender<D> {
+
+  private final FileAppender<D> appender;
+  private final Schema schema;
+  private Metrics metrics = null;
+  private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+  private MetricsCollector metricsCollector;
+
+  private MetricsAppender(FileAppender<D> appender, Schema schema, MetricsAppenderConfiguration conf) {
+    this.appender = appender;
+    this.schema = schema;
+    this.metricsCollector = TypeUtil.visit(schema, new BuildMetricsCollector(conf));
+  }
+
+  @Override
+  public void add(D datum) {
+    appender.add(datum);
+    metricsCollector.add(datum);
+  }
+
+  @Override
+  public Metrics metrics() {
+    Preconditions.checkState(metrics != null, "Cannot produce metrics until closed");
+    return metrics;
+  }
+
+  @Override
+  public long length() {
+    return appender.length();
+  }
+
+  @Override
+  public void close() throws IOException {
+    appender.close();
+
+    Map<Integer, Long> values = new HashMap<Integer, Long>();
+    Map<Integer, Long> nulls = new HashMap<Integer, Long>();
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<Integer, ByteBuffer>();
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<Integer, ByteBuffer>();
+
+    ((Stream<FieldMetrics>) metricsCollector.getMetrics()).forEach(fieldMetrics -> {
+      values.put(fieldMetrics.getId(), fieldMetrics.getValueCount());
+      nulls.put(fieldMetrics.getId(), fieldMetrics.getNullValueCount());
+      lowerBounds.put(fieldMetrics.getId(), fieldMetrics.getLowerBound());
+      upperBounds.put(fieldMetrics.getId(), fieldMetrics.getUpperBound());
+    });
+
+    this.metrics = new Metrics(metricsCollector.count(),
+        null,
+        values,
+        nulls,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private enum RecordRepresentation {
+    RECORD,
+    GENERIC_RECORD
+  }
+
+  private enum FixedTypeRepresentation {
+    BYTE_ARRAY,
+    BYTE_BUFFER,
+    GENERIC_FIXED
+  }
+
+  private enum DateTypeRepresentation {
+    INT,
+    LOCAL_DATE
+  }
+
+  private enum TimeTypeRepresentation {
+    LONG,
+    LOCAL_TIME
+  }
+
+  private enum TimestampTypeRepresentation {
+    LONG,
+    OFFSET_DATETIME,
+    LOCAL_DATETIME
+  }
+
+  private static class MetricsAppenderConfiguration {
+    private final FixedTypeRepresentation fixedTypeRepresentation;
+    private final DateTypeRepresentation dateTypeRepresentation;
+    private final TimeTypeRepresentation timeTypeRepresentation;
+    private final TimestampTypeRepresentation timestampTypeRepresentation;
+    private final RecordRepresentation recordRepresentation;
+
+    MetricsAppenderConfiguration(RecordRepresentation recordRepresentation,
+        FixedTypeRepresentation fixedTypeRepresentation,
+        DateTypeRepresentation dateTypeRepresentation,
+        TimeTypeRepresentation timeTypeRepresentation,
+        TimestampTypeRepresentation timestampTypeRepresentation) {
+      this.recordRepresentation = recordRepresentation;
+      this.fixedTypeRepresentation = fixedTypeRepresentation;
+      this.dateTypeRepresentation = dateTypeRepresentation;
+      this.timeTypeRepresentation = timeTypeRepresentation;
+      this.timestampTypeRepresentation = timestampTypeRepresentation;
+    }
+
+    public FixedTypeRepresentation getFixedTypeRepresentation() {
+      return fixedTypeRepresentation;
+    }
+
+    public DateTypeRepresentation getDateTypeRepresentation() {
+      return dateTypeRepresentation;
+    }
+
+    public TimestampTypeRepresentation getTimestampTypeRepresentation() {
+      return timestampTypeRepresentation;
+    }
+  }
+
+  public static class Builder<D> {
+
+    private final FileAppender<D> appender;
+    private final Schema schema;
+    private FixedTypeRepresentation fixedTypeRepresentation;
+    private DateTypeRepresentation dateTypeRepresentation;
+    private TimeTypeRepresentation timeTypeRepresentation;
+    private TimestampTypeRepresentation timestampTypeRepresentation;
+    private RecordRepresentation recordRepresentation;
+
+    public Builder(FileAppender<D> appender, Schema schema) {
+      this.appender = appender;
+      this.schema = schema;
+      // TODO default null values or force setting every time?
+      fixedTypeRepresentation = FixedTypeRepresentation.BYTE_ARRAY;
+      dateTypeRepresentation = DateTypeRepresentation.LOCAL_DATE;
+      timeTypeRepresentation = TimeTypeRepresentation.LOCAL_TIME;
+      timestampTypeRepresentation = TimestampTypeRepresentation.OFFSET_DATETIME;
+      recordRepresentation = RecordRepresentation.RECORD;
+    }
+
+    // TODO or setRecordRepresentation(RecordRepresentation) for every option?

Review comment:

It is probably easier to configure the object model being used, like `avroGenerics`, `icebergGenerics`, `pigTuples`, or `hiveRows`. That's the approach we took to configure the MR InputFormat [object model](https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L189-L197). That would give you a set of representations:

```java
public Builder avroGenerics() {
  fixedTypeRepresentation = GENERIC_FIXED;
  recordRepresentation = GENERIC_RECORD;
  dateRepresentation = INT;
  ...
}
```

But it may still be simpler to add metrics collection to the [`ValueWriter` interface](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java), since you could return no metrics by default and add to each implementation, which already has the typed value to write.
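
For what it's worth, a minimal sketch of that `ValueWriter` alternative might look like the following. The `metrics()` default method, the `LongWriter` example, and the five-argument `FieldMetrics` constructor are assumptions for illustration only, not the existing `ValueWriter`/`FieldMetrics` API:

```java
// Sketch only: ValueWriter gains a metrics() hook with an empty default, so
// writers that do not collect metrics need no changes at all.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.stream.Stream;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

interface ValueWriter<D> {
  void write(D datum, Encoder encoder) throws IOException;

  // No metrics unless an implementation opts in.
  default Stream<FieldMetrics> metrics() {
    return Stream.empty();
  }
}

// A writer that opts in: it already has the typed value, so tracking counts
// and bounds is a couple of extra lines in write().
class LongWriter implements ValueWriter<Long> {
  private final int fieldId;
  private long valueCount = 0L;
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  LongWriter(int fieldId) {
    this.fieldId = fieldId;
  }

  @Override
  public void write(Long datum, Encoder encoder) throws IOException {
    encoder.writeLong(datum);
    valueCount += 1;
    min = Math.min(min, datum);
    max = Math.max(max, datum);
  }

  @Override
  public Stream<FieldMetrics> metrics() {
    if (valueCount == 0) {
      return Stream.empty();
    }
    ByteBuffer lower = Conversions.toByteBuffer(Types.LongType.get(), min);
    ByteBuffer upper = Conversions.toByteBuffer(Types.LongType.get(), max);
    // The FieldMetrics shape (id, value count, null count, lower, upper) is
    // assumed here to match how MetricsAppender reads it above; the real
    // class in the PR may differ.
    return Stream.of(new FieldMetrics(fieldId, valueCount, 0L, lower, upper));
  }
}
```

The appeal is the one noted above: the writer already holds the typed value, so counts and bounds can be tracked inline without inspecting the object model a second time, and writers that do not collect metrics are untouched thanks to the empty default.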
