rdblue commented on a change in pull request #1060:
URL: https://github.com/apache/iceberg/pull/1060#discussion_r434205788



##########
File path: data/src/main/java/org/apache/iceberg/MetricsAppender.java
##########
@@ -0,0 +1,839 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+public class MetricsAppender<D> implements FileAppender<D> {
+
+  private final FileAppender<D> appender;
+  private final Schema schema;
+  private Metrics metrics = null;
+  private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+  private MetricsCollector metricsCollector;
+
+  private MetricsAppender(FileAppender<D> appender, Schema schema, 
MetricsAppenderConfiguration conf) {
+    this.appender = appender;
+    this.schema = schema;
+    this.metricsCollector = TypeUtil.visit(schema, new 
BuildMetricsCollector(conf));
+  }
+
+  @Override
+  public void add(D datum) {
+    appender.add(datum);
+    metricsCollector.add(datum);
+  }
+
+  @Override
+  public Metrics metrics() {
+    Preconditions.checkState(metrics != null, "Cannot produce metrics until 
closed");
+    return metrics;
+  }
+
+  @Override
+  public long length() {
+    return appender.length();
+  }
+
+  @Override
+  public void close() throws IOException {
+    appender.close();
+
+    Map<Integer, Long> values = new HashMap<Integer, Long>();
+    Map<Integer, Long> nulls = new HashMap<Integer, Long>();
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<Integer, ByteBuffer>();
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<Integer, ByteBuffer>();
+
+    ((Stream<FieldMetrics>) 
metricsCollector.getMetrics()).forEach(fieldMetrics -> {
+      values.put(fieldMetrics.getId(), fieldMetrics.getValueCount());
+      nulls.put(fieldMetrics.getId(), fieldMetrics.getNullValueCount());
+      lowerBounds.put(fieldMetrics.getId(), fieldMetrics.getLowerBound());
+      upperBounds.put(fieldMetrics.getId(), fieldMetrics.getUpperBound());
+    });
+
+    this.metrics = new Metrics(metricsCollector.count(),
+        null,
+        values,
+        nulls,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private enum RecordRepresentation {
+    RECORD,
+    GENERIC_RECORD
+  }
+
+  private enum FixedTypeRepresentation {
+    BYTE_ARRAY,
+    BYTE_BUFFER,
+    GENERIC_FIXED
+  }
+
+  private enum DateTypeRepresentation {
+    INT,
+    LOCAL_DATE
+  }
+
+  private enum TimeTypeRepresentation {
+    LONG,
+    LOCAL_TIME
+  }
+
+  private enum TimestampTypeRepresentation {
+    LONG,
+    OFFSET_DATETIME,
+    LOCAL_DATETIME
+  }
+
+  private static class MetricsAppenderConfiguration {
+    private final FixedTypeRepresentation fixedTypeRepresentation;
+    private final DateTypeRepresentation dateTypeRepresentation;
+    private final TimeTypeRepresentation timeTypeRepresentation;
+    private final TimestampTypeRepresentation timestampTypeRepresentation;
+    private final RecordRepresentation recordRepresentation;
+
+    MetricsAppenderConfiguration(RecordRepresentation recordRepresentation,
+                                 FixedTypeRepresentation 
fixedTypeRepresentation,
+                                 DateTypeRepresentation dateTypeRepresentation,
+                                 TimeTypeRepresentation timeTypeRepresentation,
+                                 TimestampTypeRepresentation 
timestampTypeRepresentation) {
+      this.recordRepresentation = recordRepresentation;
+      this.fixedTypeRepresentation = fixedTypeRepresentation;
+      this.dateTypeRepresentation = dateTypeRepresentation;
+      this.timeTypeRepresentation = timeTypeRepresentation;
+      this.timestampTypeRepresentation = timestampTypeRepresentation;
+    }
+
+    public FixedTypeRepresentation getFixedTypeRepresentation() {
+      return fixedTypeRepresentation;
+    }
+
+    public DateTypeRepresentation getDateTypeRepresentation() {
+      return dateTypeRepresentation;
+    }
+
+    public TimestampTypeRepresentation getTimestampTypeRepresentation() {
+      return timestampTypeRepresentation;
+    }
+  }
+
+  public static class Builder<D> {
+
+    private final FileAppender<D> appender;
+    private final Schema schema;
+    private FixedTypeRepresentation fixedTypeRepresentation;
+    private DateTypeRepresentation dateTypeRepresentation;
+    private TimeTypeRepresentation timeTypeRepresentation;
+    private TimestampTypeRepresentation timestampTypeRepresentation;
+    private RecordRepresentation recordRepresentation;
+
+    public Builder(FileAppender<D> appender, Schema schema) {
+      this.appender = appender;
+      this.schema = schema;
+      // TODO default null values or force setting every time?
+      fixedTypeRepresentation = FixedTypeRepresentation.BYTE_ARRAY;
+      dateTypeRepresentation = DateTypeRepresentation.LOCAL_DATE;
+      timeTypeRepresentation = TimeTypeRepresentation.LOCAL_TIME;
+      timestampTypeRepresentation = 
TimestampTypeRepresentation.OFFSET_DATETIME;
+      recordRepresentation = RecordRepresentation.RECORD;
+    }
+
+    // TODO or setRecordRepresentation(RecordRepresentation) for every option?

Review comment:
       It is probably easier to configure the object model being used, like 
`avroGenerics`, `icebergGenerics`, `pigTuples`, or `hiveRows`. That's the 
approach we took to configure the MR InputFormat [object 
model](https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L189-L197).
   
   That would give you a set of representations:
   ```java
     public Builder avroGenerics() {
       fixedTypeRepresentation = GENERIC_FIXED;
       recordRepresentation = GENERIC_RECORD;
       dateTypeRepresentation = INT;
       ...
     }
   ```
   
   But it may still be simpler to add metrics collection to the [`ValueWriter` 
interface](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java), 
since the interface could return no metrics by default and collection could 
then be added to each implementation, which already has the typed value to write.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to