alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089793370
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -505,31 +495,6 @@ object HoodieSparkSqlWriter {
HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
}
- def generateSparkSchemaWithoutPartitionColumns(partitionParam: String,
schema: StructType): StructType = {
Review Comment:
Dead code: this method is unused, so it is removed.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -82,12 +84,11 @@ object HoodieSparkSqlWriter {
optParams: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
- hoodieWriteClient:
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
- asyncCompactionTriggerFn:
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] =
Option.empty,
- asyncClusteringTriggerFn:
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] =
Option.empty,
- extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient,
HoodieCommitMetadata]] = Option.empty)
- : (Boolean, common.util.Option[String], common.util.Option[String],
common.util.Option[String],
- SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
+ hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
Review Comment:
This was just a search-and-replace that removed invalid type references (e.g. `SparkRDDWriteClient[HoodieRecordPayload[Nothing]]` became `SparkRDDWriteClient[_]`).
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -308,14 +175,351 @@ object HoodieInternalRowUtils {
}
case _ =>
}
+
if (value == None) {
throw new HoodieException(String.format("cannot support rewrite value
for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
} else {
CatalystTypeConverters.convertToCatalyst(value)
}
}
+ */
+
+ private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+ private def genUnsafeRowWriterRenaming(prevStructType: StructType,
newStructType: StructType, renamedColumnsMap: JMap[String, String], fieldNames:
JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+ // TODO need to canonicalize schemas (casing)
+ val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+ val positionMap = ArrayBuffer.empty[Int]
+
+ for (newField <- newStructType.fields) {
+ fieldNames.push(newField.name)
+
+ val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+ prevStructType.getFieldIndex(newField.name) match {
+ case Some(prevFieldPos) =>
+ val prevField = prevStructType(prevFieldPos)
+ (newWriterRenaming(prevField.dataType, newField.dataType,
renamedColumnsMap, fieldNames), prevFieldPos)
+
+ case None =>
+ val newFieldQualifiedName = createFullName(fieldNames)
+ val prevFieldName: String =
lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+ // Handle rename
+ prevStructType.getFieldIndex(prevFieldName) match {
+ case Some(prevFieldPos) =>
+ val prevField = prevStructType.fields(prevFieldPos)
+ (newWriterRenaming(prevField.dataType, newField.dataType,
renamedColumnsMap, fieldNames), prevFieldPos)
+
+ case None =>
+ // TODO handle defaults
+ val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) =>
fieldUpdater.setNullAt(ordinal)
+ (updater, -1)
+ }
+ }
+
+ fieldWriters += fieldWriter
+ positionMap += prevFieldPos
+
+ fieldNames.pop()
+ }
+
+ (fieldUpdater, row) => {
+ var pos = 0
+ while (pos < fieldWriters.length) {
+ val prevPos = positionMap(pos)
+ val prevValue = if (prevPos >= 0) {
+ row.asInstanceOf[InternalRow].get(prevPos,
prevStructType.fields(prevPos).dataType)
+ } else {
+ null
+ }
+
+ fieldWriters(pos)(fieldUpdater, pos, prevValue)
+ pos += 1
+ }
+ }
+ }
+
+ private def newWriterRenaming(prevDataType: DataType,
+ newDataType: DataType,
+ renamedColumnsMap: JMap[String, String],
+ fieldNames: JDeque[String]): RowFieldUpdater =
{
+ (prevDataType, newDataType) match {
+ case (prevType, newType) if prevType == newType =>
+ (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+ case (prevStructType: StructType, newStructType: StructType) =>
+ val writer = genUnsafeRowWriterRenaming(prevStructType, newStructType,
renamedColumnsMap, fieldNames)
+ val newRow = new
SpecificInternalRow(newStructType.fields.map(_.dataType))
+ val rowUpdater = new RowUpdater(newRow)
+
+ (fieldUpdater, ordinal, value) => {
+ // TODO elaborate
Review Comment:
Note to self: add an explanation for this particular piece of logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]