[ https://issues.apache.org/jira/browse/HUDI-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394979#comment-17394979 ]

ASF GitHub Bot commented on HUDI-2101:
--------------------------------------

satishkotha commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r684467542



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -108,8 +108,12 @@ public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
     validateWriteResult(writeMetadata);
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
+      WriteOperationType realOperationType = operationType;
+      if (config.getClusteringExecutionStrategyClass().equals("org.apache.hudi.clustering.SparkOptimizeDataLayoutStrategy")) {

Review comment:
       this part is a bit confusing. It seems like you have SparkOptimizeWriteCommitActionExecutor.java for 'OPTIMIZE'. Are you reworking that as a clustering strategy? If yes, can we delete SparkOptimizeWriteCommitActionExecutor.java?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -460,6 +487,17 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
         Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
   }
 
+  @Override
+  protected void saveStatisticsInfo(List<String> touchFiles, String cols, String indexPath, String saveMode) {
+    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
+      LOG.warn("save nothing to index table");
+      return;
+    }
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+    Zoptimize$.MODULE$.saveStatisticsInfo(sparkEngineContext

Review comment:
       will this work for log files too? I think it is better to build abstractions that can easily extend to future use cases.
   
   For example, this could call table.updateStatistics. Internally, for CopyOnWriteTable we could read stats from the parquet footer as an optimization instead of recomputing min/max. (This optimization need not be done now, but building the abstraction would help toward that goal.)
   MergeOnReadTable could do other optimizations for log files.
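To make the suggestion concrete, here is a minimal sketch of the kind of per-file min/max statistics a data skipping index stores. The `ColumnStats` class and its method names are illustrative only, not Hudi APIs:

```java
// Illustrative sketch: running min/max stats for one long-typed column of one file.
public class ColumnStats {
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  // Fold one value into the running min/max, as a writer (or a reader
  // of a parquet footer) would do per file.
  public void update(long value) {
    if (value < min) { min = value; }
    if (value > max) { max = value; }
  }

  // A predicate "col = v" can safely skip any file whose [min, max]
  // range does not contain v.
  public boolean mightContain(long value) {
    return value >= min && value <= max;
  }

  public long getMin() { return min; }
  public long getMax() { return max; }
}
```

With stats of this shape kept per file, a `table.updateStatistics`-style hook would only need to fold in values (or copy footer min/max) for the files touched by each commit.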

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/UnsafeAccess.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public class UnsafeAccess {

Review comment:
       can you please add Javadoc and explain why this class is needed?
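For context on why a class like this typically exists: z-value encoding needs fast primitive reads/writes into byte[] buffers, which is commonly done through sun.misc.Unsafe. Below is a hedged sketch of the standard acquisition pattern only; the class and method names are illustrative, not the PR's actual code:

```java
import java.lang.reflect.Field;
import java.nio.ByteOrder;
import java.security.AccessController;
import java.security.PrivilegedAction;

import sun.misc.Unsafe;

// Illustrative sketch: obtain the sun.misc.Unsafe singleton via privileged
// reflection so primitives can be written into byte[] buffers without
// per-element bounds checks (useful when packing bits for z-order keys).
public final class UnsafeAccessSketch {
  public static final Unsafe UNSAFE;
  public static final int BYTE_ARRAY_BASE_OFFSET;
  public static final boolean LITTLE_ENDIAN =
      ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

  static {
    UNSAFE = AccessController.doPrivileged((PrivilegedAction<Unsafe>) () -> {
      try {
        // Unsafe has no public constructor; read its private singleton field.
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        return (Unsafe) f.get(null);
      } catch (Exception e) {
        throw new RuntimeException("sun.misc.Unsafe unavailable", e);
      }
    });
    BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
  }

  private UnsafeAccessSketch() {}

  // Write a long directly into a byte[] at the given byte index.
  public static void putLong(byte[] buf, int index, long value) {
    UNSAFE.putLong(buf, (long) BYTE_ARRAY_BASE_OFFSET + index, value);
  }
}
```

Noting the native byte order (as the `ByteOrder` import in the PR suggests) matters because Unsafe writes primitives in platform order, while z-order comparison needs a consistent big-endian-style byte layout.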

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkOptimizeWriteCommitActionExecutor.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.execution.bulkinsert.NonSortPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkOptimizeWriteCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
+  private final JavaRDD<HoodieRecord> inputRecordsRDD;
+
+  public SparkOptimizeWriteCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                String instantTime, JavaRDD<HoodieRecord> inputRecordsRDD) {
+    this(context, config, table, instantTime, inputRecordsRDD, Option.empty());
+  }
+
+  public SparkOptimizeWriteCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                String instantTime, JavaRDD<HoodieRecord> inputRecordsRDD, Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime, WriteOperationType.OPTIMIZE, extraMetadata);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD> execute() {
+    try {
+      return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
+          this, false, Option.of(new NonSortPartitioner()));

Review comment:
       instead of passing NonSortPartitioner here, can we write a new Partitioner that uses the Zoptimize module? So basically do the sorting here instead of doing it in SparkOptimizeDataLayoutStrategy.java#prepareGenericRecord.
   
   Basically, we can just reuse SparkSortAndSizeExecutionStrategy with the new partitioner here.
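For reference, the heart of such a z-order partitioner is the z-value (Morton code) that records get sorted by. A minimal sketch with illustrative names, not the Zoptimize module's actual API:

```java
// Illustrative sketch: interleave the bits of two int column values into a
// 64-bit z-value (Morton code). Records sorted by this value stay close in
// both dimensions, which tightens per-file min/max ranges for data skipping.
public class ZValue {
  public static long interleave(int x, int y) {
    long z = 0L;
    for (int i = 0; i < 32; i++) {
      z |= ((long) (x >>> i) & 1L) << (2 * i);     // x bits land in even positions
      z |= ((long) (y >>> i) & 1L) << (2 * i + 1); // y bits land in odd positions
    }
    return z;
  }
}
```

A custom Spark Partitioner could then range-partition on `interleave(colA, colB)` and sort within partitions, which is roughly what reusing SparkSortAndSizeExecutionStrategy with a new partitioner would amount to.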

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -365,6 +385,13 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
     }
     finalizeWrite(table, clusteringCommitTime, writeStats);
     try {
+      // try to save statistics info to hudi
+      if (config.getOptimizeEnableDataSkipping() && !config.getOptimizeSortColumns().isEmpty()) {
+        String basePath = table.getMetaClient().getBasePath();

Review comment:
       looks like stats are being included for the 'bulkInsert' operation. What happens if there are updates after that? Or say, data is rewritten later using z-order? The index can get out of date. Where are we handling these scenarios?
   
   Instead, I think this needs to be called as part of BaseSparkCommitActionExecutor#updateIndex. We can update the primary index and the data skipping index together, and it would work for all scenarios whenever a new action is taken. WDYT?
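To illustrate why staleness matters at query time, here is a hedged sketch of the pruning step that consumes the saved statistics. `FileStat` and `prune` are illustrative names, not Hudi types; if updates land without refreshing the stats, this pruning becomes unsafe, which is why tying the refresh to updateIndex would help:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: prune candidate files using per-file min/max stats
// for one column, given a range predicate lo <= col <= hi.
public class FilePruning {
  static class FileStat {
    final String path;
    final long min;
    final long max;

    FileStat(String path, long min, long max) {
      this.path = path;
      this.min = min;
      this.max = max;
    }
  }

  // Keep only files whose [min, max] range overlaps [lo, hi]; all other
  // files provably contain no matching rows and are skipped entirely.
  static List<String> prune(List<FileStat> stats, long lo, long hi) {
    List<String> keep = new ArrayList<>();
    for (FileStat s : stats) {
      if (s.max >= lo && s.min <= hi) {
        keep.add(s.path);
      }
    }
    return keep;
  }
}
```

On z-ordered data the min/max ranges per file are tight, so a point or range predicate typically prunes the vast majority of files.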




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> support z-order for hudi
> ------------------------
>
>                 Key: HUDI-2101
>                 URL: https://issues.apache.org/jira/browse/HUDI-2101
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Spark Integration
>            Reporter: tao meng
>            Assignee: tao meng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> support z-order for hudi to optimize queries



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
