[
https://issues.apache.org/jira/browse/HUDI-2101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394979#comment-17394979
]
ASF GitHub Bot commented on HUDI-2101:
--------------------------------------
satishkotha commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r684467542
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -108,8 +108,12 @@ public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
     validateWriteResult(writeMetadata);
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
+      WriteOperationType realOperationType = operationType;
+      if (config.getClusteringExecutionStrategyClass().equals("org.apache.hudi.clustering.SparkOptimizeDataLayoutStrategy")) {
Review comment:
This part is a bit confusing. It seems you already have SparkOptimizeWriteCommitActionExecutor.java for 'OPTIMIZE'. Are you reworking that as a clustering strategy? If so, can we delete SparkOptimizeWriteCommitActionExecutor.java?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -460,6 +487,17 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
         Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
   }
+  @Override
+  protected void saveStatisticsInfo(List<String> touchFiles, String cols, String indexPath, String saveMode) {
+    if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
+      LOG.warn("save nothing to index table");
+      return;
+    }
+    HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) context;
+    Zoptimize$.MODULE$.saveStatisticsInfo(sparkEngineContext
Review comment:
Will this work for log files too? I think it is better to build abstractions that extend easily to future use cases.
For example, this could call table.updateStatistics. Internally, CopyOnWriteTable could read stats from the parquet footer as an optimization instead of recomputing min/max (this optimization need not be done now, but building the abstraction would help toward that goal).
MergeOnReadTable could apply other optimizations for log files.
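To illustrate the suggestion, here is a minimal standalone sketch of the kind of per-file column statistics such an abstraction would manage. All names here (ColumnRange, computeRange, candidateFiles) are hypothetical, not actual Hudi APIs; computeRange is the MOR-style "recompute by scanning" fallback, while a COW implementation could fill the same structure from parquet footers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a table-level statistics abstraction; not Hudi API.
public class StatsSketch {
    /** Min/max range of one column within one data file. */
    public static final class ColumnRange {
        public final long min, max;
        public ColumnRange(long min, long max) { this.min = min; this.max = max; }
    }

    /** Recompute a column's range by scanning values (log-file fallback). */
    public static ColumnRange computeRange(long[] values) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long v : values) { min = Math.min(min, v); max = Math.max(max, v); }
        return new ColumnRange(min, max);
    }

    /** Data skipping: keep only files whose range may contain the value. */
    public static List<String> candidateFiles(Map<String, ColumnRange> index, long value) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, ColumnRange> e : index.entrySet()) {
            if (value >= e.getValue().min && value <= e.getValue().max) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

Whichever table type populates the index, queries consume it the same way, which is the point of pushing the logic behind table.updateStatistics.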
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/UnsafeAccess.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+public class UnsafeAccess {
Review comment:
Can you please add Javadoc and explain why this class is needed?
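For context on what such a Javadoc should explain: Unsafe-based helpers in this space typically exist to speed up byte-level operations such as unsigned lexicographic comparison of serialized keys (z-values sort as unsigned byte strings, and sun.misc.Unsafe allows reading 8 bytes per step). A plain-Java sketch of that contract, purely illustrative and not this PR's implementation:

```java
// Illustrative sketch: the unsigned byte-array comparison contract that an
// Unsafe-backed helper usually accelerates. Not the PR's actual code.
public class ByteCompareSketch {
    /** Compare byte arrays lexicographically, treating each byte as unsigned. */
    public static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xFF;  // mask to get the unsigned value (0..255)
            int y = b[i] & 0xFF;
            if (x != y) {
                return x - y;
            }
        }
        // Equal prefix: the shorter array sorts first.
        return a.length - b.length;
    }
}
```

Note that signed byte comparison would order 0x80 before 0x01; the `& 0xFF` mask is what makes the ordering match the serialized key ordering.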
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkOptimizeWriteCommitActionExecutor.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.execution.bulkinsert.NonSortPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkOptimizeWriteCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
+  private final JavaRDD<HoodieRecord> inputRecordsRDD;
+
+  public SparkOptimizeWriteCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                String instantTime, JavaRDD<HoodieRecord> inputRecordsRDD) {
+    this(context, config, table, instantTime, inputRecordsRDD, Option.empty());
+  }
+
+  public SparkOptimizeWriteCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                String instantTime, JavaRDD<HoodieRecord> inputRecordsRDD, Option<Map<String, String>> extraMetadata) {
+    super(context, config, table, instantTime, WriteOperationType.OPTIMIZE, extraMetadata);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD> execute() {
+    try {
+      return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
+          this, false, Option.of(new NonSortPartitioner()));
Review comment:
Instead of passing NonSortPartitioner here, can we write a new Partitioner that uses the Zoptimize module? That way the sorting happens here instead of in SparkOptimizeDataLayoutStrategy.java#prepareGenericRecord.
Basically, we could just reuse SparkSortAndSizeExecutionStrategy with the new partitioner.
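To make the suggestion concrete, here is a self-contained sketch of the ordering such a z-order partitioner would impose before bulk insert. The Morton-code bit interleaving is the standard z-order technique; the class and method names (ZOrderSketch, zSort) are illustrative, not Hudi or Zoptimize APIs:

```java
import java.util.Comparator;
import java.util.List;

// Illustrative z-order sorting sketch; names are hypothetical, not Hudi APIs.
public class ZOrderSketch {
    /** Spread the lower 16 bits of v so each occupies every other bit position. */
    public static long spread(long v) {
        v &= 0xFFFFL;
        v = (v | (v << 8)) & 0x00FF00FFL;
        v = (v | (v << 4)) & 0x0F0F0F0FL;
        v = (v | (v << 2)) & 0x33333333L;
        v = (v | (v << 1)) & 0x55555555L;
        return v;
    }

    /** Morton code: interleave the bits of x and y (x on even bits, y on odd). */
    public static long zValue(int x, int y) {
        return spread(x) | (spread(y) << 1);
    }

    /** Sort 2-D sort keys by z-value: the order a z-order partitioner produces. */
    public static List<int[]> zSort(List<int[]> keys) {
        keys.sort(Comparator.comparingLong(k -> zValue(k[0], k[1])));
        return keys;
    }
}
```

Records that are close in both sort columns end up close in this one-dimensional order, which is what makes the resulting file-level min/max ranges tight enough for data skipping.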
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -365,6 +385,13 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
     }
     finalizeWrite(table, clusteringCommitTime, writeStats);
     try {
+      // try to save statistics info to hudi
+      if (config.getOptimizeEnableDataSkipping() && !config.getOptimizeSortColumns().isEmpty()) {
+        String basePath = table.getMetaClient().getBasePath();
Review comment:
It looks like stats are only being collected for the 'bulkInsert' operation. What happens if there are updates after that, or if data is rewritten later using z-order? The index can get out of date. Where are we handling these scenarios?
Instead, I think this needs to be called as part of BaseSparkCommitActionExecutor#updateIndex: we can update the primary index and the data-skipping index together, and that would work for all scenarios whenever a new action is taken. WDYT?
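A toy sketch of the staleness concern raised above, with hypothetical names (not Hudi APIs): if the per-file min/max entry is refreshed from the per-commit update path, a later rewrite of the same file simply replaces the stale entry.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows why refreshing stats on every commit keeps the
// data-skipping index consistent after a file is rewritten. Not Hudi code.
public class StatsIndexSketch {
    private final Map<String, long[]> fileToMinMax = new HashMap<>();

    /** Called from the commit path for every write; replaces stale entries. */
    public void updateIndex(String fileId, long[] columnValues) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long v : columnValues) { min = Math.min(min, v); max = Math.max(max, v); }
        fileToMinMax.put(fileId, new long[]{min, max});
    }

    /** {min, max} currently recorded for the file, or null if unknown. */
    public long[] rangeOf(String fileId) { return fileToMinMax.get(fileId); }
}
```

If updateIndex is only invoked from the clustering commit, the second write in a file's history would leave rangeOf returning the old range, and data skipping could wrongly prune the file.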
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> support z-order for hudi
> ------------------------
>
> Key: HUDI-2101
> URL: https://issues.apache.org/jira/browse/HUDI-2101
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Spark Integration
> Reporter: tao meng
> Assignee: tao meng
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.10.0
>
>
> support z-order for hudi to optimize queries
--
This message was sent by Atlassian Jira
(v8.3.4#803005)