nsivabalan commented on code in PR #12384:
URL: https://github.com/apache/hudi/pull/12384#discussion_r1864716954
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -189,26 +218,58 @@ public List<WriteStatus> compact(HoodieCompactionHandler
compactionHandler,
new StoragePath(FSUtils.constructAbsolutePath(
metaClient.getBasePath(), operation.getPartitionPath()),
p).toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(metaClient.getBasePath())
- .withLogFilePaths(logFiles)
- .withReaderSchema(readerSchema)
-
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime,
maxInstantTime))
- .withInstantRange(instantRange)
-
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
- .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
- .withReverseReader(config.getCompactionReverseLogReadEnabled())
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withSpillableMapBasePath(config.getSpillableMapBasePath())
- .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .withOperationField(config.allowOperationMetadataField())
- .withPartition(operation.getPartitionPath())
-
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
- .withRecordMerger(config.getRecordMerger())
- .withTableMetaClient(metaClient)
+
+ boolean isPartialUpdateEnabled =
config.getBooleanOrDefault("hoodie.spark.sql.merge.into.partial.updates",
false);
+ if (isPartialUpdateEnabled && broadcastManagerOpt.isPresent() &&
!isMetadataTable(metaClient)) {
+ return compactUsingFileGroupReader(
+ logFiles, readerSchema, instantTime, internalSchemaOption, config,
operation, metaClient, broadcastManagerOpt, taskContextSupplier);
+ } else {
+ return compactUsingLegacyMethod(
+ storage, logFiles, readerSchema, executionHelper, instantTime,
maxInstantTime, instantRange, internalSchemaOption, config,
+ maxMemoryPerCompaction, operation, metaClient, compactionHandler);
+ }
+ }
+
+ private List<WriteStatus> compactUsingFileGroupReader(List<String> logFiles,
Schema readerSchema, String instantTime,
+ Option<InternalSchema>
internalSchemaOption, HoodieWriteConfig config,
+ CompactionOperation
operation, HoodieTableMetaClient metaClient,
+
Option<CompactorBroadcastManager> partialUpdateReaderContextOpt,
+ TaskContextSupplier
taskContextSupplier) throws IOException {
+ CompactorBroadcastManager compactorBroadcastManager =
partialUpdateReaderContextOpt.get();
+ // PATH 1: When the engine decides to return a valid reader context object.
Review Comment:
Can you remove this comment, or fix it as needed?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java:
##########
@@ -112,12 +112,12 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>>
upsert(HoodieEngineContext c
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
insert(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> records) {
- return new
SparkInsertCommitActionExecutor<>((HoodieSparkEngineContext)context, config,
this, instantTime, records).execute();
+ return new SparkInsertCommitActionExecutor<>((HoodieSparkEngineContext)
context, config, this, instantTime, records).execute();
}
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
bulkInsert(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> records,
- Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
Review Comment:
Can you revert the changes in these files if they are just unintentional
formatting changes?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -122,19 +122,19 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>>
deletePrepped(HoodieEngineCo
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
upsertPrepped(HoodieEngineContext context, String instantTime,
- HoodieData<HoodieRecord<T>> preppedRecords) {
+
HoodieData<HoodieRecord<T>> preppedRecords) {
return new
SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext)
context, config, this, instantTime, preppedRecords).execute();
}
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
insertPrepped(HoodieEngineContext context, String instantTime,
- HoodieData<HoodieRecord<T>> preppedRecords) {
+
HoodieData<HoodieRecord<T>> preppedRecords) {
Review Comment:
Can you revert the changes in these files if they are just unintentional
formatting changes?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -278,4 +368,123 @@ public CompactionExecutionHelper
getCompactionExecutionStrategy(HoodieCompaction
}
}
+ List<WriteStatus> compactWithFileGroupReader(HoodieReaderContext
readerContext,
+ String instantTime,
+ HoodieTableMetaClient
metaClient,
+ CompactionOperation operation,
+ List<HoodieLogFile> logFiles,
+ Schema readerSchema,
+ Option<InternalSchema>
internalSchemaOpt,
+ TypedProperties props,
+ HoodieWriteConfig config,
+ TaskContextSupplier
taskContextSupplier) throws IOException {
+ HoodieTimer timer = HoodieTimer.start();
+ timer.startTimer();
+ Option<HoodieBaseFile> baseFileOpt =
+ operation.getBaseFile(metaClient.getBasePath().toString(),
operation.getPartitionPath());
+ FileSlice fileSlice = new FileSlice(
+ operation.getFileGroupId(),
+ operation.getBaseInstantTime(),
+ baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+ logFiles);
+
+ // 1. Generate the input for fg reader.
+ boolean usePosition =
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+ HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+ readerContext,
+ metaClient.getStorage(),
+ metaClient.getBasePath().toString(),
+ instantTime,
+ fileSlice,
+ readerSchema,
+ readerSchema,
+ internalSchemaOpt,
+ metaClient,
+ props,
+ 0,
+ Long.MAX_VALUE,
+ usePosition);
+
+ // 2. Get the `HoodieFileGroupReaderIterator` from the fg reader.
+ fileGroupReader.initRecordIterators();
+ HoodieFileGroupReader.HoodieFileGroupReaderIterator<T> recordIterator
+ = fileGroupReader.getClosableIterator();
+
+ // 3. Write the record using parquet writer.
+ String writeToken = FSUtils.makeWriteToken(
Review Comment:
Hey @yihua: should we use HoodieCreateHandle here?
I am worried about the ad hoc code written here. What if we miss
something that the regular MergeHandle would normally do?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -101,6 +123,25 @@ public HoodieRecord<InternalRow>
constructHoodieRecord(Option<InternalRow> rowOp
return new HoodieSparkRecord(row,
HoodieInternalRowUtils.getCachedSchema(schema));
}
+ @Override
+ public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow>
rowOption,
+ Map<String, Object>
metadataMap,
+ TypedProperties
props) {
+ if (!rowOption.isPresent()) {
Review Comment:
Again, can you help me understand why we need these ad hoc changes?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -130,8 +151,14 @@ public HoodieData<WriteStatus> compact(
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// if this is a MDT, set up the instant range of log reader just like
regular MDT snapshot reader.
Option<InstantRange> instantRange =
CompactHelpers.getInstance().getInstantRange(metaClient);
+
+ // Broadcast necessary information.
+ Option<CompactorBroadcastManager> broadcastManagerOpt =
getCompactorBroadcastManager(context);
+ if (broadcastManagerOpt.isPresent()) {
+ broadcastManagerOpt.get().prepareAndBroadcast();
Review Comment:
See — why are we broadcasting if there are no partial updates?
Can we avoid that?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -130,8 +151,14 @@ public HoodieData<WriteStatus> compact(
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// if this is a MDT, set up the instant range of log reader just like
regular MDT snapshot reader.
Option<InstantRange> instantRange =
CompactHelpers.getInstance().getInstantRange(metaClient);
+
+ // Broadcast necessary information.
+ Option<CompactorBroadcastManager> broadcastManagerOpt =
getCompactorBroadcastManager(context);
Review Comment:
I would say we should not even instantiate CompactorBroadcastManager if
partial updates are not enabled.
Also, the broadcastManager naming does not sit well — for example, why would
this be a required entity for non-partial-update flows?
So, can we rename it to something a little more generic,
such as PartialUpdateReaderContext or PartialUpdateContext?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -278,4 +368,123 @@ public CompactionExecutionHelper
getCompactionExecutionStrategy(HoodieCompaction
}
}
+ List<WriteStatus> compactWithFileGroupReader(HoodieReaderContext
readerContext,
+ String instantTime,
+ HoodieTableMetaClient
metaClient,
+ CompactionOperation operation,
+ List<HoodieLogFile> logFiles,
+ Schema readerSchema,
+ Option<InternalSchema>
internalSchemaOpt,
+ TypedProperties props,
+ HoodieWriteConfig config,
+ TaskContextSupplier
taskContextSupplier) throws IOException {
+ HoodieTimer timer = HoodieTimer.start();
+ timer.startTimer();
+ Option<HoodieBaseFile> baseFileOpt =
+ operation.getBaseFile(metaClient.getBasePath().toString(),
operation.getPartitionPath());
+ FileSlice fileSlice = new FileSlice(
+ operation.getFileGroupId(),
+ operation.getBaseInstantTime(),
+ baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+ logFiles);
+
+ // 1. Generate the input for fg reader.
+ boolean usePosition =
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+ HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+ readerContext,
+ metaClient.getStorage(),
+ metaClient.getBasePath().toString(),
+ instantTime,
+ fileSlice,
+ readerSchema,
+ readerSchema,
+ internalSchemaOpt,
+ metaClient,
+ props,
+ 0,
+ Long.MAX_VALUE,
+ usePosition);
+
+ // 2. Get the `HoodieFileGroupReaderIterator` from the fg reader.
+ fileGroupReader.initRecordIterators();
+ HoodieFileGroupReader.HoodieFileGroupReaderIterator<T> recordIterator
+ = fileGroupReader.getClosableIterator();
+
+ // 3. Write the record using parquet writer.
+ String writeToken = FSUtils.makeWriteToken(
Review Comment:
See — we are not creating markers here. That's the reason I said I was not
comfortable making this the default for the whole of Spark.
Unless we go through one of the existing write handles, the new custom code
we write might have some gaps.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -83,7 +86,26 @@ public Object getValue(InternalRow row, Schema schema,
String fieldName) {
@Override
public String getRecordKey(InternalRow row, Schema schema) {
- return getFieldValueFromInternalRow(row, schema,
RECORD_KEY_METADATA_FIELD).toString();
+ Object key = getFieldValueFromInternalRow(row, schema,
RECORD_KEY_METADATA_FIELD);
+ if (key != null) {
+ return key.toString();
+ }
+ return null;
+ }
+
+ @Override
+ public String getRecordKey(InternalRow row, Schema schema, TypedProperties
props) {
+ String key = getRecordKey(row, schema);
Review Comment:
Can you help me understand why we need these changes?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
Review Comment:
Then, can we do the rebase?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -278,4 +368,123 @@ public CompactionExecutionHelper
getCompactionExecutionStrategy(HoodieCompaction
}
}
+ List<WriteStatus> compactWithFileGroupReader(HoodieReaderContext
readerContext,
+ String instantTime,
+ HoodieTableMetaClient
metaClient,
+ CompactionOperation operation,
+ List<HoodieLogFile> logFiles,
+ Schema readerSchema,
+ Option<InternalSchema>
internalSchemaOpt,
+ TypedProperties props,
+ HoodieWriteConfig config,
+ TaskContextSupplier
taskContextSupplier) throws IOException {
+ HoodieTimer timer = HoodieTimer.start();
+ timer.startTimer();
+ Option<HoodieBaseFile> baseFileOpt =
+ operation.getBaseFile(metaClient.getBasePath().toString(),
operation.getPartitionPath());
+ FileSlice fileSlice = new FileSlice(
+ operation.getFileGroupId(),
+ operation.getBaseInstantTime(),
+ baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+ logFiles);
+
+ // 1. Generate the input for fg reader.
+ boolean usePosition =
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+ HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+ readerContext,
+ metaClient.getStorage(),
+ metaClient.getBasePath().toString(),
+ instantTime,
+ fileSlice,
+ readerSchema,
+ readerSchema,
+ internalSchemaOpt,
+ metaClient,
+ props,
+ 0,
+ Long.MAX_VALUE,
+ usePosition);
+
+ // 2. Get the `HoodieFileGroupReaderIterator` from the fg reader.
+ fileGroupReader.initRecordIterators();
+ HoodieFileGroupReader.HoodieFileGroupReaderIterator<T> recordIterator
+ = fileGroupReader.getClosableIterator();
+
+ // 3. Write the record using parquet writer.
+ String writeToken = FSUtils.makeWriteToken(
Review Comment:
For example, does this handle virtual keys?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]