xiarixiaoyao commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r701530327



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -241,6 +241,27 @@ private synchronized FileSystemViewManager 
getViewManager() {
    */
   public abstract HoodieWriteMetadata<O> 
insertOverwriteTable(HoodieEngineContext context, String instantTime, I 
records);
 
+  /**
+   * Replaces all the existing records of the Hoodie table and optimize data 
layout,
+   * for the partition paths contained in input records.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata<O> optimize(HoodieEngineContext context, 
String instantTime, I records);
+
+  /**
+   * update statistics info for current table.
+   * now only support OPTIMIZE operation, to do support other operation type.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant time for the replace action
+   * @param isOptimizeOperation whether current operation is OPTIMIZE type
+   */
+  public abstract void updateStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation);

Review comment:
       now we donnot use hoodieMetadataTable to store statistics info。 maybe 
it's not suitable to put this method in to HoodieTableMetatable.   
   Of course it can be put in HoodieTableMetdata。

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -149,6 +155,37 @@ public HoodieWriteMetadata 
insertOverwrite(HoodieEngineContext context, String i
     return new SparkInsertOverwriteTableCommitActionExecutor(context, config, 
this, instantTime, records).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> 
optimize(HoodieEngineContext context, String instantTime, 
JavaRDD<HoodieRecord<T>> records) {
+    return new 
SparkOptimizeWriteCommitActionExecutor((HoodieSparkEngineContext)context, 
config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public void updateStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
+    // deal with z-order/hilbert statistic info
+    if (isOptimizeOperation) {
+      updateOptimizeOperationStatistics(context, stats, instantTime);
+    }
+  }
+
+  private void updateOptimizeOperationStatistics(HoodieEngineContext context, 
List<HoodieWriteStat> stats, String instantTime) {
+    String cols = config.getOptimizeSortColumns();
+    String saveMode = config.getOptimizeStatisticsSaveMode();
+    String basePath = metaClient.getBasePath();
+    String indexPath = metaClient.getZindexPath();
+    List<String> validateCommits = metaClient.getCommitsTimeline()
+        .filterCompletedInstants().getInstants().map(f -> 
f.getTimestamp()).collect(Collectors.toList());
+    List<String> touchFiles = stats.stream().map(s -> new Path(basePath, 
s.getPath()).toString()).collect(Collectors.toList());
+    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
+      LOG.warn("save nothing to index table");
+      return;
+    }
+    HoodieSparkEngineContext sparkEngineContext = 
(HoodieSparkEngineContext)context;
+    Zoptimize$.MODULE$.saveStatisticsInfo(sparkEngineContext

Review comment:
       @vinothchandar   yes,Since the statistics will not be large, we stored  
as spark parqeut tables for better performance。
    we save those statistics info to the path ./hoodie/.index with commitTime 
as name
   // /tmp/mytest/.hoodie/.index
   20210808123645
   //
   20210808123645 is the index table name.
   
   if the indexPath has no index table, we will save statistis info direclty as 
parquet table with commitTime as it's name
   if the indexPath has old index table, we will update the old index table by 
statistis info with full out join method. then save the updated info into a new 
parquet table with commitTime as it's name (full out join method is more like 
the update method of  cow table)
   
   In the hoodieFileIndex, we do data skip by use lastest index table. Filters 
from query statement will be convert to the filter for index table, choose the 
filter files from index table, than use those filter files to skip files。
   
   of course this method is simple, but it's enough to do data skip for 
z-order/hilbert optimitze. RFC-27 is a surprising feature for data skip, 
however this feature is not yet compeled. Once RFC-27 has been completed , i 
will do adaptation。




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to