vinothchandar commented on a change in pull request #2263: URL: https://github.com/apache/hudi/pull/2263#discussion_r544859334
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Clustering specific configs. 
+ */ +public class HoodieClusteringConfig extends DefaultHoodieConfig { + + public static final String ASYNC_CLUSTERING_ENABLED = "hoodie.clustering.enabled"; + public static final String DEFAULT_ASYNC_CLUSTERING_ENABLED = "false"; + + public static final String SCHEDULE_CLUSTERING_STRATEGY_CLASS = "hoodie.clustering.schedule.strategy.class"; + public static final String DEFAULT_SCHEDULE_CLUSTERING_STRATEGY_CLASS = + "org.apache.hudi.client.clustering.schedule.strategy.SparkBoundedDayBasedScheduleClusteringStrategy"; + + public static final String RUN_CLUSTERING_STRATEGY_CLASS = "hoodie.clustering.run.strategy.class"; + public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS = + "org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy"; + + // Turn on inline clustering - after few commits an inline clustering will be run + public static final String INLINE_CLUSTERING_PROP = "hoodie.clustering.inline"; + private static final String DEFAULT_INLINE_CLUSTERING = "false"; + + public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = "hoodie.clustering.inline.num.commits"; Review comment: let's please add Javadoc comments for all these configs? 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java ########## @@ -59,25 +58,39 @@ public static SparkBulkInsertHelper newInstance() { } @Override - public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords, - String instantTime, - HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, - HoodieWriteConfig config, - BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor, - boolean performDedupe, - Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { + public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(final JavaRDD<HoodieRecord<T>> inputRecords, final String instantTime, final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, final HoodieWriteConfig config, final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor, final boolean performDedupe, final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { Review comment: make this more readable, by putting each param on a line like before? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ScheduleClusteringStrategy.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.FileSliceUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Pluggable implementation for scheduling clustering and creating ClusteringPlan. 
+ */ +public abstract class ScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(ScheduleClusteringStrategy.class); + + public static final int CLUSTERING_PLAN_VERSION_1 = 1; + + private final HoodieTable<T,I,K,O> hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + this.hoodieTable = table; + this.engineContext = engineContext; + } + + /** + * Generate metadata for grouping eligible files and create a plan. Note that data is not moved around + * as part of this step. + * + * If there is no data available to cluster, return None. + */ + public abstract Option<HoodieClusteringPlan> generateClusteringPlan(); + + /** + * Return file slices eligible for clustering. FileIds in + * 1) pending clustering/compaction + * 2) Larger than clustering target file size Review comment: should the filtering for 2 happen at this level? that seems like something a specific plan would do. 1 makes sense to do at this level. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ScheduleClusteringStrategy.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.FileSliceUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Pluggable implementation for scheduling clustering and creating ClusteringPlan. 
+ */ +public abstract class ScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(ScheduleClusteringStrategy.class); + + public static final int CLUSTERING_PLAN_VERSION_1 = 1; + + private final HoodieTable<T,I,K,O> hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { Review comment: this is actually just a `ClusteringPlanStrategy` right? i.e it generates clustering plans. It has less to do with scheduling of clustering itself? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/RunClusteringStrategy.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.avro.Schema; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Map; + +/** + * Pluggable implementation for writing data into new file groups based on ClusteringPlan. + */ +public abstract class RunClusteringStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(RunClusteringStrategy.class); + + private final HoodieTable<T,I,K,O> hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public RunClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + this.hoodieTable = table; + this.engineContext = engineContext; + } + + /** + * Run clustering to write inputRecords into new files as defined by rules in strategy parameters. The number of new + * file groups created is bounded by numOutputGroups. + * Note that commit is not done as part of strategy. commit is callers responsibility. + */ + public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime, Review comment: lets rename the parameter to be more descriptive? ########## File path: hudi-common/src/main/java/org/apache/hudi/common/util/FileSliceUtils.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; + +import java.util.List; +import java.util.Map; + +/** + * A utility class for numeric. + */ +public class FileSliceUtils { + + public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; + public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; + public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; + public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE"; + public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES"; + + public static void addFileSliceCommonMetrics(List<FileSlice> fileSlices, Map<String, Double> metrics, long defaultBaseFileSize) { Review comment: this is very metric specific. could we stick it somewhere else closer to actual usage in hudi-client-common? may be rename the class to something like `FileSliceMetricUtils` ? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java ########## @@ -48,6 +49,7 @@ private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; private Timer compactionTimer = null; + private Timer clusteringTimer = null; Review comment: should this be called `replaceTimer` as well? 
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java ########## @@ -27,8 +27,21 @@ public abstract class AbstractBulkInsertHelper<T extends HoodieRecordPayload, I, K, O, R> { + /** + * Mark instant as inflight, write input records, update index and return result. + */ public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime, HoodieTable<T, I, K, O> table, HoodieWriteConfig config, BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe, Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner); + + /** + * Only write input records. Does not change timeline/index. Return information about new files created. + */ + public abstract O bulkInsert(I inputRecords, String instantTime, + HoodieTable<T, I, K, O> table, HoodieWriteConfig config, + boolean performDedupe, + Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner, + boolean addMetadataFields, + int numOutputGroups); Review comment: can we rename `numOutputGroups` to something that can be easily understood in the bulk insert context? If we leak some clustering terminology here, it becomes harder to read this. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareScheduleClusteringStrategy.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Scheduling strategy with restriction that clustering groups can only contain files from same partition. + */ +public abstract class PartitionAwareScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> extends ScheduleClusteringStrategy<T,I,K,O> { + private static final Logger LOG = LogManager.getLogger(PartitionAwareScheduleClusteringStrategy.class); + // With more than 50 groups, we see performance degradation with this Strategy implementation. 
+ private static final int MAX_CLUSTERING_GROUPS_STRATEGY = 50; + + public PartitionAwareScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + /** + * Create Clustering group based on files eligible for clustering in the partition. + */ + protected abstract Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, + List<FileSlice> fileSlices); + + /** + * Return list of partition paths to be considered for clustering. + */ + protected List<String> filterPartitionPaths(List<String> partitionPaths) { + return partitionPaths; + } + + @Override + public Option<HoodieClusteringPlan> generateClusteringPlan() { + try { + HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient(); + LOG.info("Scheduling clustering for " + metaClient.getBasePath()); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + getWriteConfig().shouldAssumeDatePartitioning()); + + // filter the partition paths if needed to reduce list status + partitionPaths = filterPartitionPaths(partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no clustering plan + return Option.empty(); + } + + long maxClusteringGroups = getWriteConfig().getClusteringMaxNumGroups(); Review comment: I think we should have 50 as the default value for this config and allow any value to be passed in, as opposed to having this limit hard-coded. ########## File path: hudi-common/src/main/avro/HoodieClusteringGroup.avsc ########## @@ -40,6 +40,11 @@ }], "default": null }, + { + "name":"numOutputGroups", Review comment: should it be called `numOutputSlices`? the whole thing is called a group right? or maybe you mean `numOutputFileGroups`? 
in which case, let's rename it to clarify ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareScheduleClusteringStrategy.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Scheduling strategy with restriction that clustering groups can only contain files from same partition. + */ +public abstract class PartitionAwareScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> extends ScheduleClusteringStrategy<T,I,K,O> { + private static final Logger LOG = LogManager.getLogger(PartitionAwareScheduleClusteringStrategy.class); + // With more than 50 groups, we see performance degradation with this Strategy implementation. + private static final int MAX_CLUSTERING_GROUPS_STRATEGY = 50; + + public PartitionAwareScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + /** + * Create Clustering group based on files eligible for clustering in the partition. + */ + protected abstract Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, + List<FileSlice> fileSlices); + + /** + * Return list of partition paths to be considered for clustering. 
+ */ + protected List<String> filterPartitionPaths(List<String> partitionPaths) { + return partitionPaths; + } + + @Override + public Option<HoodieClusteringPlan> generateClusteringPlan() { + try { + HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient(); + LOG.info("Scheduling clustering for " + metaClient.getBasePath()); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + getWriteConfig().shouldAssumeDatePartitioning()); + + // filter the partition paths if needed to reduce list status + partitionPaths = filterPartitionPaths(partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no clustering plan + return Option.empty(); + } + + long maxClusteringGroups = getWriteConfig().getClusteringMaxNumGroups(); + if (maxClusteringGroups > MAX_CLUSTERING_GROUPS_STRATEGY) { + LOG.warn("Reducing max clustering groups to " + MAX_CLUSTERING_GROUPS_STRATEGY + " for performance reasons"); + maxClusteringGroups = MAX_CLUSTERING_GROUPS_STRATEGY; + } + + List<HoodieClusteringGroup> clusteringGroups = partitionPaths.stream() Review comment: why not try to do this in parallel using context.map() etc? it should improve performance as well ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/schedule/strategy/SparkBoundedDayBasedScheduleClusteringStrategy.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.schedule.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareScheduleClusteringStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.SORT_COLUMNS_PROPERTY; + +/** + * Clustering Strategy based on following. + * 1) Spark execution engine. + * 2) Limits amount of data per clustering operation. 
+ */ +public class SparkBoundedDayBasedScheduleClusteringStrategy<T extends HoodieRecordPayload<T>> + extends PartitionAwareScheduleClusteringStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { + private static final Logger LOG = LogManager.getLogger(SparkBoundedDayBasedScheduleClusteringStrategy.class); + + public SparkBoundedDayBasedScheduleClusteringStrategy(HoodieSparkCopyOnWriteTable<T> table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkBoundedDayBasedScheduleClusteringStrategy(HoodieSparkMergeOnReadTable<T> table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) { + List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>(); + List<FileSlice> currentGroup = new ArrayList<>(); + int totalSizeSoFar = 0; + for (FileSlice currentSlice : fileSlices) { + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); + // check if max size is reached and create new group, if needed. 
+ if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + currentGroup.add(currentSlice); + } + if (!currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map<String, String> getStrategyParams() { + Map<String, String> params = new HashMap<>(); + if (getWriteConfig().getProps().containsKey(SORT_COLUMNS_PROPERTY)) { + params.put(SORT_COLUMNS_PROPERTY, getWriteConfig().getProps().getProperty(SORT_COLUMNS_PROPERTY)); + } + return params; + } + + @Override + protected List<String> filterPartitionPaths(List<String> partitionPaths) { + int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); + return partitionPaths.stream().map(partition -> partition.replace("/", "-")) Review comment: why do we need the /,- replace logic? It should work correctly even without that. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java ########## @@ -40,21 +36,6 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrategy implements Comparator<HoodieCompactionOperation> { - private static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILE_SIZE"; Review comment: are these cleanups strictly needed for this PR? if you have tested them already, it's okay. 
but generally, separating these in a different refactor PR is preferrable. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ScheduleClusteringStrategy.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Pluggable implementation for scheduling clustering and creating ClusteringPlan. 
+ */ +public abstract class ScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(ScheduleClusteringStrategy.class); + + public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; + public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; + public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; + public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE"; + public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES"; + + public static final int CLUSTERING_PLAN_VERSION_1 = 1; + + private final HoodieTable<T,I,K,O> hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + this.hoodieTable = table; + this.engineContext = engineContext; + } + + /** + * Generate metadata for grouping eligible files and create a plan. Note that data is not moved around + * as part of this step. + * + * If there is no data available to cluster, return None. + */ + public abstract Option<HoodieClusteringPlan> generateClusteringPlan(); + + /** + * Return file slices eligible for clustering. 
FileIds in + * 1) pending clustering/compaction + * 2) Larger than clustering target file size + * + * are not eligible for clustering + */ + protected List<FileSlice> getFileSlicesEligibleForClustering(String partition) { + Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = ((SyncableFileSystemView) getHoodieTable().getSliceView()).getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + fgIdsInPendingCompactionAndClustering.addAll(ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(getHoodieTable().getMetaClient()).keySet()); + + return hoodieTable.getSliceView().getLatestFileSlices(partition) + // file ids already in clustering are not eligible + .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) + // files that have basefile size larger than clustering target file size are not eligible (Note that compaction can merge any updates) + .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < writeConfig.getClusteringTargetFileSize()) + .collect(Collectors.toList()); + } + + /** + * Get parameters specific to strategy. These parameters are passed from 'schedule clustering' step to + * 'run clustering' step. 'run clustering' step is typically async. So these params help with passing any required + * context from schedule to run step. + */ + protected abstract Map<String, String> getStrategyParams(); + + /** + * Returns any specific parameters to be stored as part of clustering metadata. + */ + protected Map<String, String> getExtraMetadata() { + return Collections.emptyMap(); + } + + /** + * Version to support future changes for plan. + */ + protected int getPlanVersion() { + return CLUSTERING_PLAN_VERSION_1; + } + + /** + * Transform {@link FileSlice} to {@link HoodieSliceInfo}. 
+ */ + protected List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) { + return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder() + .setPartitionPath(slice.getPartitionPath()) + .setFileId(slice.getFileId()) + .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse("")) Review comment: use the constant in L123 as well? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/schedule/strategy/SparkBoundedDayBasedScheduleClusteringStrategy.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.schedule.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareScheduleClusteringStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.SORT_COLUMNS_PROPERTY; + +/** + * Clustering Strategy based on following. + * 1) Spark execution engine. + * 2) Limits amount of data per clustering operation. 
+ */ +public class SparkBoundedDayBasedScheduleClusteringStrategy<T extends HoodieRecordPayload<T>> + extends PartitionAwareScheduleClusteringStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { + private static final Logger LOG = LogManager.getLogger(SparkBoundedDayBasedScheduleClusteringStrategy.class); + + public SparkBoundedDayBasedScheduleClusteringStrategy(HoodieSparkCopyOnWriteTable<T> table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkBoundedDayBasedScheduleClusteringStrategy(HoodieSparkMergeOnReadTable<T> table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) { + List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>(); + List<FileSlice> currentGroup = new ArrayList<>(); + int totalSizeSoFar = 0; + for (FileSlice currentSlice : fileSlices) { + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); + // check if max size is reached and create new group, if needed. 
+ if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + currentGroup.add(currentSlice); + } + if (!currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map<String, String> getStrategyParams() { + Map<String, String> params = new HashMap<>(); + if (getWriteConfig().getProps().containsKey(SORT_COLUMNS_PROPERTY)) { + params.put(SORT_COLUMNS_PROPERTY, getWriteConfig().getProps().getProperty(SORT_COLUMNS_PROPERTY)); + } + return params; + } + + @Override + protected List<String> filterPartitionPaths(List<String> partitionPaths) { + int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); + return partitionPaths.stream().map(partition -> partition.replace("/", "-")) + .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) + .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) + .collect(Collectors.toList()); + } + + private int getNumberOfGroups(long groupSize, long targetFileSize) { Review comment: is this like a group number? 
Seems more like the number of output files produced by that group ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java ########## @@ -59,25 +58,39 @@ public static SparkBulkInsertHelper newInstance() { } @Override - public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords, - String instantTime, - HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, - HoodieWriteConfig config, - BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor, - boolean performDedupe, - Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { + public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(final JavaRDD<HoodieRecord<T>> inputRecords, final String instantTime, final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, final HoodieWriteConfig config, final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor, final boolean performDedupe, final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { Review comment: place each parameter on its own line? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ScheduleClusteringStrategy.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.FileSliceUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Pluggable implementation for scheduling clustering and creating ClusteringPlan. 
+ */ +public abstract class ScheduleClusteringStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(ScheduleClusteringStrategy.class); + + public static final int CLUSTERING_PLAN_VERSION_1 = 1; + + private final HoodieTable<T,I,K,O> hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ScheduleClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { Review comment: Should we rename this class and the configs? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
