alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090044032
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -94,8 +92,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
       // In case Advanced Schema Evolution is enabled we might need to rewrite currently
       // persisted records to adhere to an evolved schema
-      Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
-          composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
+      Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
Review Comment:
Simplifying the implementation by supplying the reader schema into the method
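To illustrate the shape change (a hedged Scala rendering; the actual code uses Hudi's Java `Option` and `Function` types):

```scala
import org.apache.avro.Schema
import org.apache.hudi.common.model.HoodieRecord

object ReaderSchemaSketch {
  type RecordTransformer = HoodieRecord[_] => HoodieRecord[_]

  // Old shape: the transformer was curried over the schema, so every call site
  // had to carry the reader schema around just to finish constructing it
  def applyOld(opt: Option[(Schema => RecordTransformer, Schema)], r: HoodieRecord[_]): HoodieRecord[_] =
    opt.map { case (mk, readerSchema) => mk(readerSchema)(r) }.getOrElse(r)

  // New shape: the reader schema is supplied inside the composing method,
  // leaving callers with a plain record-to-record function (or none)
  def applyNew(opt: Option[RecordTransformer], r: HoodieRecord[_]): HoodieRecord[_] =
    opt.map(_(r)).getOrElse(r)
}
```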
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java:
##########
@@ -38,19 +38,7 @@
public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
- private boolean useWriterSchema;
-
- public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
Review Comment:
Dead code
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -150,8 +151,9 @@ public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGene
       return getRecordKey();
     }
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
-        .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+    return keyGeneratorOpt.isPresent()
Review Comment:
This code is unchanged (there was a change but it got reverted)
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 package org.apache.spark.sql
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 object HoodieInternalRowUtils {
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
Review Comment:
Utilities in this class had to be essentially reimplemented to decouple the generation of a `RowWriter` (transforming rows from one schema into another) from its application to individual rows.
The decoupling is implemented in the following way (mirroring the one implemented in Spark's Avro Serializer/Deserializer):
- When a `RowWriter` is created, we traverse both the new and the old schema and determine how the fields are mapped onto each other
- For every new field we create a field-writer (returned by `newWriterRenaming`) that accepts a `CatalystDataUpdater` and the current value of the field, and carries it into the new schema by either renaming it, converting it, or keeping it intact
- For fields held w/in a struct, all field-writers are assembled into a single `RowWriter` for the whole struct (returned by `genUnsafeStructWriter`); see the sketch below
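A minimal sketch of that two-phase shape (simplified, with a hypothetical `FieldUpdater` standing in for the internal `CatalystDataUpdater`; the real logic additionally resolves renames and type conversions):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object RowWriterSketch {
  // Hypothetical stand-in for the internal CatalystDataUpdater
  trait FieldUpdater { def set(ordinal: Int, value: Any): Unit }
  type FieldWriter = (FieldUpdater, Int, Any) => Unit

  // Phase 1 runs once per (prev, next) schema pair: traverse both schemas and
  // pre-resolve, for every target field, its source position and field-writer
  def structWriter(prev: StructType, next: StructType): (FieldUpdater, InternalRow) => Unit = {
    val writers: Array[(Int, FieldWriter)] = next.fields.map { f =>
      val prevPos = prev.fieldIndex(f.name) // rename/conversion resolution elided
      val write: FieldWriter = (u, ord, v) => u.set(ord, v)
      (prevPos, write)
    }
    // Phase 2 (the returned closure) is the only per-row work: run the writers
    (updater, row) => {
      var ord = 0
      while (ord < writers.length) {
        val (prevPos, write) = writers(ord)
        write(updater, ord, row.get(prevPos, prev.fields(prevPos).dataType))
        ord += 1
      }
    }
  }
}
```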
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -127,7 +127,9 @@ public void testInterruptExecutor() {
@Override
public void consume(HoodieRecord record) {
try {
- Thread.currentThread().wait();
+ synchronized (this) {
Review Comment:
This fixes the flaky test: `wait()` must be called while holding the monitor of the object being waited on, so the bare `Thread.currentThread().wait()` threw `IllegalMonitorStateException` instead of parking the consumer.
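A hedged standalone sketch of the corrected pattern (not the test code itself):

```scala
object WaitSketch extends App {
  val lock = new Object
  val consumer = new Thread(() => {
    try {
      // wait() must be invoked while holding the object's monitor; calling it
      // outside a synchronized block throws IllegalMonitorStateException
      lock.synchronized { lock.wait() }
    } catch {
      case _: InterruptedException => // expected: the test interrupts the consumer
    }
  })
  consumer.start()
  consumer.interrupt() // wakes the waiting thread via InterruptedException
  consumer.join()
}
```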
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.testutils.HoodieClientTestUtils
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
Review Comment:
These are merged into another test suite
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -186,48 +192,39 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
   }
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
-
-    boolean containMetaFields = hasMetaFields(targetStructType);
-    UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType);
-    HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);
+    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, structType);
+    updateMetadataValuesInternal(updatableRow, metadataValues);
-    return new HoodieSparkRecord(getKey(), internalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
+    return new HoodieSparkRecord(getKey(), updatableRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
   }
   @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols);
-    boolean containMetaFields = hasMetaFields(newStructType);
Review Comment:
Wrapping into `HoodieInternalRow` has been removed from here and abstracted away so that it only occurs in the `prependMetaFields` API (considerably simplifying the implementation here); see the sketch below
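A hedged Scala rendering of what the data-rewrite path now reduces to (the actual method is Java and returns a `HoodieSparkRecord`):

```scala
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.StructType

object RewriteSketch {
  // Schema evolution is now a single cached row-writer application; no
  // HoodieInternalRow wrapping happens outside of prependMetaFields
  def rewriteData(row: InternalRow,
                  prevStructType: StructType,
                  newStructType: StructType,
                  renameCols: java.util.Map[String, String]): UnsafeRow = {
    val rowWriter: InternalRow => UnsafeRow =
      HoodieInternalRowUtils.getCachedUnsafeRowWriter(prevStructType, newStructType, renameCols)
    rowWriter(row)
  }
}
```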
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 package org.apache.spark.sql
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 object HoodieInternalRowUtils {
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
Review Comment:
All conversions here are the same as before; they have just been rewritten in the form of field-writers (callbacks)
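For instance, the `int -> long` promotion now takes the following field-writer form (a sketch with a hypothetical minimal updater trait standing in for `CatalystDataUpdater`):

```scala
object PromotionSketch {
  // Hypothetical stand-in for the internal CatalystDataUpdater
  trait FieldUpdater { def setLong(ordinal: Int, value: Long): Unit }
  type RowFieldUpdater = (FieldUpdater, Int, Any) => Unit

  // Resolved once while traversing the schemas, then invoked per row/field
  val intToLong: RowFieldUpdater =
    (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Int].toLong)
}
```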
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
-import org.apache.spark.sql.HoodieInternalRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-public class HoodieSparkRecordUtils {
-
- public static Object getValue(StructType structType, String fieldName, InternalRow row) {
Review Comment:
Dead code
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1077,29 +1047,27 @@ object HoodieSparkSqlWriter {
             hoodieRecord
           }
         }).toJavaRDD()
+
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType
+          // NOTE: To make sure we properly transform records
+          val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType)
Review Comment:
This replaces the old flow of `rewriteRecord` followed by `unsafeProjection` with a single application of the new `UnsafeRowWriter` (sketched below)
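Roughly, under assumed surrounding values (a sketch mirroring the diff above):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.StructType

object WritePathSketch {
  // `rdd`, `sourceStructType` and `targetStructType` are assumed inputs
  def transform(rdd: RDD[InternalRow], sourceStructType: StructType, targetStructType: StructType): RDD[UnsafeRow] =
    rdd.mapPartitions { it =>
      // One writer lookup per partition; the per-row rewriteRecord + unsafeProjection pair is gone
      val rowWriter: InternalRow => UnsafeRow = getCachedUnsafeRowWriter(sourceStructType, targetStructType)
      it.map(rowWriter)
    }
}
```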
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -114,7 +164,22 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
     // do change type operation
     val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
-    updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateColumnType("col31", Types.DecimalType.get(18, 9)).updateColumnType("col4", Types.DecimalType.get(18, 9)).updateColumnType("col41", Types.StringType.get).updateColumnType("col5", Types.DateType.get).updateColumnType("col51", Types.DecimalType.get(18, 9)).updateColumnType("col6", Types.StringType.get)
+    updateChange.updateColumnType("id", Types.LongType.get)
Review Comment:
No changes, just breaking down an unreadably long line
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 package org.apache.spark.sql
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 object HoodieInternalRowUtils {
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
   */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
   */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
    }
  }
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
    }
-    orderPosListMap.get(schemaPair)
  }
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
    }
-    map.get(schemaPair)
-  }
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
    }
-    schemaMap.get(schema)
  }
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
    }
-  }
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
      }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
    }
  }
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
+
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here new row is built in 2 stages:
+          //    - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
+          //      into generated row-writer
+          //    - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
+          //      row as a value in it
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Key's couldn't be transformed and have to always be of [[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for fps)
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
    }
  }
-  def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
  }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
Review Comment:
These utility classes are borrowed from Spark's `AvroSerializer`
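For reference, a sketch of the borrowed array-allocation helper (signature as in the diff; Spark specializes primitive element types):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

object ArrayAllocSketch {
  // Primitive element types get a compact UnsafeArrayData backing;
  // everything else falls back to an Object-backed GenericArrayData
  def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
    case ByteType    => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
    case ShortType   => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
    case LongType    => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
    case FloatType   => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
    case DoubleType  => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
    case _           => new GenericArrayData(new Array[Any](length))
  }
}
```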
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))
Review Comment:
Actually, I looked at it again: in that case `withOperationField` is true, so this field has to be present in the schema (see the sketch below)
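A hedged Scala rendering of the invariant (`StructType.fieldIndex` throws when the column is absent, which is fine here since `withOperationField == true` guarantees its presence):

```scala
import org.apache.hudi.common.model.{HoodieOperation, HoodieRecord}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object OperationFieldSketch {
  def readOperation(row: InternalRow, structType: StructType, withOperationField: Boolean): HoodieOperation =
    if (withOperationField) {
      // Safe direct lookup: the operation meta column is guaranteed to exist here
      HoodieOperation.fromName(row.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))
    } else {
      null
    }
}
```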
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]