xiarixiaoyao commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r701536494



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -149,6 +155,37 @@ public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String i
     return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> optimize(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new SparkOptimizeWriteCommitActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
+    // deal with z-order/hilbert statistics info
+    if (isOptimizeOperation) {
+      updateOptimizeOperationStatistics(context, stats, instantTime);
+    }
+  }
+
+  private void updateOptimizeOperationStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime) {
+    String cols = config.getOptimizeSortColumns();
+    String saveMode = config.getOptimizeStatisticsSaveMode();
+    String basePath = metaClient.getBasePath();
+    String indexPath = metaClient.getZindexPath();
+    List<String> validateCommits = metaClient.getCommitsTimeline()
+        .filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList());
+    List<String> touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList());
+    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
+      LOG.warn("save nothing to index table");
+      return;
+    }
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+    Zoptimize$.MODULE$.saveStatisticsInfo(sparkEngineContext

Review comment:
   @vinothchandar   yes. Since the statistics will not be large, we store them as Spark parquet tables for better performance.
   We save the statistics info to the path `.hoodie/.index`, using the commit time as the table name:
   // /tmp/mytest/.hoodie/.index
   20210808123645
   //
   Here 20210808123645 is the index table name.
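   As a minimal sketch of that layout (the helper name is hypothetical, not part of the PR), the location of an index table can be derived from the table base path and the commit time:

   ```java
   public class ZindexPaths {
       // Hypothetical helper: index tables live under <basePath>/.hoodie/.index,
       // one parquet table per commit, named by its commit time.
       public static String indexTablePath(String basePath, String commitTime) {
           return basePath + "/.hoodie/.index/" + commitTime;
       }

       public static void main(String[] args) {
           // prints /tmp/mytest/.hoodie/.index/20210808123645
           System.out.println(indexTablePath("/tmp/mytest", "20210808123645"));
       }
   }
   ```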
   
   If the indexPath has no index table yet, we save the statistics info directly as a parquet table named by the commit time.
   If the indexPath already has an old index table, we update it by joining the new statistics into it with a full outer join, then save the updated info into a new parquet table named by the new commit time (the full-outer-join update works much like the update path of a COW table).
   
   In the HoodieFileIndex, we do data skipping using the latest index table. Filters from the query statement are converted into filters on the index table; we pick the candidate files from the index table and then use that list to skip the remaining files.
   
   Of course this method is simple, but it is enough to do data skipping for z-order/hilbert optimization. RFC-27 is a promising feature for data skipping, but it is not yet completed. Once RFC-27 has been completed, I will adapt this to it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

