nsivabalan commented on code in PR #6090:
URL: https://github.com/apache/hudi/pull/6090#discussion_r924448200
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -236,89 +247,157 @@ object HoodieSparkSqlWriter {
(writeStatuses, client)
}
case _ => { // any other operation
- // register classes & schemas
- val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
- var schema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName,
nameSpace)
- val lastestSchema = getLatestTableSchema(fs, basePath,
sparkContext, schema)
- var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath,
sparkContext)
- if (reconcileSchema &&
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(),
"false").toBoolean
- && internalSchemaOpt.isEmpty) {
- // force apply full schema evolution.
- internalSchemaOpt =
Some(AvroInternalSchemaConverter.convert(schema))
- }
- if (reconcileSchema) {
- schema = lastestSchema
- }
- if (internalSchemaOpt.isDefined) {
- // Apply schema evolution.
- val mergedSparkSchema = if (!reconcileSchema) {
-
AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema,
lastestSchema))
- } else {
- // Auto merge write schema and read schema.
- val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
-
AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema,
lastestSchema.getName))
+ if (WriteOperationType.UPSERT.equals(operation) &&
+
hoodieConfig.getBooleanOrDefault(HoodieTableConfig.UPSERT_WITHOUT_RECORD_KEY)) {
+ // register classes & schemas
+ val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+ sparkContext.getConf.registerKryoClasses(
+ Array(classOf[org.apache.avro.generic.GenericData],
+ classOf[org.apache.avro.Schema]))
+ var schema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName,
nameSpace)
+ if (reconcileSchema) {
+ schema = getLatestTableSchema(fs, basePath, sparkContext,
schema)
}
- schema =
AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema,
structName, nameSpace)
- }
-
- if (reconcileSchema && internalSchemaOpt.isEmpty) {
- schema = lastestSchema
- }
- validateSchemaForHoodieIsDeleted(schema)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
-
- // Convert to RDD[HoodieRecord]
- val genericRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
- org.apache.hudi.common.util.Option.of(schema))
- val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
- operation.equals(WriteOperationType.UPSERT) ||
-
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
-
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val processedRecord = getProcessedRecord(partitionColumns, gr,
dropPartitionColumns)
- val hoodieRecord = if (shouldCombine) {
- val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr,
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
- .asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(processedRecord,
- orderingVal,
- keyGenerator.getKey(gr),
- hoodieConfig.getString(PAYLOAD_CLASS_NAME))
- } else {
- DataSourceUtils.createHoodieRecord(processedRecord,
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+ sparkContext.getConf.registerAvroSchemas(schema)
+ log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+ // Convert to RDD[HoodieRecord]
+ val genericRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
+ org.apache.hudi.common.util.Option.of(schema))
+ val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean
||
+ operation.equals(WriteOperationType.UPSERT) ||
+
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
+ val writeSchema =
+ if (dropPartitionColumns)
generateSchemaWithoutPartitionColumns(partitionColumns, schema)
+ else schema
+
+ // init write client
+ val client = hoodieWriteClient.getOrElse(
+ DataSourceUtils.createHoodieClient(jsc, writeSchema.toString,
path, tblName, mapAsJavaMap(parameters))
+ ).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+ // needed to generate commit sequence number
+ val partitionId =
client.getEngineContext.getTaskContextSupplier.getPartitionIdSupplier.get
+ val recordIndex = new AtomicLong(1)
+
+ val hoodieAllIncomingRecords = genericRecords.map(gr => {
+ val processedRecord = getProcessedRecord(partitionColumns, gr,
dropPartitionColumns)
+ val csn = HoodieRecord.generateSequenceId(instantTime,
partitionId, recordIndex.getAndIncrement())
Review Comment:
I thought the plan was to re-use the commit sequence number that we generate
within the writer (i.e., by the executors). Or is my understanding wrong?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]