danny0405 commented on a change in pull request #4446:
URL: https://github.com/apache/hudi/pull/4446#discussion_r780733934
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
- WorkloadProfile profile = null;
+ WorkloadProfile inputProfile = null;
if (isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(buildProfile(inputRecords));
- LOG.info("Workload profile :" + profile);
+ inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+ LOG.info("Input workload profile :" + inputProfile);
+ }
+
+ final Partitioner partitioner = getPartitioner(inputProfile);
+ try {
+ WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+ LOG.info("Execution workload profile :" + inputProfile);
+ saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);
Review comment:
And why must we use the execution profile here? I know the original
profile also only works for the bloom filter index, but we should fix how the
profile is built instead of fetching it from the partitioner, if we have a way to
distinguish between `INSERT`s and `UPDATE`s before the write.
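A minimal sketch of the profile-building approach suggested above, assuming records have already been tagged by the index before the write, so each one carries its existing file location or none. `TaggedRecord`, `PartitionStat` and `InputProfileBuilder` are hypothetical, simplified stand-ins, not Hudi classes; the per-partition stat loosely mirrors the insert count plus the `fileId -> (prevCommit, count)` map used in `saveWorkloadProfileMetadataToInflight`.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a record after index tagging: fileId and instantTime
// are null when the index found no existing location (i.e. the record is an insert).
final class TaggedRecord {
  final String partitionPath;
  final String fileId;
  final String instantTime;

  TaggedRecord(String partitionPath, String fileId, String instantTime) {
    this.partitionPath = partitionPath;
    this.fileId = fileId;
    this.instantTime = instantTime;
  }

  boolean isUpdate() {
    return fileId != null;
  }
}

// Hypothetical per-partition stat: insert count plus fileId -> (prevCommit, count).
final class PartitionStat {
  long numInserts;
  final Map<String, Map.Entry<String, Long>> updateLocationToCount = new HashMap<>();
}

final class InputProfileBuilder {
  // Classifies every tagged record as an INSERT or an UPDATE before any write happens.
  static Map<String, PartitionStat> build(Iterable<TaggedRecord> records) {
    Map<String, PartitionStat> stats = new HashMap<>();
    for (TaggedRecord r : records) {
      PartitionStat stat = stats.computeIfAbsent(r.partitionPath, p -> new PartitionStat());
      if (r.isUpdate()) {
        // Update: count it against the file group it already lives in.
        stat.updateLocationToCount.merge(
            r.fileId,
            new AbstractMap.SimpleImmutableEntry<>(r.instantTime, 1L),
            (old, one) -> new AbstractMap.SimpleImmutableEntry<>(
                old.getKey(), old.getValue() + one.getValue()));
      } else {
        // Insert: no known location yet, so only the per-partition count is tracked.
        stat.numInserts++;
      }
    }
    return stats;
  }
}
```

Whether this is possible depends on the index being able to tag records before the write, which is the open question in the comment.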
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
- if (!scanner.iterator().hasNext()) {
- scanner.close();
- return new ArrayList<>();
- }
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(),
operation.getPartitionPath());
+ // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
+ // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
+ if (!scanner.iterator().hasNext()) {
+ if (!oldDataFileOpt.isPresent()) {
+ scanner.close();
+ return new ArrayList<>();
+ } else {
+ // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema
Review comment:
If the file slice only has parquet files, why do we still trigger
compaction?
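A sketch of the guard the question implies, assuming the compaction plan can see how many log files each file slice has. `SliceInfo` and `CompactionGuard` are hypothetical names, not Hudi APIs.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified view of a file slice: just the id and log-file count.
final class SliceInfo {
  final String fileId;
  final int numLogFiles;

  SliceInfo(String fileId, int numLogFiles) {
    this.fileId = fileId;
    this.numLogFiles = numLogFiles;
  }
}

final class CompactionGuard {
  // With no log files there is nothing to merge into the base file,
  // so the slice does not need a compaction operation at all.
  static boolean needsCompaction(SliceInfo slice) {
    return slice.numLogFiles > 0;
  }

  // Keeps only the file ids whose slices have at least one log file.
  static List<String> plan(List<SliceInfo> slices) {
    return slices.stream()
        .filter(CompactionGuard::needsCompaction)
        .map(s -> s.fileId)
        .collect(Collectors.toList());
  }
}
```

This check alone does not cover the scenario in the PR's code comment, where log files exist but all their blocks were rolled back; there the slice passes the guard and the scanner is still empty.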
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
- WorkloadProfile profile = null;
+ WorkloadProfile inputProfile = null;
if (isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(buildProfile(inputRecords));
- LOG.info("Workload profile :" + profile);
+ inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+ LOG.info("Input workload profile :" + inputProfile);
+ }
+
+ final Partitioner partitioner = getPartitioner(inputProfile);
+ try {
+ WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+ LOG.info("Execution workload profile :" + inputProfile);
+ saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);
Review comment:
Did you mean `executionProfile`? This log line prints `inputProfile`.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
##########
@@ -18,10 +18,14 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.table.WorkloadProfile;
+
import java.io.Serializable;
public interface Partitioner extends Serializable {
int getNumPartitions();
int getPartition(Object key);
+
+ WorkloadProfile getExecutionWorkloadProfile();
}
Review comment:
Why does a `Partitioner` return the profile? Let's not add this method to
the interface.
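One possible alternative, sketched under assumptions: leave `Partitioner` unchanged and let only the partitioners that compute an execution profile opt in through a separate interface. `ExecutionProfileProvider` and `Profiles.resolve` are hypothetical names, `Partitioner` below is a trimmed copy of the interface in the diff, and the type parameter `P` stands in for `WorkloadProfile`.

```java
import java.io.Serializable;

// Trimmed copy of the existing interface, left without any profile method.
interface Partitioner extends Serializable {
  int getNumPartitions();

  int getPartition(Object key);
}

// Hypothetical opt-in interface; P stands in for WorkloadProfile.
interface ExecutionProfileProvider<P> {
  P getExecutionWorkloadProfile();
}

final class Profiles {
  // Uses the partitioner's execution profile only if it opts in,
  // otherwise falls back to the profile built from the input records.
  @SuppressWarnings("unchecked")
  static <P> P resolve(Partitioner partitioner, P inputProfile) {
    if (partitioner instanceof ExecutionProfileProvider) {
      return ((ExecutionProfileProvider<P>) partitioner).getExecutionWorkloadProfile();
    }
    return inputProfile;
  }
}
```

This keeps the core `Partitioner` contract free of profile concerns, so partitioners that never compute an execution profile need no changes.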
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -97,15 +98,25 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
insertStat.setFileId("");
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
metadata.addWriteStat(path, insertStat);
-
- partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setFileId(key);
- // TODO : Write baseCommitTime is possible here ?
- writeStat.setPrevCommit(value.getKey());
- writeStat.setNumUpdateWrites(value.getValue());
- metadata.addWriteStat(path, writeStat);
- });
+ Map<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();
Review comment:
Can we just write the code block twice instead of unioning the streams
together? The code looks like a mess.
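A minimal sketch of the suggested shape, with two plain loops instead of a concatenated stream. `WriteStat` is a hypothetical, trimmed-down stand-in for `HoodieWriteStat`, `Map.Entry` replaces Hudi's `Pair`, and `insertLocationToCount` is a hypothetical name for the second map, since the diff does not show what is being unioned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for HoodieWriteStat.
final class WriteStat {
  String fileId;
  String prevCommit;
  long numInserts;
  long numUpdateWrites;
}

final class InflightStats {
  // Two explicit loops instead of one concatenated stream, so it is obvious
  // which counter each map's entries feed. insertLocationToCount is hypothetical.
  static List<WriteStat> collect(Map<String, Map.Entry<String, Long>> insertLocationToCount,
                                 Map<String, Map.Entry<String, Long>> updateLocationToCount) {
    List<WriteStat> stats = new ArrayList<>();
    insertLocationToCount.forEach((fileId, commitAndCount) -> {
      WriteStat s = new WriteStat();
      s.fileId = fileId;
      s.prevCommit = commitAndCount.getKey();
      s.numInserts = commitAndCount.getValue();
      stats.add(s);
    });
    updateLocationToCount.forEach((fileId, commitAndCount) -> {
      WriteStat s = new WriteStat();
      s.fileId = fileId;
      s.prevCommit = commitAndCount.getKey();
      s.numUpdateWrites = commitAndCount.getValue();
      stats.add(s);
    });
    return stats;
  }
}
```

The small duplication between the two loops is the trade-off the comment accepts in exchange for readability.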
--
This is an automated message from the Apache Git Service.