danny0405 commented on a change in pull request #4446:
URL: https://github.com/apache/hudi/pull/4446#discussion_r780733934



##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
+    WorkloadProfile inputProfile = null;
     if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
+      inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+      LOG.info("Input workload profile :" + inputProfile);
+    }
+
+    final Partitioner partitioner = getPartitioner(inputProfile);
+    try {
+      WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+      LOG.info("Execution workload profile :" + inputProfile);
+      saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);

Review comment:
       And why must we use the execution profile here? I know the original
profile also works only for the bloom filter index, but we should fix the
profile building instead of fetching it from the partitioner, if we have a way
to distinguish between `INSERT`s and `UPDATE`s before the write.
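
To illustrate what I mean by fixing the profile building, here is a minimal sketch, assuming each record has already been tagged with its current location before the write. `Rec`, `currentFileId`, and `ProfileSketch` are hypothetical stand-ins for this comment, not Hudi classes, and the real `buildProfile` may look quite different:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProfileSketch {
  // Hypothetical stand-in for a tagged record: currentFileId is null when no
  // existing location is known, which this sketch treats as an INSERT.
  public static final class Rec {
    public final String partitionPath;
    public final String currentFileId;

    public Rec(String partitionPath, String currentFileId) {
      this.partitionPath = partitionPath;
      this.currentFileId = currentFileId;
    }
  }

  // Returns per-partition counts: index 0 holds INSERTs, index 1 holds UPDATEs.
  public static Map<String, long[]> buildProfile(List<Rec> records) {
    Map<String, long[]> counts = new LinkedHashMap<>();
    for (Rec r : records) {
      long[] c = counts.computeIfAbsent(r.partitionPath, p -> new long[2]);
      if (r.currentFileId == null) {
        c[0]++; // no known location: counted as an INSERT
      } else {
        c[1]++; // known location: counted as an UPDATE
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Rec> records = Arrays.asList(
        new Rec("2022/01/01", null),
        new Rec("2022/01/01", "file-1"),
        new Rec("2022/01/02", null));
    buildProfile(records).forEach((partition, c) ->
        System.out.println(partition + " inserts=" + c[0] + " updates=" + c[1]));
  }
}
```

If the split can be computed like this for every index type, the executor would not need to ask the partitioner for a second profile.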

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -182,14 +182,28 @@ public abstract void preCompact(
         .withOperationField(config.allowOperationMetadataField())
         .withPartition(operation.getPartitionPath())
         .build();
-    if (!scanner.iterator().hasNext()) {
-      scanner.close();
-      return new ArrayList<>();
-    }
 
     Option<HoodieBaseFile> oldDataFileOpt =
         operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
 
+    // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
+    // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
+    if (!scanner.iterator().hasNext()) {
+      if (!oldDataFileOpt.isPresent()) {
+        scanner.close();
+        return new ArrayList<>();
+      } else {
+        // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema

Review comment:
       If the file slice only has parquet files, why do we still trigger
compaction?

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
##########
@@ -90,27 +90,29 @@ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
+    WorkloadProfile inputProfile = null;
     if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
+      inputProfile = new WorkloadProfile(buildProfile(inputRecords));
+      LOG.info("Input workload profile :" + inputProfile);
+    }
+
+    final Partitioner partitioner = getPartitioner(inputProfile);
+    try {
+      WorkloadProfile executionProfile = partitioner.getExecutionWorkloadProfile();
+      LOG.info("Execution workload profile :" + inputProfile);
+      saveWorkloadProfileMetadataToInflight(executionProfile, instantTime);

Review comment:
       Did you mean `executionProfile`? The "Execution workload profile" log
statement prints `inputProfile`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
##########
@@ -18,10 +18,14 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.table.WorkloadProfile;
+
 import java.io.Serializable;
 
 public interface Partitioner extends Serializable {
   int getNumPartitions();
 
   int getPartition(Object key);
+
+  WorkloadProfile getExecutionWorkloadProfile();
 }

Review comment:
       Why does a `Partitioner` return the profile? Let's not put this method
on the interface.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -97,15 +98,25 @@ void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String insta
         insertStat.setFileId("");
         insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
         metadata.addWriteStat(path, insertStat);
-
-        partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
-          HoodieWriteStat writeStat = new HoodieWriteStat();
-          writeStat.setFileId(key);
-          // TODO : Write baseCommitTime is possible here ?
-          writeStat.setPrevCommit(value.getKey());
-          writeStat.setNumUpdateWrites(value.getValue());
-          metadata.addWriteStat(path, writeStat);
-        });
+        Map<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();

Review comment:
       Can we just write the code block twice instead of unioning the streams
together? The code looks like a mess.
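
Roughly what I have in mind, as a sketch rather than the exact change: `Stat` is a hypothetical stand-in for `HoodieWriteStat`, `Map.Entry` stands in for `Pair`, and the second map (`insertLocationToCount`) is an assumption about what is being unioned in, since that part of the diff is not shown here:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WriteStatSketch {
  // Hypothetical stand-in for HoodieWriteStat, reduced to the fields used here.
  public static final class Stat {
    public final String fileId;
    public final String prevCommit;
    public final long numUpdateWrites;
    public final long numInserts;

    public Stat(String fileId, String prevCommit, long numUpdateWrites, long numInserts) {
      this.fileId = fileId;
      this.prevCommit = prevCommit;
      this.numUpdateWrites = numUpdateWrites;
      this.numInserts = numInserts;
    }
  }

  // Each map goes from file id to (previous commit time, record count).
  public static List<Stat> buildStats(
      Map<String, Map.Entry<String, Long>> updateLocationToCount,
      Map<String, Map.Entry<String, Long>> insertLocationToCount) {
    List<Stat> stats = new ArrayList<>();

    // Block 1: file groups that receive updates.
    updateLocationToCount.forEach((fileId, commitAndCount) ->
        stats.add(new Stat(fileId, commitAndCount.getKey(), commitAndCount.getValue(), 0L)));

    // Block 2: file groups that receive inserts (assumed second map).
    insertLocationToCount.forEach((fileId, commitAndCount) ->
        stats.add(new Stat(fileId, commitAndCount.getKey(), 0L, commitAndCount.getValue())));

    return stats;
  }

  public static void main(String[] args) {
    Map<String, Map.Entry<String, Long>> updates = new LinkedHashMap<>();
    updates.put("file-1", new SimpleEntry<>("001", 3L));
    Map<String, Map.Entry<String, Long>> inserts = new LinkedHashMap<>();
    inserts.put("file-2", new SimpleEntry<>("001", 5L));
    for (Stat s : buildStats(updates, inserts)) {
      System.out.println(s.fileId + " updates=" + s.numUpdateWrites + " inserts=" + s.numInserts);
    }
  }
}
```

Two plain loops repeat a few lines, but each one reads top to bottom and it is obvious which counter is being set for which kind of write.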



