nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027248946


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema 
evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and 
read schema
-                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
-                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and 
latest table schemas
-                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do 
nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
-                // the data already committed in the table
-                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
-              }
+          case _ =>
+            // Convert to RDD[HoodieRecord]
+            val avroRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+              Some(writerSchema))
+
+            // Check whether partition columns should be persisted w/in the 
data-files, or should
+            // be instead omitted from them and simply encoded into the 
partition path (which is Spark's
+            // behavior by default)
+            // TODO move partition columns handling down into the handlers
+            val shouldDropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+            val dataFileSchema = if (shouldDropPartitionColumns) {
+              val truncatedSchema = 
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+              // NOTE: We have to register this schema w/ Kryo to make sure 
it's able to apply an optimization
+              //       allowing it to avoid the need to ser/de the whole 
schema along _every_ Avro record
+              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+              truncatedSchema
+            } else {
+              writerSchema
+            }
 
-            validateSchemaForHoodieIsDeleted(writerSchema)
-            sparkContext.getConf.registerAvroSchemas(writerSchema)
-            log.info(s"Registered avro schema : 
${writerSchema.toString(true)}")
+            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM 
native serialization framework
+            //       (due to containing cyclic refs), therefore we have to 
convert it to string before
+            //       passing onto the Executor
+            val dataFileSchemaStr = dataFileSchema.toString
 
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(writerSchema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
               
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
                 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, 
dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, 
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+            val hoodieRecords = avroRecords.mapPartitions(it => {

Review Comment:
   From what I know, mapPartitions() will bring all records of a single Spark 
partition into memory, so, unlike with a map() call, there is a chance of 
OOMing if the data is skewed, whereas with map() Spark always operates on one 
record at a time. 
   
   http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
   
   ```
   MapPartitions: output is retained in memory, as it can return after 
processing all the rows
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to