rdsr commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r466152224
##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
package org.apache.iceberg.spark.data;
import java.util.List;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
/**
* This class acts as an adaptor from an OrcFileAppender to a
* FileAppender<InternalRow>.
*/
public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
- private final Converter[] converters;
+ private final SparkOrcValueWriter writer;
- public SparkOrcWriter(TypeDescription schema) {
- converters = buildConverters(schema);
+ public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+ "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder());
}
@Override
public void write(InternalRow value, VectorizedRowBatch output) {
- int row = output.size++;
- for (int c = 0; c < converters.length; ++c) {
- converters[c].addValue(row, c, value, output.cols[c]);
- }
- }
-
- /**
- * The interface for the conversion from Spark's SpecializedGetters to
- * ORC's ColumnVectors.
- */
- interface Converter {
- /**
- * Take a value from the Spark data value and add it to the ORC output.
- * @param rowId the row in the ColumnVector
- * @param column either the column number or element number
- * @param data either an InternalRow or ArrayData
- * @param output the ColumnVector to put the value into
- */
- void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output);
- }
-
- static class BooleanConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
- }
- }
- }
-
- static class ByteConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getByte(column);
- }
- }
- }
-
- static class ShortConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getShort(column);
- }
- }
- }
-
- static class IntConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getInt(column);
- }
- }
- }
-
- static class LongConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getLong(column);
- }
- }
- }
-
- static class FloatConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
- }
- }
- }
-
- static class DoubleConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
- }
- }
- }
-
- static class StringConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- byte[] value = data.getUTF8String(column).getBytes();
- ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
- }
- }
- }
+    Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter");
- static class BytesConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- // getBinary always makes a copy, so we don't need to worry about it
- // being changed behind our back.
- byte[] value = data.getBinary(column);
- ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
- }
+ int row = output.size;
+ output.size += 1;
+ List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+ for (int c = 0; c < writers.size(); c++) {
+ SparkOrcValueWriter child = writers.get(c);
+ child.write(row, c, value, output.cols[c]);
}
}
- static class TimestampTzConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- TimestampColumnVector cv = (TimestampColumnVector) output;
- long micros = data.getLong(column);
- cv.time[rowId] = micros / 1_000; // millis
- cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
- }
- }
- }
-
- static class Decimal18Converter implements Converter {
- private final int precision;
- private final int scale;
-
- Decimal18Converter(TypeDescription schema) {
- precision = schema.getPrecision();
- scale = schema.getScale();
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+ private WriteBuilder() {
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
- data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
- }
- }
- }
-
- static class Decimal38Converter implements Converter {
- private final int precision;
- private final int scale;
-
- Decimal38Converter(TypeDescription schema) {
- precision = schema.getPrecision();
- scale = schema.getScale();
+    public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record,
+                                      List<String> names, List<SparkOrcValueWriter> fields) {
+ return new StructWriter(fields);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DecimalColumnVector) output).vector[rowId].set(
- HiveDecimal.create(data.getDecimal(column, precision, scale)
- .toJavaBigDecimal()));
- }
- }
- }
-
- static class StructConverter implements Converter {
- private final Converter[] children;
-
- StructConverter(TypeDescription schema) {
- children = new Converter[schema.getChildren().size()];
- for (int c = 0; c < children.length; ++c) {
- children[c] = buildConverter(schema.getChildren().get(c));
- }
+    public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array,
+ SparkOrcValueWriter element) {
+ return SparkOrcValueWriters.list(element);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- InternalRow value = data.getStruct(column, children.length);
- StructColumnVector cv = (StructColumnVector) output;
- for (int c = 0; c < children.length; ++c) {
- children[c].addValue(rowId, c, value, cv.fields[c]);
- }
- }
- }
- }
-
- static class ListConverter implements Converter {
- private final Converter children;
-
- ListConverter(TypeDescription schema) {
- children = buildConverter(schema.getChildren().get(0));
+ public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+                                   SparkOrcValueWriter key, SparkOrcValueWriter value) {
+ return SparkOrcValueWriters.map(key, value);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ArrayData value = data.getArray(column);
- ListColumnVector cv = (ListColumnVector) output;
- // record the length and start of the list elements
- cv.lengths[rowId] = value.numElements();
- cv.offsets[rowId] = cv.childCount;
- cv.childCount += cv.lengths[rowId];
- // make sure the child is big enough
- cv.child.ensureSize(cv.childCount, true);
- // Add each element
- for (int e = 0; e < cv.lengths[rowId]; ++e) {
- children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
- }
- }
- }
- }
-
- static class MapConverter implements Converter {
- private final Converter keyConverter;
- private final Converter valueConverter;
-
- MapConverter(TypeDescription schema) {
- keyConverter = buildConverter(schema.getChildren().get(0));
- valueConverter = buildConverter(schema.getChildren().get(1));
+    public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+ switch (primitive.getCategory()) {
+ case BOOLEAN:
+ return SparkOrcValueWriters.booleans();
+ case BYTE:
+ return SparkOrcValueWriters.bytes();
+ case SHORT:
+ return SparkOrcValueWriters.shorts();
+ case DATE:
+ case INT:
+ return SparkOrcValueWriters.ints();
+ case LONG:
+ return SparkOrcValueWriters.longs();
+ case FLOAT:
+ return SparkOrcValueWriters.floats();
+ case DOUBLE:
+ return SparkOrcValueWriters.doubles();
+ case BINARY:
+ return SparkOrcValueWriters.byteArrays();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return SparkOrcValueWriters.strings();
+ case DECIMAL:
+        return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
+ case TIMESTAMP_INSTANT:
Review comment:
I was thinking that we should also handle `TIMESTAMP` and not just `TIMESTAMP_INSTANT`. But I checked with @shardulm94, and the reason is that Spark does not support timestamp without timezone.
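
For context, the timestamptz conversion itself stays the same as the removed `TimestampTzConverter`: Spark hands over microseconds since the epoch, and the writer splits them into the millis/nanos fields of ORC's `TimestampColumnVector`. A minimal sketch of what that writer could look like (the class name is made up, and the `write` signature is assumed from the `child.write(row, c, value, output.cols[c])` call in `SparkOrcWriter.write`):

```java
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;

// Hypothetical writer mirroring the removed TimestampTzConverter logic.
class TimestampTzWriter implements SparkOrcValueWriter {
  @Override
  public void write(int rowId, int column, SpecializedGetters data, ColumnVector output) {
    if (data.isNullAt(column)) {
      output.noNulls = false;
      output.isNull[rowId] = true;
    } else {
      output.isNull[rowId] = false;
      TimestampColumnVector cv = (TimestampColumnVector) output;
      long micros = data.getLong(column);                    // Spark timestamps are micros since epoch
      cv.time[rowId] = micros / 1_000;                       // whole milliseconds
      cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000;  // nanos within the second
    }
  }
}
```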
My other point was to dispatch on the Iceberg primitive type so that it is consistent with other writers, e.g. GenericOrcWriter. But here it doesn't make much difference.
Thanks for the explanation!
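
For illustration, a rough sketch of what dispatching on the Iceberg type (`iPrimitive`) could look like. The factory methods are the ones this PR adds to `SparkOrcValueWriters`, except `timestampTz()`, which I am only assuming exists for the `TIMESTAMP_INSTANT` case:

```java
@Override
public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
  switch (iPrimitive.typeId()) {
    case BOOLEAN:
      return SparkOrcValueWriters.booleans();
    case INTEGER:
    case DATE:
      // Iceberg has no byte/short types, and Spark stores dates as days-from-epoch ints
      return SparkOrcValueWriters.ints();
    case LONG:
      return SparkOrcValueWriters.longs();
    case FLOAT:
      return SparkOrcValueWriters.floats();
    case DOUBLE:
      return SparkOrcValueWriters.doubles();
    case STRING:
      return SparkOrcValueWriters.strings();
    case BINARY:
      return SparkOrcValueWriters.byteArrays();
    case DECIMAL:
      Types.DecimalType decimal = (Types.DecimalType) iPrimitive;
      return SparkOrcValueWriters.decimal(decimal.precision(), decimal.scale());
    case TIMESTAMP:
      // per the discussion above, Spark only supports timestamp with timezone
      Types.TimestampType timestamp = (Types.TimestampType) iPrimitive;
      Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
          "Spark does not support timestamp without timezone: %s", iPrimitive);
      return SparkOrcValueWriters.timestampTz();  // assumed factory name
    default:
      throw new IllegalArgumentException("Unhandled type: " + iPrimitive);
  }
}
```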