xiarixiaoyao commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r701536494
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -149,6 +155,37 @@ public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String i
     return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
   }
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> optimize(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new SparkOptimizeWriteCommitActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
+    // deal with z-order/hilbert statistic info
+    if (isOptimizeOperation) {
+      updateOptimizeOperationStatistics(context, stats, instantTime);
+    }
+  }
+
+  private void updateOptimizeOperationStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime) {
+    String cols = config.getOptimizeSortColumns();
+    String saveMode = config.getOptimizeStatisticsSaveMode();
+    String basePath = metaClient.getBasePath();
+    String indexPath = metaClient.getZindexPath();
+    List<String> validateCommits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList());
+    List<String> touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList());
+    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
+      LOG.warn("save nothing to index table");
+      return;
+    }
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+    Zoptimize$.MODULE$.saveStatisticsInfo(sparkEngineContext
Review comment:
@vinothchandar yes. Since the statistics will not be large, we store them
as Spark parquet tables for better performance.
We save the statistics info under the path `.hoodie/.index`, using the
commitTime as the table name:
// /tmp/mytest/.hoodie/.index
//     20210808123645
Here `20210808123645` is the index table name.
If the indexPath contains no index table yet, we save the statistics info
directly as a parquet table named with the commitTime.
If the indexPath contains an old index table, we update the old index table
with the new statistics info using a full outer join, then save the merged
result as a new parquet table named with the commitTime (the full outer join
update is similar to the update path of a COW table).
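The full-outer-join update described above can be sketched with plain Java collections. This is a simplified illustration of the merge semantics only, not Hudi's actual Spark code; the `FileStat` record and `mergeIndex` method are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;

class IndexMergeSketch {
    // Hypothetical per-file statistics row: min/max of one sort column.
    record FileStat(long min, long max) {}

    // Full-outer-join-style merge: files present in both sides take the new
    // statistics (like a COW update); files present on only one side are kept.
    static Map<String, FileStat> mergeIndex(Map<String, FileStat> oldIndex,
                                            Map<String, FileStat> newStats) {
        Map<String, FileStat> merged = new HashMap<>(oldIndex);
        merged.putAll(newStats); // new statistics override old ones per file
        return merged;
    }

    public static void main(String[] args) {
        Map<String, FileStat> oldIndex = new HashMap<>();
        oldIndex.put("file1.parquet", new FileStat(0, 50));
        oldIndex.put("file2.parquet", new FileStat(51, 100));

        Map<String, FileStat> newStats = new HashMap<>();
        newStats.put("file2.parquet", new FileStat(51, 120));  // rewritten file
        newStats.put("file3.parquet", new FileStat(121, 200)); // new file

        // All three files end up in the new index table, written under
        // .hoodie/.index with the commitTime as its name.
        System.out.println(mergeIndex(oldIndex, newStats).size()); // 3
    }
}
```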
In HoodieFileIndex, we do data skipping using the latest index table. Filters
from the query statement are converted into filters on the index table; we
select the candidate files from the index table, then use that file list to
skip the remaining files.
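As an illustration of that filter conversion (a simplified sketch with hypothetical names; the real HoodieFileIndex path translates Spark Catalyst expressions), a `col > value` query filter becomes a `max > value` filter on the per-file min/max rows of the index table:

```java
import java.util.ArrayList;
import java.util.List;

class DataSkipSketch {
    // Hypothetical index-table row: one file's min/max for a sort column.
    record IndexRow(String file, long min, long max) {}

    // Query filter `col > value` maps to index filter `max > value`:
    // a file whose max is <= value cannot contain a matching row, so skip it.
    static List<String> candidateFiles(List<IndexRow> index, long value) {
        List<String> keep = new ArrayList<>();
        for (IndexRow row : index) {
            if (row.max() > value) {
                keep.add(row.file());
            }
        }
        return keep;
    }

    public static void main(String[] args) {
        List<IndexRow> index = List.of(
            new IndexRow("file1.parquet", 0, 50),
            new IndexRow("file2.parquet", 51, 100),
            new IndexRow("file3.parquet", 101, 200));
        // Query: WHERE col > 100 -> only file3 can match; file1/file2 are skipped.
        System.out.println(candidateFiles(index, 100)); // [file3.parquet]
    }
}
```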
Of course this method is simple, but it is enough to do data skipping for
z-order/hilbert optimization. RFC-27 is a promising feature for data skipping;
however, it is not yet completed. Once RFC-27 has been completed, I will adapt
to it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]