rdblue commented on a change in pull request #828: iceberg-spark changes for 
vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/828#discussion_r410517220
 
 

 ##########
 File path: 
spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessors.java
 ##########
 @@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import io.netty.buffer.ArrowBuf;
+import java.math.BigInteger;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.iceberg.arrow.vectorized.IcebergArrowVectors;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class ArrowVectorAccessors {
+
+  private ArrowVectorAccessors() {}
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  static ArrowVectorAccessor getVectorAccessor(VectorHolder holder) {
+    Dictionary dictionary = holder.dictionary();
+    boolean isVectorDictEncoded = holder.isDictionaryEncoded();
+    ColumnDescriptor desc = holder.descriptor();
+    FieldVector vector = holder.vector();
+    PrimitiveType primitive = desc.getPrimitiveType();
+    if (isVectorDictEncoded) {
+      Preconditions.checkState(vector instanceof IntVector, "Dictionary ids 
should be stored in IntVectors only");
+      if (primitive.getOriginalType() != null) {
+        switch (desc.getPrimitiveType().getOriginalType()) {
+          case ENUM:
+          case JSON:
+          case UTF8:
+          case BSON:
+            return new DictionaryStringAccessor((IntVector) vector, 
dictionary);
+          case INT_64:
+          case TIMESTAMP_MILLIS:
+          case TIMESTAMP_MICROS:
+            return new DictionaryLongAccessor((IntVector) vector, dictionary);
+          case DECIMAL:
+            switch (primitive.getPrimitiveTypeName()) {
+              case BINARY:
+              case FIXED_LEN_BYTE_ARRAY:
+                return new DictionaryDecimalBinaryAccessor(
+                    (IntVector) vector,
+                    dictionary);
+              case INT64:
+                return new DictionaryDecimalLongAccessor(
+                    (IntVector) vector,
+                    dictionary);
+              case INT32:
+                return new DictionaryDecimalIntAccessor(
+                    (IntVector) vector,
+                    dictionary);
+              default:
+                throw new UnsupportedOperationException(
+                    "Unsupported base type for decimal: " + 
primitive.getPrimitiveTypeName());
+            }
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported logical type: " + primitive.getOriginalType());
+        }
+      } else {
+        switch (primitive.getPrimitiveTypeName()) {
+          case FIXED_LEN_BYTE_ARRAY:
+          case BINARY:
+            return new DictionaryBinaryAccessor((IntVector) vector, 
dictionary);
+          case FLOAT:
+            return new DictionaryFloatAccessor((IntVector) vector, dictionary);
+          case INT64:
+            return new DictionaryLongAccessor((IntVector) vector, dictionary);
+          case DOUBLE:
+            return new DictionaryDoubleAccessor((IntVector) vector, 
dictionary);
+          default:
+            throw new UnsupportedOperationException("Unsupported type: " + 
primitive);
+        }
+      }
+    } else {
+      if (vector instanceof BitVector) {
+        return new BooleanAccessor((BitVector) vector);
+      } else if (vector instanceof IntVector) {
+        return new IntAccessor((IntVector) vector);
+      } else if (vector instanceof BigIntVector) {
+        return new LongAccessor((BigIntVector) vector);
+      } else if (vector instanceof Float4Vector) {
+        return new FloatAccessor((Float4Vector) vector);
+      } else if (vector instanceof Float8Vector) {
+        return new DoubleAccessor((Float8Vector) vector);
+      } else if (vector instanceof IcebergArrowVectors.DecimalArrowVector) {
+        return new DecimalAccessor((IcebergArrowVectors.DecimalArrowVector) 
vector);
+      } else if (vector instanceof IcebergArrowVectors.VarcharArrowVector) {
+        return new StringAccessor((IcebergArrowVectors.VarcharArrowVector) 
vector);
+      } else if (vector instanceof IcebergArrowVectors.VarBinaryArrowVector) {
+        return new BinaryAccessor((IcebergArrowVectors.VarBinaryArrowVector) 
vector);
+      } else if (vector instanceof DateDayVector) {
+        return new DateAccessor((DateDayVector) vector);
+      } else if (vector instanceof TimeStampMicroTZVector) {
+        return new TimestampAccessor((TimeStampMicroTZVector) vector);
+      } else if (vector instanceof ListVector) {
+        ListVector listVector = (ListVector) vector;
+        return new ArrayAccessor(listVector);
+      } else if (vector instanceof StructVector) {
+        StructVector structVector = (StructVector) vector;
+        return new StructAccessor(structVector);
+      }
+    }
+    throw new UnsupportedOperationException("Unsupported type: " + primitive);
+  }
+
+  private static class BooleanAccessor extends ArrowVectorAccessor {
+
+    private final BitVector vector;
+
+    BooleanAccessor(BitVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final boolean getBoolean(int rowId) {
+      return vector.get(rowId) == 1;
+    }
+  }
+
+  private static class IntAccessor extends ArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    IntAccessor(IntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class LongAccessor extends ArrowVectorAccessor {
+
+    private final BigIntVector vector;
+
+    LongAccessor(BigIntVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryLongAccessor extends 
DictionaryArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+      this.vector = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return parquetDictionary.decodeToLong(vector.get(rowId));
+    }
+  }
+
+  private static class FloatAccessor extends ArrowVectorAccessor {
+
+    private final Float4Vector vector;
+
+    FloatAccessor(Float4Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryFloatAccessor extends 
DictionaryArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+      this.vector = vector;
+    }
+
+    @Override
+    final float getFloat(int rowId) {
+      return parquetDictionary.decodeToFloat(vector.get(rowId));
+    }
+  }
+
+  private static class DoubleAccessor extends ArrowVectorAccessor {
+
+    private final Float8Vector vector;
+
+    DoubleAccessor(Float8Vector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return vector.get(rowId);
+    }
+  }
+
+  private static class DictionaryDoubleAccessor extends 
DictionaryArrowVectorAccessor {
+
+    private final IntVector vector;
+
+    DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
+      this.vector = vector;
+    }
+
+    @Override
+    final double getDouble(int rowId) {
+      return parquetDictionary.decodeToDouble(vector.get(rowId));
+    }
+  }
+
+  private static class DecimalAccessor extends ArrowVectorAccessor {
+
+    private final IcebergArrowVectors.DecimalArrowVector vector;
+
+    DecimalAccessor(IcebergArrowVectors.DecimalArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final Decimal getDecimal(int rowId, int precision, int scale) {
+      return Decimal.apply(vector.getObject(rowId), precision, scale);
+    }
+  }
+
+  private static class StringAccessor extends ArrowVectorAccessor {
+
+    private final IcebergArrowVectors.VarcharArrowVector vector;
+    private final NullableVarCharHolder stringResult = new 
NullableVarCharHolder();
+
+    StringAccessor(IcebergArrowVectors.VarcharArrowVector vector) {
+      super(vector);
+      this.vector = vector;
+    }
+
+    @Override
+    final UTF8String getUTF8String(int rowId) {
+      vector.get(rowId, stringResult);
+      if (stringResult.isSet == 0) {
+        return null;
+      } else {
+        return UTF8String.fromAddress(
+            null,
+            stringResult.buffer.memoryAddress() + stringResult.start,
+            stringResult.end - stringResult.start);
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  private abstract static class DictionaryArrowVectorAccessor extends 
ArrowVectorAccessor {
+    final Dictionary parquetDictionary;
+    final IntVector dictionaryVector;
+
+    private DictionaryArrowVectorAccessor(IntVector vector, Dictionary 
dictionary) {
+      super(vector);
+      this.dictionaryVector = vector;
+      this.parquetDictionary = dictionary;
+    }
+  }
+
+  private static class DictionaryStringAccessor extends 
DictionaryArrowVectorAccessor {
+
+    DictionaryStringAccessor(IntVector vector, Dictionary dictionary) {
+      super(vector, dictionary);
 
 Review comment:
   I think that the dictionary should be eagerly decoded, for a few reasons:
   1. `getBytesUnsafe` will make a copy of the bytes when the binary is backed 
by a `ByteBuffer` or an array slice. It may also cache values and perform 
checks.
   2. `getBytesUnsafe` should only be used when the bytes are immediately 
consumed, but `UTF8String` keeps a reference, so even if there isn't a copy, 
this could result in a correctness problem.
   3. `decodeToBinary` will incur a dynamic dispatch cost on every call (this 
also applies to the other dictionary readers).
   
   It's fairly easy to eagerly decode:
   ```java
     private static class DictionaryStringAccessor extends ArrowVectorAccessor {
       private IntVector offsetVector;
       private UTF8String[] decodedDictionary;
   
       DictionaryStringAccessor(IntVector vector, Dictionary dictionary) {
         super(vector);
         this.offsetVector = vector;
         this.decodedDictionary = IntStream.rangeClosed(0, 
dictionary.getMaxId())
             .mapToObj(dictionary::decodeToBinary)
             .map(binary -> UTF8String.fromBytes(binary.getBytes()))
             .toArray(UTF8String[]::new);
       }
   
       @Override
       final UTF8String getUTF8String(int rowId) {
         int offset = offsetVector.get(rowId);
         return decodedDictionary[offset];
       }
     }
   ```
   
   Also, there's no need for the `DictionaryArrowVectorAccessor`, since it only 
tracks one column that is also tracked by its parent.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to