nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027248946
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses =
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete,
instantTime)
(writeStatuses, client)
- }
- case _ => { // any other operation
- // register classes & schemas
- val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
-
- // TODO(HUDI-4472) revisit and simplify schema handling
- val sourceSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName,
nameSpace)
- val latestTableSchema = getLatestTableSchema(fs, basePath,
sparkContext).getOrElse(sourceSchema)
-
- val schemaEvolutionEnabled =
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(),
"false").toBoolean
- var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath,
sparkContext)
-
- val writerSchema: Schema =
- if (reconcileSchema) {
- // In case we need to reconcile the schema and schema
evolution is enabled,
- // we will force-apply schema evolution to the writer's schema
- if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
- internalSchemaOpt =
Some(AvroInternalSchemaConverter.convert(sourceSchema))
- }
- if (internalSchemaOpt.isDefined) {
- // Apply schema evolution, by auto-merging write schema and
read schema
- val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
- AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getName)
- } else if
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
- // In case schema reconciliation is enabled and source and
latest table schemas
- // are compatible (as defined by
[[TableSchemaResolver#isSchemaCompatible]], then we will
- // pick latest table's schema as the writer's schema
- latestTableSchema
- } else {
- // Otherwise fallback to original source's schema
- sourceSchema
- }
- } else {
- // In case reconciliation is disabled, we still have to do
nullability attributes
- // (minor) reconciliation, making sure schema of the incoming
batch is in-line with
- // the data already committed in the table
-
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema,
latestTableSchema)
- }
+ case _ =>
+ // Convert to RDD[HoodieRecord]
+ val avroRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+ Some(writerSchema))
+
+ // Check whether partition columns should be persisted w/in the
data-files, or should
+ // be instead omitted from them and simply encoded into the
partition path (which is Spark's
+ // behavior by default)
+ // TODO move partition columns handling down into the handlers
+ val shouldDropPartitionColumns =
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+ val dataFileSchema = if (shouldDropPartitionColumns) {
+ val truncatedSchema =
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+ // NOTE: We have to register this schema w/ Kryo to make sure
it's able to apply an optimization
+ // allowing it to avoid the need to ser/de the whole
schema along _every_ Avro record
+ registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+ truncatedSchema
+ } else {
+ writerSchema
+ }
- validateSchemaForHoodieIsDeleted(writerSchema)
- sparkContext.getConf.registerAvroSchemas(writerSchema)
- log.info(s"Registered avro schema :
${writerSchema.toString(true)}")
+ // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM
native serialization framework
+ // (due to containing cyclic refs), therefore we have to
convert it to string before
+ // passing onto the Executor
+ val dataFileSchemaStr = dataFileSchema.toString
- // Convert to RDD[HoodieRecord]
- val genericRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
- org.apache.hudi.common.util.Option.of(writerSchema))
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val processedRecord = getProcessedRecord(partitionColumns, gr,
dropPartitionColumns)
- val hoodieRecord = if (shouldCombine) {
- val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr,
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
- .asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(processedRecord,
- orderingVal,
- keyGenerator.getKey(gr),
- hoodieConfig.getString(PAYLOAD_CLASS_NAME))
- } else {
- DataSourceUtils.createHoodieRecord(processedRecord,
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+ val hoodieRecords = avroRecords.mapPartitions(it => {
Review Comment:
From what I know, `mapPartitions` will bring all records into memory (for a
single Spark partition), so there is a chance of OOMing if there is any data
skew — compared to a `map()` call, where Spark always operates on one record
at a time.
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
```
MapPartitions: output is retained in memory, as it can return after
processing all the rows
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]