vinothchandar commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r909140564


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java:
##########
@@ -84,8 +87,19 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
        dedupedKeys = keys.repartition(parallelism);
      }
 
-      HoodieData<HoodieRecord<T>> dedupedRecords =
-          dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      HoodieData dedupedRecords;
+      if (config.getRecordType() == HoodieRecordType.AVRO) {
+        dedupedRecords =
+            dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      } else if (config.getRecordType() == HoodieRecordType.SPARK) {
+        dedupedRecords = dedupedKeys.map(key -> {
+          Class<?> recordClazz = ReflectionUtils.getClass("org.apache.hudi.common.model.HoodieSparkRecord");
+          Method method = recordClazz.getMethod("empty", HoodieKey.class);
+          return method.invoke(null, key);

Review Comment:
   +1. Actually, anything done per record like this is going to affect perf severely.
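   
   One way to take the reflection off the hot path (a sketch only, assuming `HoodieSparkRecord` keeps the static `empty(HoodieKey)` factory used above): resolve the handle once and reuse it. Since `java.lang.reflect.Method` is not `Serializable`, a Spark closure cannot capture it directly; the lazy static holder below (a hypothetical helper, not part of this PR) re-resolves it once per executor JVM instead of once per record.
   
   ```java
   import java.lang.reflect.Method;
   
   import org.apache.hudi.common.model.HoodieKey;
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.hudi.common.util.ReflectionUtils;
   
   // Hypothetical helper: resolves the reflective factory once per JVM
   // instead of on every record inside dedupedKeys.map(...).
   public class HoodieSparkRecordFactory {
   
     private static volatile Method emptyFactory;
   
     private static Method factory() throws NoSuchMethodException {
       if (emptyFactory == null) {
         Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.common.model.HoodieSparkRecord");
         emptyFactory = clazz.getMethod("empty", HoodieKey.class);
       }
       return emptyFactory;
     }
   
     // usage inside the map lambda: key -> HoodieSparkRecordFactory.empty(key)
     public static HoodieRecord<?> empty(HoodieKey key) throws Exception {
       return (HoodieRecord<?>) factory().invoke(null, key);
     }
   }
   ```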



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -194,6 +194,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withOperationField(config.allowOperationMetadataField())
         .withPartition(operation.getPartitionPath())
+        .withRecordType(config.getRecordType())
+        .withCombiningEngineClassFQN(config.getMergeClass())

Review Comment:
   rename the builder method?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -766,4 +764,59 @@ object HoodieSparkSqlWriter {
       Map.empty
     }
   }
+
+  private def createHoodieRecordRdd(df: DataFrame, config: HoodieConfig, parameters: Map[String, String], schema: Schema): JavaRDD[HoodieRecord[_]] = {
+    val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val tblName = config.getString(HoodieWriteConfig.TBL_NAME)
+    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+    val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+      WriteOperationType.fromValue(config.getString(OPERATION)).equals(WriteOperationType.UPSERT) ||
+      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
+    val precombineField = config.getString(PRECOMBINE_FIELD)
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps))
+    val partitionCols = HoodieSparkUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
+    val dropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+    HoodieRecord.HoodieRecordType.valueOf(config.getStringOrDefault(HoodieWriteConfig.RECORD_TYPE)) match {

Review Comment:
   ok, understood how we are choosing between the two paths now
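   
   For anyone else following along, the `shouldCombine` flag above reduces to a plain disjunction; a minimal Java restatement (config lookups replaced by plain parameters, purely for illustration):
   
   ```java
   import org.apache.hudi.common.model.WriteOperationType;
   
   // Combine records before write when insert-dedup is enabled, when the
   // operation is UPSERT, or when combine-before-insert is turned on.
   public class CombineCheck {
     public static boolean shouldCombine(boolean insertDropDups,
                                         WriteOperationType operation,
                                         boolean combineBeforeInsert) {
       return insertDropDups
           || operation == WriteOperationType.UPSERT
           || combineBeforeInsert;
     }
   }
   ```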



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -140,29 +136,31 @@ public abstract class AbstractHoodieLogRecordReader {
   private Option<String> partitionName;
   // Populate meta fields for the records
   private boolean populateMetaFields = true;
+  // Record type read from log block
+  protected final HoodieRecordType recordType;
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema,
                                           String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
                                           int bufferSize, Option<InstantRange> instantRange,
-                                          boolean withOperationField) {
+                                          boolean withOperationField, HoodieRecordType recordType, String combiningEngineClassFQN) {
     this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema());
+        instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema(), recordType, combiningEngineClassFQN);
   }
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
                                           boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                           boolean withOperationField, boolean forceFullScan,
-                                          Option<String> partitionName, InternalSchema internalSchema) {
+                                          Option<String> partitionName, InternalSchema internalSchema, HoodieRecordType recordType, String combiningEngineClassFQN) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
-    this.mergeClassFQN = tableConfig.getMergeClass();
+    this.mergeClassFQN = combiningEngineClassFQN;

Review Comment:
   rename the variables across the board (e.g. `mergeClassFQN` vs `combiningEngineClassFQN`)?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java:
##########
@@ -89,6 +91,8 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
             HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        .withRecordType(HoodieRecordType.AVRO)
+        .withCombiningEngineClassFQN(HoodieAvroRecordMerge.class.getName())

Review Comment:
   so we override this from the reader, even when the table property `hoodie.merge.class` is set to, say, `HoodieSparkRecordMerge`?
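   
   To make the question concrete, the alternative would be to thread the table's own setting through rather than pinning Avro here. A hedged sketch (the builder methods are the ones this PR introduces, and `getMergeClass()` is the table-config accessor used elsewhere in this diff; the record type stays pinned to AVRO because whether the MR read path can consume non-Avro records is exactly the open question):
   
   ```java
   import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
   import org.apache.hudi.common.table.HoodieTableConfig;
   import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
   
   // Sketch: honor hoodie.merge.class from the table config rather than
   // hard-coding HoodieAvroRecordMerge in the Hive/MR reader.
   public class ScannerMergeSettings {
     public static HoodieMergedLogRecordScanner.Builder apply(
         HoodieMergedLogRecordScanner.Builder builder, HoodieTableConfig tableConfig) {
       return builder
           .withRecordType(HoodieRecordType.AVRO) // MR reader is Avro-based today
           .withCombiningEngineClassFQN(tableConfig.getMergeClass());
     }
   }
   ```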



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
