openinx commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r466141100
##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
package org.apache.iceberg.spark.data;
import java.util.List;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
/**
* This class acts as an adaptor from an OrcFileAppender to a
* FileAppender<InternalRow>.
*/
public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
- private final Converter[] converters;
+ private final SparkOrcValueWriter writer;
- public SparkOrcWriter(TypeDescription schema) {
- converters = buildConverters(schema);
+ public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+ Preconditions.checkArgument(orcSchema.getCategory() ==
TypeDescription.Category.STRUCT,
+ "Top level must be a struct " + orcSchema);
+
+ writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new
WriteBuilder());
}
@Override
public void write(InternalRow value, VectorizedRowBatch output) {
- int row = output.size++;
- for (int c = 0; c < converters.length; ++c) {
- converters[c].addValue(row, c, value, output.cols[c]);
- }
- }
-
- /**
- * The interface for the conversion from Spark's SpecializedGetters to
- * ORC's ColumnVectors.
- */
- interface Converter {
- /**
- * Take a value from the Spark data value and add it to the ORC output.
- * @param rowId the row in the ColumnVector
- * @param column either the column number or element number
- * @param data either an InternalRow or ArrayData
- * @param output the ColumnVector to put the value into
- */
- void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output);
- }
-
- static class BooleanConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ?
1 : 0;
- }
- }
- }
-
- static class ByteConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getByte(column);
- }
- }
- }
-
- static class ShortConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getShort(column);
- }
- }
- }
-
- static class IntConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getInt(column);
- }
- }
- }
-
- static class LongConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((LongColumnVector) output).vector[rowId] = data.getLong(column);
- }
- }
- }
-
- static class FloatConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
- }
- }
- }
-
- static class DoubleConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
- }
- }
- }
-
- static class StringConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- byte[] value = data.getUTF8String(column).getBytes();
- ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
- }
- }
- }
+ Preconditions.checkArgument(writer instanceof StructWriter, "writer must
be StructWriter");
- static class BytesConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- // getBinary always makes a copy, so we don't need to worry about it
- // being changed behind our back.
- byte[] value = data.getBinary(column);
- ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
- }
+ int row = output.size;
+ output.size += 1;
+ List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+ for (int c = 0; c < writers.size(); c++) {
+ SparkOrcValueWriter child = writers.get(c);
+ child.write(row, c, value, output.cols[c]);
}
}
- static class TimestampTzConverter implements Converter {
- @Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- TimestampColumnVector cv = (TimestampColumnVector) output;
- long micros = data.getLong(column);
- cv.time[rowId] = micros / 1_000; // millis
- cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
- }
- }
- }
-
- static class Decimal18Converter implements Converter {
- private final int precision;
- private final int scale;
-
- Decimal18Converter(TypeDescription schema) {
- precision = schema.getPrecision();
- scale = schema.getScale();
+ private static class WriteBuilder extends
OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+ private WriteBuilder() {
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
- data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
- }
- }
- }
-
- static class Decimal38Converter implements Converter {
- private final int precision;
- private final int scale;
-
- Decimal38Converter(TypeDescription schema) {
- precision = schema.getPrecision();
- scale = schema.getScale();
+ public SparkOrcValueWriter record(Types.StructType iStruct,
TypeDescription record,
+ List<String> names,
List<SparkOrcValueWriter> fields) {
+ return new StructWriter(fields);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ((DecimalColumnVector) output).vector[rowId].set(
- HiveDecimal.create(data.getDecimal(column, precision, scale)
- .toJavaBigDecimal()));
- }
- }
- }
-
- static class StructConverter implements Converter {
- private final Converter[] children;
-
- StructConverter(TypeDescription schema) {
- children = new Converter[schema.getChildren().size()];
- for (int c = 0; c < children.length; ++c) {
- children[c] = buildConverter(schema.getChildren().get(c));
- }
+ public SparkOrcValueWriter list(Types.ListType iList, TypeDescription
array,
+ SparkOrcValueWriter element) {
+ return SparkOrcValueWriters.list(element);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- InternalRow value = data.getStruct(column, children.length);
- StructColumnVector cv = (StructColumnVector) output;
- for (int c = 0; c < children.length; ++c) {
- children[c].addValue(rowId, c, value, cv.fields[c]);
- }
- }
- }
- }
-
- static class ListConverter implements Converter {
- private final Converter children;
-
- ListConverter(TypeDescription schema) {
- children = buildConverter(schema.getChildren().get(0));
+ public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+ SparkOrcValueWriter key,
SparkOrcValueWriter value) {
+ return SparkOrcValueWriters.map(key, value);
}
@Override
- public void addValue(int rowId, int column, SpecializedGetters data,
- ColumnVector output) {
- if (data.isNullAt(column)) {
- output.noNulls = false;
- output.isNull[rowId] = true;
- } else {
- output.isNull[rowId] = false;
- ArrayData value = data.getArray(column);
- ListColumnVector cv = (ListColumnVector) output;
- // record the length and start of the list elements
- cv.lengths[rowId] = value.numElements();
- cv.offsets[rowId] = cv.childCount;
- cv.childCount += cv.lengths[rowId];
- // make sure the child is big enough
- cv.child.ensureSize(cv.childCount, true);
- // Add each element
- for (int e = 0; e < cv.lengths[rowId]; ++e) {
- children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
- }
- }
- }
- }
-
- static class MapConverter implements Converter {
- private final Converter keyConverter;
- private final Converter valueConverter;
-
- MapConverter(TypeDescription schema) {
- keyConverter = buildConverter(schema.getChildren().get(0));
- valueConverter = buildConverter(schema.getChildren().get(1));
+ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive,
TypeDescription primitive) {
+ switch (primitive.getCategory()) {
+ case BOOLEAN:
+ return SparkOrcValueWriters.booleans();
+ case BYTE:
+ return SparkOrcValueWriters.bytes();
+ case SHORT:
+ return SparkOrcValueWriters.shorts();
+ case DATE:
+ case INT:
+ return SparkOrcValueWriters.ints();
+ case LONG:
+ return SparkOrcValueWriters.longs();
+ case FLOAT:
+ return SparkOrcValueWriters.floats();
+ case DOUBLE:
+ return SparkOrcValueWriters.doubles();
+ case BINARY:
+ return SparkOrcValueWriters.byteArrays();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return SparkOrcValueWriters.strings();
+ case DECIMAL:
+ return SparkOrcValueWriters.decimal(primitive.getPrecision(),
primitive.getScale());
+ case TIMESTAMP_INSTANT:
Review comment:
> So in this branch of the switch, the type is definitely timestamptz.
Yes, in this `TIMESAMP_INSTANT` case, it must be a timestamp with time
zone. see the Category in TypeDescription:
```java
public enum Category {
BOOLEAN("boolean", true),
BYTE("tinyint", true),
SHORT("smallint", true),
INT("int", true),
LONG("bigint", true),
FLOAT("float", true),
DOUBLE("double", true),
STRING("string", true),
DATE("date", true),
TIMESTAMP("timestamp", true),
BINARY("binary", true),
DECIMAL("decimal", true),
VARCHAR("varchar", true),
CHAR("char", true),
LIST("array", false),
MAP("map", false),
STRUCT("struct", false),
UNION("uniontype", false),
TIMESTAMP_INSTANT("timestamp with local time zone", false);
```
btw, Is there any reason that we spark don't support a timestamp without
time zone ? I saw the spark test
[here](https://github.com/apache/iceberg/blob/ffdcf09027e09460b7d7505e65aea119107934a3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java#L52)
did not include the `timestamp without zone` data type. Maybe we could file
another issue to address this `timestamp without zone` issue, because this is a
refactor PR and we could get this merged firstly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]