vinothchandar commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r909140564
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java:
##########
@@ -84,8 +87,19 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>>
execute(String instantTime,
dedupedKeys = keys.repartition(parallelism);
}
- HoodieData<HoodieRecord<T>> dedupedRecords =
- dedupedKeys.map(key -> new HoodieAvroRecord(key, new
EmptyHoodieRecordPayload()));
+ HoodieData dedupedRecords;
+ if (config.getRecordType() == HoodieRecordType.AVRO) {
+ dedupedRecords =
+ dedupedKeys.map(key -> new HoodieAvroRecord(key, new
EmptyHoodieRecordPayload()));
+ } else if (config.getRecordType() == HoodieRecordType.SPARK) {
+ dedupedRecords = dedupedKeys.map(key -> {
+ Class<?> recordClazz =
ReflectionUtils.getClass("org.apache.hudi.commmon.model.HoodieSparkRecord");
+ Method method = recordClazz.getMethod("empty", HoodieKey.class);
+ return method.invoke(null, key);
Review Comment:
+1. Actually, anything done per record like this is going to affect perf
severely.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -194,6 +194,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler
compactionHandler,
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
+ .withRecordType(config.getRecordType())
+ .withCombiningEngineClassFQN(config.getMergeClass())
Review Comment:
rename builder name?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -766,4 +764,59 @@ object HoodieSparkSqlWriter {
Map.empty
}
}
+
+ private def createHoodieRecordRdd(df: DataFrame, config: HoodieConfig,
parameters: Map[String, String], schema: Schema): JavaRDD[HoodieRecord[_]] = {
+ val reconcileSchema =
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+ val tblName = config.getString(HoodieWriteConfig.TBL_NAME)
+ val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+ val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+
WriteOperationType.fromValue(config.getString(OPERATION)).equals(WriteOperationType.UPSERT)
||
+ parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
+ val precombineField = config.getString(PRECOMBINE_FIELD)
+ val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(config.getProps))
+ val partitionCols = HoodieSparkUtils.getPartitionColumns(keyGenerator,
toProperties(parameters))
+ val dropPartitionColumns =
config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
HoodieRecord.HoodieRecordType.valueOf(config.getStringOrDefault(HoodieWriteConfig.RECORD_TYPE))
match {
Review Comment:
ok understood how we are choosing both the paths now
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -140,29 +136,31 @@ public abstract class AbstractHoodieLogRecordReader {
private Option<String> partitionName;
// Populate meta fields for the records
private boolean populateMetaFields = true;
+ // Record type read from log block
+ protected final HoodieRecordType recordType;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath,
List<String> logFilePaths,
Schema readerSchema,
String latestInstantTime, boolean
readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange>
instantRange,
- boolean withOperationField) {
+ boolean withOperationField,
HoodieRecordType recordType, String combiningEngineClassFQN) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, true, Option.empty(),
InternalSchema.getEmptyInternalSchema());
+ instantRange, withOperationField, true, Option.empty(),
InternalSchema.getEmptyInternalSchema(), recordType, combiningEngineClassFQN);
}
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath,
List<String> logFilePaths,
Schema readerSchema, String
latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int
bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean
forceFullScan,
- Option<String> partitionName,
InternalSchema internalSchema) {
+ Option<String> partitionName,
InternalSchema internalSchema, HoodieRecordType recordType, String
combiningEngineClassFQN) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
// load class from the payload fully qualified class name
HoodieTableConfig tableConfig =
this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
- this.mergeClassFQN = tableConfig.getMergeClass();
+ this.mergeClassFQN = combiningEngineClassFQN;
Review Comment:
rename variables across the board?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java:
##########
@@ -89,6 +91,8 @@ private HoodieMergedLogRecordScanner
getMergedLogRecordScanner() throws IOExcept
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+ .withRecordType(HoodieRecordType.AVRO)
+ .withCombiningEngineClassFQN(HoodieAvroRecordMerge.class.getName())
Review Comment:
So we override this from the reader, even though the table property may have,
say, `hoodie.merge.class` set to HoodieSparkRecordMerge?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]