openinx commented on a change in pull request #3248:
URL: https://github.com/apache/iceberg/pull/3248#discussion_r727053434



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
##########
@@ -19,245 +19,130 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.util.List;
 import java.util.stream.Stream;
-import org.apache.iceberg.DoubleFieldMetrics;
 import org.apache.iceberg.FieldMetrics;
-import org.apache.iceberg.FloatFieldMetrics;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
 
 class SparkOrcValueWriters {
   private SparkOrcValueWriters() {
   }
 
-  static SparkOrcValueWriter booleans() {
-    return BooleanWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter bytes() {
-    return ByteWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter shorts() {
-    return ShortWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter ints() {
-    return IntWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter longs() {
-    return LongWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter floats(int id) {
-    return new FloatWriter(id);
-  }
-
-  static SparkOrcValueWriter doubles(int id) {
-    return new DoubleWriter(id);
-  }
-
-  static SparkOrcValueWriter byteArrays() {
-    return BytesWriter.INSTANCE;
-  }
-
-  static SparkOrcValueWriter strings() {
+  static OrcValueWriter<?> strings() {
     return StringWriter.INSTANCE;
   }
 
-  static SparkOrcValueWriter timestampTz() {
+  static OrcValueWriter<?> timestampTz() {
     return TimestampTzWriter.INSTANCE;
   }
 
-  static SparkOrcValueWriter decimal(int precision, int scale) {
+  static OrcValueWriter<?> decimal(int precision, int scale) {
     if (precision <= 18) {
-      return new Decimal18Writer(precision, scale);
+      return new Decimal18Writer(scale);
     } else {
-      return new Decimal38Writer(precision, scale);
-    }
-  }
-
-  static SparkOrcValueWriter list(SparkOrcValueWriter element) {
-    return new ListWriter(element);
-  }
-
-  static SparkOrcValueWriter map(SparkOrcValueWriter keyWriter, SparkOrcValueWriter valueWriter) {
-    return new MapWriter(keyWriter, valueWriter);
-  }
-
-  private static class BooleanWriter implements SparkOrcValueWriter {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-    }
-  }
-
-  private static class ByteWriter implements SparkOrcValueWriter {
-    private static final ByteWriter INSTANCE = new ByteWriter();
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-    }
-  }
-
-  private static class ShortWriter implements SparkOrcValueWriter {
-    private static final ShortWriter INSTANCE = new ShortWriter();
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getShort(column);
+      return new Decimal38Writer();
     }
   }
 
-  private static class IntWriter implements SparkOrcValueWriter {
-    private static final IntWriter INSTANCE = new IntWriter();
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-    }
+  static OrcValueWriter<?> list(OrcValueWriter<?> element, List<TypeDescription> orcType) {
+    return new ListWriter(element, orcType);
   }
 
-  private static class LongWriter implements SparkOrcValueWriter {
-    private static final LongWriter INSTANCE = new LongWriter();
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-    }
+  static OrcValueWriter<?> map(OrcValueWriter<?> keyWriter, OrcValueWriter<?> valueWriter,
+      List<TypeDescription> orcType) {
+    return new MapWriter(keyWriter, valueWriter, orcType);
   }
 
-  private static class FloatWriter implements SparkOrcValueWriter {
-    private final FloatFieldMetrics.Builder floatFieldMetricsBuilder;
-
-    private FloatWriter(int id) {
-      this.floatFieldMetricsBuilder = new FloatFieldMetrics.Builder(id);
-    }
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      float floatValue = data.getFloat(column);
-      ((DoubleColumnVector) output).vector[rowId] = floatValue;
-      floatFieldMetricsBuilder.addValue(floatValue);
-    }
-
-    @Override
-    public Stream<FieldMetrics<?>> metrics() {
-      return Stream.of(floatFieldMetricsBuilder.build());
-    }
-  }
-
-  private static class DoubleWriter implements SparkOrcValueWriter {
-    private final DoubleFieldMetrics.Builder doubleFieldMetricsBuilder;
-
-    private DoubleWriter(int id) {
-      this.doubleFieldMetricsBuilder = new DoubleFieldMetrics.Builder(id);
-    }
-
-    @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      double doubleValue = data.getDouble(column);
-      ((DoubleColumnVector) output).vector[rowId] = doubleValue;
-      doubleFieldMetricsBuilder.addValue(doubleValue);
-    }
-
-    @Override
-    public Stream<FieldMetrics<?>> metrics() {
-      return Stream.of(doubleFieldMetricsBuilder.build());
-    }
-  }
-
-  private static class BytesWriter implements SparkOrcValueWriter {
-    private static final BytesWriter INSTANCE = new BytesWriter();
+  private static class StringWriter implements OrcValueWriter<UTF8String> {
+    private static final StringWriter INSTANCE = new StringWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      // getBinary always makes a copy, so we don't need to worry about it
-      // being changed behind our back.
-      byte[] value = data.getBinary(column);
+    public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      byte[] value = data.getBytes();
       ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
     }
-  }
-
-  private static class StringWriter implements SparkOrcValueWriter {
-    private static final StringWriter INSTANCE = new StringWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      byte[] value = data.getUTF8String(column).getBytes();
-      ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+    public Class<?> getJavaClass() {
+      return UTF8String.class;
     }
   }
 
-  private static class TimestampTzWriter implements SparkOrcValueWriter {
+  private static class TimestampTzWriter implements OrcValueWriter<Long> {
     private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
+    public void nonNullWrite(int rowId, Long micros, ColumnVector output) {
       TimestampColumnVector cv = (TimestampColumnVector) output;
-      long micros = data.getLong(column); // it could be negative.
       cv.time[rowId] = Math.floorDiv(micros, 1_000); // millis
      cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // nanos
     }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return Long.class;
+    }
   }
 
-  private static class Decimal18Writer implements SparkOrcValueWriter {
-    private final int precision;
+  private static class Decimal18Writer implements OrcValueWriter<Decimal> {
     private final int scale;
 
-    Decimal18Writer(int precision, int scale) {
-      this.precision = precision;
+    Decimal18Writer(int scale) {
       this.scale = scale;
     }
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
+    public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) {
       ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-          data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
+          decimal.toUnscaledLong(), scale);
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return Decimal.class;
     }
   }
 
-  private static class Decimal38Writer implements SparkOrcValueWriter {
-    private final int precision;
-    private final int scale;
+  private static class Decimal38Writer implements OrcValueWriter<Decimal> {
 
-    Decimal38Writer(int precision, int scale) {
-      this.precision = precision;
-      this.scale = scale;
+    @Override
+    public void nonNullWrite(int rowId, Decimal decimal, ColumnVector output) {
+      ((DecimalColumnVector) output).vector[rowId].set(
+          HiveDecimal.create(decimal.toJavaBigDecimal()));
     }
 
     @Override
-    public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((DecimalColumnVector) output).vector[rowId].set(
-          HiveDecimal.create(data.getDecimal(column, precision, scale)
-              .toJavaBigDecimal()));
+    public Class<?> getJavaClass() {
+      return Decimal.class;
     }
   }
 
-  private static class ListWriter implements SparkOrcValueWriter {
-    private final SparkOrcValueWriter writer;
+  private static class ListWriter implements OrcValueWriter<ArrayData> {
+    private final OrcValueWriter writer;
+    private final SparkOrcWriter.FieldGetter fieldGetter;
 
-    ListWriter(SparkOrcValueWriter writer) {
+    ListWriter(OrcValueWriter<?> writer, List<TypeDescription> orcTypes) {

Review comment:
       How about using `elemType` as the `TypeDescription` argument? That reads more symmetrically with the element writer argument, and I think we can move the `orcTypes` size check out of this scope.
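       For illustration, a minimal sketch of what that could look like. Only the factory method and constructor change; `createFieldGetter` is a hypothetical helper assumed to build a `SparkOrcWriter.FieldGetter` from a single `TypeDescription`:

```java
// Sketch only: pass the element TypeDescription directly, symmetrical to the
// element writer that is already passed in.
static OrcValueWriter<?> list(OrcValueWriter<?> element, TypeDescription elemType) {
  return new ListWriter(element, elemType);
}

private static class ListWriter implements OrcValueWriter<ArrayData> {
  private final OrcValueWriter writer;
  private final SparkOrcWriter.FieldGetter fieldGetter;

  ListWriter(OrcValueWriter<?> writer, TypeDescription elemType) {
    this.writer = writer;
    // The orcTypes.size() == 1 check would move to the call site, which
    // already knows it is visiting an ORC list type.
    this.fieldGetter = SparkOrcWriter.createFieldGetter(elemType); // hypothetical helper
  }

  // ... rest of ListWriter unchanged
}
```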




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


