xushiyan commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090072656
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java:
##########
@@ -33,31 +33,47 @@
public class ExecutorFactory {
- public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig
hoodieConfig,
+ public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
Iterator<I> inputItr,
HoodieConsumer<O, E>
consumer,
Function<I, O>
transformFunction) {
- return create(hoodieConfig, inputItr, consumer, transformFunction,
Functions.noop());
+ return create(config, inputItr, consumer, transformFunction,
Functions.noop());
}
- public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig
hoodieConfig,
+ public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
Iterator<I> inputItr,
HoodieConsumer<O, E>
consumer,
Function<I, O>
transformFunction,
Runnable
preExecuteRunnable) {
- ExecutorType executorType = hoodieConfig.getExecutorType();
-
+ ExecutorType executorType = config.getExecutorType();
switch (executorType) {
case BOUNDED_IN_MEMORY:
- return new
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr,
consumer,
+ return new
BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, preExecuteRunnable);
case DISRUPTOR:
- return new
DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(),
inputItr, consumer,
- transformFunction,
hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+ return new
DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferSize(),
inputItr, consumer,
+ transformFunction, config.getWriteExecutorDisruptorWaitStrategy(),
preExecuteRunnable);
case SIMPLE:
return new SimpleExecutor<>(inputItr, consumer, transformFunction);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
}
+
+ /**
+ * Checks whether configured {@link HoodieExecutor} buffer records (for ex,
by holding them
+ * in the queue)
+ */
+ public static boolean isBufferingRecords(HoodieWriteConfig config) {
Review Comment:
Why not make this a property of `ExecutorType`, so that we don't need this extra
helper?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -234,4 +233,18 @@ protected final IndexedRecord readRecordPayload(Kryo kryo,
Input input) {
return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
}
+
+ static void updateMetadataValuesInternal(GenericRecord avroRecord,
MetadataValues metadataValues) {
Review Comment:
This looks like a helper method that would fit better in an Avro utils class.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
package org.apache.spark.sql
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
import org.apache.avro.Schema
import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeProjection}
+import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath,
composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeArrayData, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData,
GenericArrayData, MapData}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections,
Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
object HoodieInternalRowUtils {
- // Projection are all thread local. Projection is not thread-safe
- val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType,
StructType), UnsafeProjection]] =
- ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType),
UnsafeProjection]] {
- override def get(): HashMap[(StructType, StructType), UnsafeProjection]
= new HashMap[(StructType, StructType), UnsafeProjection]
+ private type RenamedColumnMap = JMap[String, String]
+ private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+ // NOTE: The [[UnsafeRowWriter]] cache has to stay [[ThreadLocal]]
since these writers are not thread-safe
+ private val unsafeWriterThreadLocal:
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap),
UnsafeRowWriter]] =
+ ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType,
StructType, RenamedColumnMap), UnsafeRowWriter]] {
+ override def get(): mutable.HashMap[(StructType, StructType,
RenamedColumnMap), UnsafeRowWriter] =
+ new mutable.HashMap[(StructType, StructType, RenamedColumnMap),
UnsafeRowWriter]
})
- val schemaMap = new ConcurrentHashMap[Schema, StructType]
- val orderPosListMap = new ConcurrentHashMap[(StructType, String),
NestedFieldPath]
- /**
- * @see
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord,
org.apache.avro.Schema)
- */
- def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema:
StructType): InternalRow = {
- val newRow = new
GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
- for ((field, pos) <- newSchema.fields.zipWithIndex) {
- var oldValue: AnyRef = null
- var oldType: DataType = null
- if (existField(oldSchema, field.name)) {
- val oldField = oldSchema(field.name)
- val oldPos = oldSchema.fieldIndex(field.name)
- oldType = oldField.dataType
- oldValue = oldRecord.get(oldPos, oldType)
- }
- if (oldValue != null) {
- field.dataType match {
- case structType: StructType =>
- val oldType =
oldSchema(field.name).dataType.asInstanceOf[StructType]
- val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow],
oldType, structType)
- newRow.update(pos, newValue)
- case decimalType: DecimalType =>
- val oldFieldSchema =
oldSchema(field.name).dataType.asInstanceOf[DecimalType]
- if (decimalType.scale != oldFieldSchema.scale ||
decimalType.precision != oldFieldSchema.precision) {
- newRow.update(pos,
Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
- )
- } else {
- newRow.update(pos, oldValue)
- }
- case t if t == oldType => newRow.update(pos, oldValue)
- // Type promotion
- case _: ShortType =>
- oldType match {
- case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toShort)
- case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
- }
- case _: IntegerType =>
- oldType match {
- case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toInt)
- case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toInt)
- case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
- }
- case _: LongType =>
- oldType match {
- case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toLong)
- case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toLong)
- case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toLong)
- case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
- }
- case _: FloatType =>
- oldType match {
- case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toFloat)
- case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toFloat)
- case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toFloat)
- case _: LongType => newRow.update(pos,
oldValue.asInstanceOf[Long].toFloat)
- case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
- }
- case _: DoubleType =>
- oldType match {
- case _: ByteType => newRow.update(pos,
oldValue.asInstanceOf[Byte].toDouble)
- case _: ShortType => newRow.update(pos,
oldValue.asInstanceOf[Short].toDouble)
- case _: IntegerType => newRow.update(pos,
oldValue.asInstanceOf[Int].toDouble)
- case _: LongType => newRow.update(pos,
oldValue.asInstanceOf[Long].toDouble)
- case _: FloatType => newRow.update(pos,
oldValue.asInstanceOf[Float].toDouble)
- case _ => throw new IllegalArgumentException(s"$oldSchema and
$newSchema are incompatible")
- }
- case _: BinaryType if oldType.isInstanceOf[StringType] =>
newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
- case _ => newRow.update(pos, oldValue)
- }
- } else {
- // TODO default value in newSchema
- }
- }
+ // NOTE: The [[UnsafeProjection]] cache has to stay [[ThreadLocal]]
since these projections are not thread-safe
+ private val unsafeProjectionThreadLocal:
ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+ ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType,
StructType), UnsafeProjection]] {
+ override def get(): mutable.HashMap[(StructType, StructType),
UnsafeProjection] =
+ new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+ })
- newRow
- }
+ private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+ private val orderPosListMap = new ConcurrentHashMap[(StructType, String),
Option[NestedFieldPath]]
/**
- * @see
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord,
org.apache.avro.Schema, java.util.Map)
+ * Provides cached instance of [[UnsafeProjection]] transforming provided
[[InternalRow]]s from
+ * one [[StructType]] and into another [[StructType]]
+ *
+ * For more details regarding its semantic, please check corresponding
scala-doc for
+ * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
*/
- def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema:
StructType, newSchema: StructType, renameCols: java.util.Map[String, String]):
InternalRow = {
- rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols,
new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+ def getCachedUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection = {
+ unsafeProjectionThreadLocal.get()
+ .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
}
/**
- * @see
org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object,
org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+ * Provides cached instance of [[UnsafeRowWriter]] transforming provided
[[InternalRow]]s from
+ * one [[StructType]] and into another [[StructType]]
+ *
+ * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper
subset of [[to]] schema,
+ * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution
transformations including:
+ *
+ * <ul>
+ * <li>Transforming nested structs/maps/arrays</li>
+ * <li>Handling type promotions (int -> long, etc)</li>
+ * <li>Handling (field) renames</li>
+ * </ul>
*/
- private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType,
newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames:
java.util.Deque[String]): Any = {
- if (oldRecord == null) {
- null
- } else {
- newSchema match {
- case targetSchema: StructType =>
- if (!oldRecord.isInstanceOf[InternalRow]) {
- throw new IllegalArgumentException("cannot rewrite record with
different type")
- }
- val oldRow = oldRecord.asInstanceOf[InternalRow]
- val helper = mutable.Map[Integer, Any]()
-
- val oldStrucType = oldSchema.asInstanceOf[StructType]
- targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
- fieldNames.push(field.name)
- if (existField(oldStrucType, field.name)) {
- val oldField = oldStrucType(field.name)
- val oldPos = oldStrucType.fieldIndex(field.name)
- helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos,
oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
- } else {
- val fieldFullName = createFullName(fieldNames)
- val colNamePartsFromOldSchema =
renameCols.getOrDefault(fieldFullName, "").split("\\.")
- val lastColNameFromOldSchema =
colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
- // deal with rename
- if (!existField(oldStrucType, field.name) &&
existField(oldStrucType, lastColNameFromOldSchema)) {
- // find rename
- val oldField = oldStrucType(lastColNameFromOldSchema)
- val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
- helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos,
oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
- }
- }
- fieldNames.pop()
- }
- val newRow = new
GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
- targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
- if (helper.contains(i)) {
- newRow.update(i, helper(i))
- } else {
- // TODO add default val
- newRow.update(i, null)
- }
- }
+ def getCachedUnsafeRowWriter(from: StructType, to: StructType,
renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()):
UnsafeRowWriter = {
+ unsafeWriterThreadLocal.get()
+ .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from,
to, renamedColumnsMap))
+ }
- newRow
- case targetSchema: ArrayType =>
- if (!oldRecord.isInstanceOf[ArrayData]) {
- throw new IllegalArgumentException("cannot rewrite record with
different type")
- }
- val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
- val oldArray = oldRecord.asInstanceOf[ArrayData]
- val newElementType = targetSchema.elementType
- val newArray = new
GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
- fieldNames.push("element")
- oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case
(value, i) => newArray.update(i,
rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType,
newElementType, renameCols, fieldNames)) }
- fieldNames.pop()
-
- newArray
- case targetSchema: MapType =>
- if (!oldRecord.isInstanceOf[MapData]) {
- throw new IllegalArgumentException("cannot rewrite record with
different type")
- }
- val oldValueType = oldSchema.asInstanceOf[MapType].valueType
- val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
- val oldMap = oldRecord.asInstanceOf[MapData]
- val newValueType = targetSchema.valueType
- val newKeyArray = new
GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
- val newValueArray = new
GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
- val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
- fieldNames.push("value")
- oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case
(value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType,
oldKeyType)) }
- oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach {
case (value, i) => newValueArray.update(i,
rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType,
newValueType, renameCols, fieldNames)) }
- fieldNames.pop()
-
- newMap
- case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
- }
+ def getCachedPosList(structType: StructType, field: String):
Option[NestedFieldPath] = {
+ val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+ // NOTE: This specifically designed to do 2 lookups (in case of
cache-miss) to avoid
+ // allocating the closure when using [[computeIfAbsent]] on more
frequent cache-hit path
+ if (nestedFieldPathOpt != null) {
+ nestedFieldPathOpt
+ } else {
+ orderPosListMap.computeIfAbsent((structType, field), new
JFunction[(StructType, String), Option[NestedFieldPath]] {
+ override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+ composeNestedFieldPath(structType, field)
+ })
}
}
- def getCachedPosList(structType: StructType, field: String): NestedFieldPath
= {
- val schemaPair = (structType, field)
- if (!orderPosListMap.containsKey(schemaPair)) {
- val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType,
field)
- orderPosListMap.put(schemaPair, posList)
+ def getCachedSchema(schema: Schema): StructType = {
+ val structType = schemaMap.get(schema)
+ // NOTE: This specifically designed to do 2 lookups (in case of
cache-miss) to avoid
+ // allocating the closure when using [[computeIfAbsent]] on more
frequent cache-hit path
+ if (structType != null) {
+ structType
+ } else {
+ schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+ override def apply(t: Schema): StructType =
+ convertAvroSchemaToStructType(schema)
+ })
}
- orderPosListMap.get(schemaPair)
}
- def getCachedUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection = {
- val schemaPair = (from, to)
- val map = unsafeProjectionThreadLocal.get()
- if (!map.containsKey(schemaPair)) {
- val projection =
HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
- map.put(schemaPair, projection)
+ private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+ newSchema: StructType,
+ renamedColumnsMap: JMap[String,
String]): UnsafeRowWriter = {
+ val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap,
new JArrayDeque[String]())
+ val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+ val phonyUpdater = new CatalystDataUpdater {
+ var value: InternalRow = _
+
+ override def set(ordinal: Int, value: Any): Unit =
+ this.value = value.asInstanceOf[InternalRow]
}
- map.get(schemaPair)
- }
- def getCachedSchema(schema: Schema): StructType = {
- if (!schemaMap.containsKey(schema)) {
- val structType =
AvroConversionUtils.convertAvroSchemaToStructType(schema)
- schemaMap.put(schema, structType)
+ oldRow => {
+ writer(phonyUpdater, 0, oldRow)
+ unsafeProjection(phonyUpdater.value)
}
- schemaMap.get(schema)
}
- def existField(structType: StructType, name: String): Boolean = {
- try {
- HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
- true
- } catch {
- case _: IllegalArgumentException => false
+ private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+ private def genUnsafeStructWriter(prevStructType: StructType,
+ newStructType: StructType,
+ renamedColumnsMap: JMap[String, String],
+ fieldNamesStack: JDeque[String]):
(CatalystDataUpdater, Any) => Unit = {
+ // TODO need to canonicalize schemas (casing)
+ val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+ val positionMap = ArrayBuffer.empty[Int]
+
+ for (newField <- newStructType.fields) {
+ fieldNamesStack.push(newField.name)
+
+ val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+ prevStructType.getFieldIndex(newField.name) match {
+ case Some(prevFieldPos) =>
+ val prevField = prevStructType(prevFieldPos)
+ (newWriterRenaming(prevField.dataType, newField.dataType,
renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+ case None =>
+ val newFieldQualifiedName = createFullName(fieldNamesStack)
+ val prevFieldName: String =
lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+ // Handle rename
+ prevStructType.getFieldIndex(prevFieldName) match {
+ case Some(prevFieldPos) =>
+ val prevField = prevStructType.fields(prevFieldPos)
+ (newWriterRenaming(prevField.dataType, newField.dataType,
renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+ case None =>
+ val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) =>
fieldUpdater.setNullAt(ordinal)
+ (updater, -1)
+ }
+ }
+
+ fieldWriters += fieldWriter
+ positionMap += prevFieldPos
+
+ fieldNamesStack.pop()
}
- }
- private def rewritePrimaryType(oldValue: Any, oldSchema: DataType,
newSchema: DataType) = {
- if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] &&
newSchema.isInstanceOf[DecimalType])) {
- oldSchema match {
- case NullType | BooleanType | IntegerType | LongType | FloatType |
DoubleType | DateType | TimestampType | BinaryType =>
- oldValue
- // Copy UTF8String before putting into GenericInternalRow
- case StringType => UTF8String.fromString(oldValue.toString)
- case DecimalType() =>
-
Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
- case _ =>
- throw new HoodieException("Unknown schema type: " + newSchema)
+ (fieldUpdater, row) => {
+ var pos = 0
+ while (pos < fieldWriters.length) {
+ val prevPos = positionMap(pos)
+ val prevValue = if (prevPos >= 0) {
+ row.asInstanceOf[InternalRow].get(prevPos,
prevStructType.fields(prevPos).dataType)
+ } else {
+ null
+ }
+
+ fieldWriters(pos)(fieldUpdater, pos, prevValue)
+ pos += 1
}
- } else {
- rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
}
}
- private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema:
DataType, newSchema: DataType): Any = {
- val value = newSchema match {
- case NullType | BooleanType =>
- case DateType if oldSchema.equals(StringType) =>
-
CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
- case LongType =>
- oldSchema match {
- case IntegerType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+ private def newWriterRenaming(prevDataType: DataType,
+ newDataType: DataType,
+ renamedColumnsMap: JMap[String, String],
+ fieldNameStack: JDeque[String]):
RowFieldUpdater = {
+ (newDataType, prevDataType) match {
+ case (newType, prevType) if prevType == newType =>
+ (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+ case (newStructType: StructType, prevStructType: StructType) =>
+ val writer = genUnsafeStructWriter(prevStructType, newStructType,
renamedColumnsMap, fieldNameStack)
+
+ val newRow = new
SpecificInternalRow(newStructType.fields.map(_.dataType))
+ val rowUpdater = new RowUpdater(newRow)
+
+ (fieldUpdater, ordinal, value) => {
+ // Here new row is built in 2 stages:
+ // - First, we pass mutable row (used as buffer/scratchpad)
created above wrapped into [[RowUpdater]]
+ // into generated row-writer
+ // - Upon returning from row-writer, we call back into parent
row's [[fieldUpdater]] to set returned
+ // row as a value in it
+ writer(rowUpdater, value)
+ fieldUpdater.set(ordinal, newRow)
+ }
+
+ case (ArrayType(newElementType, _), ArrayType(prevElementType,
containsNull)) =>
+ fieldNameStack.push("element")
+ val elementWriter = newWriterRenaming(prevElementType, newElementType,
renamedColumnsMap, fieldNameStack)
+ fieldNameStack.pop()
+
+ (fieldUpdater, ordinal, value) => {
+ val prevArrayData = value.asInstanceOf[ArrayData]
+ val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+ val newArrayData = createArrayData(newElementType,
prevArrayData.numElements())
+ val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+ var i = 0
+ while (i < prevArray.length) {
+ val element = prevArray(i)
+ if (element == null) {
+ if (!containsNull) {
+ throw new HoodieException(
+ s"Array value at path
'${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+ } else {
+ elementUpdater.setNullAt(i)
+ }
+ } else {
+ elementWriter(elementUpdater, i, element)
+ }
+ i += 1
+ }
+
+ fieldUpdater.set(ordinal, newArrayData)
+ }
+
+ case (MapType(_, newValueType, _), MapType(_, prevValueType,
valueContainsNull)) =>
+ fieldNameStack.push("value")
+ val valueWriter = newWriterRenaming(prevValueType, newValueType,
renamedColumnsMap, fieldNameStack)
+ fieldNameStack.pop()
+
+ (updater, ordinal, value) =>
+ val mapData = value.asInstanceOf[MapData]
+ val prevKeyArrayData = mapData.keyArray
+ val prevValueArrayData = mapData.valueArray
+ val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+ val newValueArray = createArrayData(newValueType,
mapData.numElements())
+ val valueUpdater = new ArrayDataUpdater(newValueArray)
+ var i = 0
+ while (i < prevValueArray.length) {
+ val value = prevValueArray(i)
+ if (value == null) {
+ if (!valueContainsNull) {
+ throw new HoodieException(s"Map value at path
${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+ } else {
+ valueUpdater.setNullAt(i)
+ }
+ } else {
+ valueWriter(valueUpdater, i, value)
+ }
+ i += 1
+ }
+
+ // NOTE: Key's couldn't be transformed and have to always be of
[[StringType]]
+ updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData,
newValueArray))
+
+ case (newDecimal: DecimalType, _) =>
+ prevDataType match {
+ case IntegerType | LongType | FloatType | DoubleType | StringType =>
+ (fieldUpdater, ordinal, value) =>
+ val scale = newDecimal.scale
+ // TODO this has to be revisited to avoid loss of precision (for
fps)
+ fieldUpdater.setDecimal(ordinal,
Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale,
ROUND_HALF_EVEN)))
+
+ case _: DecimalType =>
+ (fieldUpdater, ordinal, value) =>
+ fieldUpdater.setDecimal(ordinal,
Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case FloatType =>
- oldSchema match {
- case IntegerType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
- case LongType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+ case (_: ShortType, _) =>
+ prevDataType match {
+ case _: ByteType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case DoubleType =>
- oldSchema match {
- case IntegerType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
- case LongType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
- case FloatType =>
CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float]
+ ""))
+
+ case (_: IntegerType, _) =>
+ prevDataType match {
+ case _: ShortType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+ case _: ByteType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case BinaryType =>
- oldSchema match {
- case StringType =>
CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+ case (_: LongType, _) =>
+ prevDataType match {
+ case _: IntegerType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+ case _: ShortType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+ case _: ByteType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case StringType =>
- oldSchema match {
- case BinaryType => CatalystTypeConverters.convertToCatalyst(new
String(oldValue.asInstanceOf[Array[Byte]]))
- case DateType =>
CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
- case IntegerType | LongType | FloatType | DoubleType | DecimalType()
=> CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+ case (_: FloatType, _) =>
+ prevDataType match {
+ case _: LongType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+ case _: IntegerType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+ case _: ShortType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+ case _: ByteType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case DecimalType() =>
- oldSchema match {
- case IntegerType | LongType | FloatType | DoubleType | StringType =>
- val scale = newSchema.asInstanceOf[DecimalType].scale
- Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+ case (_: DoubleType, _) =>
+ prevDataType match {
+ case _: FloatType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+ case _: LongType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+ case _: IntegerType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+ case _: ShortType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+ case _: ByteType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
}
- case _ =>
- }
- if (value == None) {
- throw new HoodieException(String.format("cannot support rewrite value
for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
- } else {
- CatalystTypeConverters.convertToCatalyst(value)
+
+ case (_: BinaryType, _: StringType) =>
+ (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal,
value.asInstanceOf[UTF8String].getBytes)
+
+ // TODO revisit this (we need to align permitted casting w/ Spark)
+ // NOTE: This is supported to stay compatible w/
[[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+ case (_: StringType, _) =>
+ prevDataType match {
+ case BinaryType => (fieldUpdater, ordinal, value) =>
+ fieldUpdater.set(ordinal,
UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+ case DateType => (fieldUpdater, ordinal, value) =>
+ fieldUpdater.set(ordinal,
UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+ case IntegerType | LongType | FloatType | DoubleType | _:
DecimalType =>
+ (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal,
UTF8String.fromString(value.toString))
+
+ case _ =>
+ throw new IllegalArgumentException(s"$prevDataType and
$newDataType are incompatible")
+ }
+
+ case (DateType, StringType) =>
+ (fieldUpdater, ordinal, value) =>
+ fieldUpdater.set(ordinal,
CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+ case (_, _) =>
+ throw new IllegalArgumentException(s"$prevDataType and $newDataType
are incompatible")
}
}
- def removeFields(schema: StructType, fieldsToRemove:
java.util.List[String]): StructType = {
- StructType(schema.fields.filter(field =>
!fieldsToRemove.contains(field.name)))
+ private def lookupRenamedField(newFieldQualifiedName: String,
renamedColumnsMap: JMap[String, String]) = {
+ val prevFieldQualifiedName =
renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+ val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+ val prevFieldName =
prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+ prevFieldName
}
+
+ private def createArrayData(elementType: DataType, length: Int): ArrayData =
elementType match {
Review Comment:
Are these the same across different Spark versions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]