This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 46808dc [HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
46808dc is described below
commit 46808dcb1fe22491326a9e831dd4dde4c70796fb
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Sep 30 15:20:25 2021 -0700
[HUDI-2497] Refactor clean and restore actions in hudi-client module (#3734)
---
...ctionExecutor.java => CleanActionExecutor.java} | 94 ++++++++++++++-
...nExecutor.java => CleanPlanActionExecutor.java} | 16 +--
.../restore/CopyOnWriteRestoreActionExecutor.java} | 48 ++++----
.../restore/MergeOnReadRestoreActionExecutor.java} | 33 ++---
.../client/common/HoodieFlinkEngineContext.java | 15 +++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 10 +-
.../action/clean/FlinkCleanActionExecutor.java | 128 --------------------
.../clean/FlinkScheduleCleanActionExecutor.java | 52 --------
.../client/common/HoodieJavaEngineContext.java | 15 +++
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 14 +--
.../action/clean/JavaCleanActionExecutor.java | 130 --------------------
.../clean/JavaScheduleCleanActionExecutor.java | 52 --------
.../client/common/HoodieSparkEngineContext.java | 19 +++
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 18 +--
.../hudi/table/HoodieSparkMergeOnReadTable.java | 8 +-
.../action/clean/SparkCleanActionExecutor.java | 134 ---------------------
.../action/clean/SparkCleanPlanActionExecutor.java | 55 ---------
.../SparkCopyOnWriteRestoreActionExecutor.java | 70 -----------
.../hudi/common/engine/HoodieEngineContext.java | 7 ++
.../common/engine/HoodieLocalEngineContext.java | 16 +++
.../hudi/common/function/FunctionWrapper.java | 11 ++
.../function/SerializablePairFlatMapFunction.java | 33 +++++
22 files changed, 273 insertions(+), 705 deletions(-)
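
In short, the per-engine Spark/Flink/Java clean and restore executors are collapsed into single engine-agnostic implementations under hudi-client-common, with per-engine parallelism pushed behind the new HoodieEngineContext#mapPartitionsToPairAndReduceByKey API. A minimal sketch of the resulting call path, using only constructors and methods that appear in the diff below (the wrapper class and method themselves are illustrative, not part of the change):

    import org.apache.hudi.avro.model.HoodieCleanMetadata;
    import org.apache.hudi.avro.model.HoodieCleanerPlan;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.clean.CleanActionExecutor;
    import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;

    import java.util.Map;

    // Illustrative only: the same two executors now serve Spark, Flink, and Java tables,
    // because all engine-specific work goes through the HoodieEngineContext abstraction.
    final class CleanFlowSketch {
      static <T extends HoodieRecordPayload, I, K, O> HoodieCleanMetadata scheduleAndClean(
          HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
          String instantTime, Option<Map<String, String>> extraMetadata) {
        // Plan which files should be deleted (previously BaseCleanPlanActionExecutor plus per-engine subclasses).
        Option<HoodieCleanerPlan> plan =
            new CleanPlanActionExecutor<>(context, config, table, instantTime, extraMetadata).execute();
        // Execute the plan; file deletions are parallelized by the engine context.
        return plan.isPresent()
            ? new CleanActionExecutor<>(context, config, table, instantTime).execute()
            : null;
      }
    }
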
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
similarity index 53%
rename from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
rename to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index acc3cdc..a5a72d4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -20,10 +20,13 @@ package org.apache.hudi.table.action.clean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -31,29 +34,36 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
-public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
+public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, HoodieCleanMetadata> {
private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(BaseCleanActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
-  public BaseCleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+  public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
super(context, config, table, instantTime);
}
-  protected static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
+  static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
@@ -68,13 +78,85 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
}
}
+  static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
+    Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+    FileSystem fs = table.getMetaClient().getFs();
+
+    cleanFileInfo.forEachRemaining(partitionDelFileTuple -> {
+      String partitionPath = partitionDelFileTuple.getLeft();
+      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
+      String deletePathStr = deletePath.toString();
+      Boolean deletedFileResult = null;
+      try {
+        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+
+      } catch (IOException e) {
+        LOG.error("Delete file failed: " + deletePathStr);
+      }
+      final PartitionCleanStat partitionCleanStat =
+          partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
+      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
+
+      if (isBootstrapBasePathFile) {
+        // For Bootstrap Base file deletions, store the full file path.
+        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
+      } else {
+        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
+      }
+    });
+    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
+  }
+
   /**
    * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles
    * skews in partitions to clean by making files to clean as the unit of task distribution.
    *
    * @throws IllegalArgumentException if unknown cleaning policy is provided
    */
-  abstract List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan);
+  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
+    int cleanerParallelism = Math.min(
+        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        config.getCleanerParallelism());
+    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
+
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
+
+    Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
+        cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+            .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
+                new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
+
+    Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
+        context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
+            iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);
+
+    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    // Return PartitionCleanStat for each partition passed.
+    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
+      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
+          ? partitionCleanStatsMap.get(partitionPath)
+          : new PartitionCleanStat(partitionPath);
+      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
+      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+          .withEarliestCommitRetained(Option.ofNullable(
+              actionInstant != null
+                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+                  actionInstant.getAction(), actionInstant.getTimestamp())
+                  : null))
+          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
+          .build();
+    }).collect(Collectors.toList());
+  }
+
/**
* Executes the Cleaner plan stored in the instant metadata.
@@ -143,7 +225,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
     }
     // return the last clean metadata for now
     // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
-    // This requires the BaseCleanActionExecutor to be refactored as BaseCommitActionExecutor
+    // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
     return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
}
}
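
The engine-neutral clean() above hinges on the new HoodieEngineContext#mapPartitionsToPairAndReduceByKey primitive, which each engine implements further down in this diff (Spark via mapPartitionsToPair/reduceByKey on an RDD, Flink and Java via java.util.stream). A rough, engine-free sketch of the contract in plain streams, with hypothetical names and no Hudi types:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.function.BiFunction;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    // Hypothetical illustration of the primitive's semantics: flat-map the input
    // (treated here as a single partition) into (key, value) pairs, then merge the
    // values that share a key with the supplied reduce function.
    final class MapPartitionsToPairAndReduceByKeySketch {
      static <I, K, V> Stream<Map.Entry<K, V>> run(
          Stream<I> data,
          Function<Iterator<I>, Stream<Map.Entry<K, V>>> flatMapToPair,
          BiFunction<V, V, V> reduce) {
        Map<K, V> reducedByKey = flatMapToPair.apply(data.iterator())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, reduce::apply));
        return reducedByKey.entrySet().stream();
      }
    }

In the clean action the keys are partition paths and the values are PartitionCleanStat objects merged with PartitionCleanStat::merge, so skewed partitions are balanced at file granularity rather than partition granularity.
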
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
similarity index 90%
rename from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
rename to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index fc0c000..9b95bd7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -43,22 +43,24 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public abstract class BaseCleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
+public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);
private final Option<Map<String, String>> extraMetadata;
- public BaseCleanPlanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, I, K, O> table,
- String instantTime,
-                                     Option<Map<String, String>> extraMetadata) {
+ public CleanPlanActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, I, K, O> table,
+ String instantTime,
+ Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime);
this.extraMetadata = extraMetadata;
}
- protected abstract Option<HoodieCleanerPlan> createCleanerPlan();
+ protected Option<HoodieCleanerPlan> createCleanerPlan() {
+ return execute();
+ }
/**
* Generates List of files to be cleaned.
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
similarity index 59%
rename from
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
rename to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
index f7677ae..2e3b148 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java
@@ -7,22 +7,20 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,35 +30,35 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
-import java.util.List;
-
-public class JavaCopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
- public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context,
- HoodieWriteConfig config,
- HoodieTable table,
- String instantTime,
- String restoreInstantTime) {
+public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {
+ public CopyOnWriteRestoreActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ String restoreInstantTime) {
super(context, config, table, instantTime, restoreInstantTime);
}
@Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
+    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+        && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
+    }
+ table.getMetaClient().reloadActiveTimeline();
+ String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ table.scheduleRollback(context, newInstantTime, instantToRollback, false);
table.getMetaClient().reloadActiveTimeline();
     CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
context,
config,
table,
- HoodieActiveTimeline.createNewInstantTime(),
+ newInstantTime,
instantToRollback,
true,
true,
false);
-    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
-        && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
-    }
return rollbackActionExecutor.execute();
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
similarity index 67%
rename from
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
rename to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
index 14a0b24..58663b6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkMergeOnReadRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java
@@ -7,22 +7,20 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -31,17 +29,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
-import org.apache.spark.api.java.JavaRDD;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkMergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload> extends
-    BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
-  public SparkMergeOnReadRestoreActionExecutor(HoodieSparkEngineContext context,
- HoodieWriteConfig config,
- HoodieTable table,
- String instantTime,
- String restoreInstantTime) {
+public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends BaseRestoreActionExecutor<T, I, K, O> {
+  public MergeOnReadRestoreActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table,
+                                          String instantTime, String restoreInstantTime) {
super(context, config, table, instantTime, restoreInstantTime);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 174122c..687ecc1 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -26,11 +26,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.flink.api.common.functions.RuntimeContext;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -38,9 +40,11 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.util.FlinkClientUtil;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -87,6 +91,17 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
}
@Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+            Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
+ @Override
public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel()
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 93785b9..2238ac3 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -44,8 +44,8 @@ import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -297,7 +297,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
    */
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
}
@Override
@@ -308,7 +308,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
-    return new FlinkCleanActionExecutor(context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@Override
@@ -329,7 +329,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
// -------------------------------------------------------------------------
// Used for compaction
// -------------------------------------------------------------------------
-
+
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
                                                   Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
deleted file mode 100644
index 9378cb2..0000000
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkCleanActionExecutor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import scala.Tuple2;
-
-public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(FlinkCleanActionExecutor.class);
-
- public FlinkCleanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
-                                  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime) {
- super(context, config, table, instantTime);
- }
-
- @Override
-  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
-    Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-        .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
-
-    Stream<Tuple2<String, PartitionCleanStat>> partitionCleanStats =
-        deleteFilesFunc(filesToBeDeletedPerPartition, table)
-            .collect(Collectors.groupingBy(Pair::getLeft))
-            .entrySet().stream()
-            .map(x -> new Tuple2(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
- .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-
- // Return PartitionCleanStat for each partition passed.
-    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
-      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
-          ? partitionCleanStatsMap.get(partitionPath)
-          : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
-      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
-                  actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
-          .build();
-    }).collect(Collectors.toList());
- }
-
-  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
- Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
- FileSystem fs = table.getMetaClient().getFs();
-
- cleanFileInfo.parallel().forEach(partitionDelFileTuple -> {
- String partitionPath = partitionDelFileTuple._1();
- Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
- String deletePathStr = deletePath.toString();
- Boolean deletedFileResult = null;
- try {
- deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
- } catch (IOException e) {
- LOG.error("Delete file failed");
- }
- final PartitionCleanStat partitionCleanStat;
- synchronized (partitionCleanStatMap) {
-        partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
-      }
-      boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
-      if (isBootstrapBasePathFile) {
-        // For Bootstrap Base file deletions, store the full file path.
-        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
-        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
-      } else {
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
-      }
-    });
-    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
- }
-}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
deleted file mode 100644
index 75da54e..0000000
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/clean/FlinkScheduleCleanActionExecutor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-public class FlinkScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(FlinkScheduleCleanActionExecutor.class);
-
- public FlinkScheduleCleanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
-                                          HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- Option<Map<String, String>> extraMetadata) {
- super(context, config, table, instantTime, extraMetadata);
- }
-
- @Override
- protected Option<HoodieCleanerPlan> createCleanerPlan() {
- return super.execute();
- }
-}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 4cdbff2..bdc2a85 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.common;
import org.apache.hadoop.conf.Configuration;
+
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -26,11 +27,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -38,6 +42,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -71,6 +76,16 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
}
@Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+                                                                                 SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+        .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+        .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+            Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+        .filter(Objects::nonNull);
+  }
+
+ @Override
public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
return data.stream().parallel()
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 72d63d5..99cf413 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -39,8 +39,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
-import org.apache.hudi.table.action.clean.JavaScheduleCleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
@@ -50,7 +50,7 @@ import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionE
 import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -187,13 +187,13 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
   @Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context,
String cleanInstantTime) {
-    return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@Override
@@ -218,7 +218,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
public HoodieRestoreMetadata restore(HoodieEngineContext context,
String restoreInstantTime,
String instantToRestore) {
-    return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context,
- config, this, restoreInstantTime, instantToRestore).execute();
+ return new CopyOnWriteRestoreActionExecutor(
+ context, config, this, restoreInstantTime, instantToRestore).execute();
}
}
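
With the restore executors likewise moved to hudi-client-common, a table no longer needs to cast its context to an engine-specific type before restoring; a minimal sketch of the call used above (the enclosing helper class is illustrative only):

    import org.apache.hudi.avro.model.HoodieRestoreMetadata;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;

    // Illustrative only: any engine's copy-on-write table can invoke the shared restore
    // executor with its generic HoodieEngineContext.
    final class RestoreFlowSketch {
      static HoodieRestoreMetadata restore(HoodieEngineContext context, HoodieWriteConfig config,
                                           HoodieTable table, String restoreInstantTime, String instantToRestore) {
        return new CopyOnWriteRestoreActionExecutor(
            context, config, table, restoreInstantTime, instantToRestore).execute();
      }
    }
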
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
deleted file mode 100644
index 0ca73d4..0000000
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class);
-
- public JavaCleanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
-                                 HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime) {
- super(context, config, table, instantTime);
- }
-
- @Override
-  List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
-
-    Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
-        .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
-
-    Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
-        deleteFilesFunc(filesToBeDeletedPerPartition, table)
-            .collect(Collectors.groupingBy(Pair::getLeft))
-            .entrySet().stream()
-            .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));
-
-    Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
-        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
-
- // Return PartitionCleanStat for each partition passed.
-    return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
-      PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
-          ? partitionCleanStatsMap.get(partitionPath)
-          : new PartitionCleanStat(partitionPath);
-      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
-      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
-          .withEarliestCommitRetained(Option.ofNullable(
-              actionInstant != null
-                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
-                  actionInstant.getAction(), actionInstant.getTimestamp())
-                  : null))
-          .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
-          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
-          .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-          .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-          .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-          .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
-          .build();
-    }).collect(Collectors.toList());
-  }
-
-  private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
- Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
- FileSystem fs = table.getMetaClient().getFs();
-
-    while (iter.hasNext()) {
-      Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
-      String partitionPath = partitionDelFileTuple.getLeft();
-      Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
-      String deletePathStr = deletePath.toString();
-      Boolean deletedFileResult = null;
-      try {
-        deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
-      } catch (IOException e) {
-        LOG.error("Delete file failed");
-      }
-      if (!partitionCleanStatMap.containsKey(partitionPath)) {
-        partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
-      }
-      boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
-      PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
-      if (isBootstrapBasePathFile) {
-        // For Bootstrap Base file deletions, store the full file path.
-        partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
-        partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
-      } else {
-        partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
-        partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
-      }
-    }
-    return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
- }
-}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
deleted file mode 100644
index 05d19a6..0000000
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaScheduleCleanActionExecutor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-public class JavaScheduleCleanActionExecutor<T extends HoodieRecordPayload> extends
-    BaseCleanPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(JavaScheduleCleanActionExecutor.class);
-
- public JavaScheduleCleanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
-                                         HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- String instantTime,
- Option<Map<String, String>> extraMetadata) {
- super(context, config, table, instantTime, extraMetadata);
- }
-
- @Override
- protected Option<HoodieCleanerPlan> createCleanerPlan() {
- return super.execute();
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index de06ea4..416992e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -25,18 +25,23 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -83,6 +88,20 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
}
@Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data.collect(Collectors.toList()), parallelism)
+        .mapPartitionsToPair((PairFlatMapFunction<Iterator<I>, K, V>) iterator ->
+            flatMapToPairFunc.call(iterator).collect(Collectors.toList()).stream()
+                .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator()
+        )
+        .reduceByKey(reduceFunc::apply)
+        .map(e -> new ImmutablePair<>(e._1, e._2))
+        .collect().stream();
+  }
+
+ @Override
public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
     return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> new Tuple2<K, V>(pair.getLeft(), pair.getRight()))
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index c2770a7..a9b36a8 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -50,8 +50,8 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
-import org.apache.hudi.table.action.clean.SparkCleanPlanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanActionExecutor;
+import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
@@ -65,7 +65,7 @@ import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
@@ -181,12 +181,12 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   @Override
   public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
-    new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+    new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
@Override
   public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
-    return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
+    return new CleanPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}
@Override
@@ -197,7 +197,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
}
   public Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
+                                                  Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
     HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, instantTime, fileId);
@@ -242,7 +242,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
}
   public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
-      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+                                                  Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
     HoodieCreateHandle<?,?,?,?> createHandle =
         new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
@@ -251,7 +251,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
-    return new SparkCleanActionExecutor((HoodieSparkEngineContext)context, config, this, cleanInstantTime).execute();
+    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@Override
@@ -266,7 +266,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
@Override
   public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
-    return new SparkCopyOnWriteRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, restoreInstantTime, instantToRestore).execute();
+    return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index ee66d7b..b4b106c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import
org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import
org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.compact.SparkRunCompactionActionExecutor;
import
org.apache.hudi.table.action.compact.SparkScheduleCompactionActionExecutor;
import
org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
@@ -48,8 +49,7 @@ import
org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExec
import
org.apache.hudi.table.action.deltacommit.SparkInsertPreppedDeltaCommitActionExecutor;
import
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor;
import
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
-import
org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
-import
org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
+import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
@@ -141,7 +141,7 @@ public class HoodieSparkMergeOnReadTable<T extends
HoodieRecordPayload> extends
@Override
public void rollbackBootstrap(HoodieEngineContext context, String
instantTime) {
- new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext)
context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+ new MergeOnReadRestoreActionExecutor(context, config, this, instantTime,
HoodieTimeline.INIT_INSTANT_TS).execute();
}
@Override
@@ -161,7 +161,7 @@ public class HoodieSparkMergeOnReadTable<T extends
HoodieRecordPayload> extends
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context, String
restoreInstantTime, String instantToRestore) {
- return new
SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context,
config, this, restoreInstantTime, instantToRestore).execute();
+ return new MergeOnReadRestoreActionExecutor(context, config, this,
restoreInstantTime, instantToRestore).execute();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
deleted file mode 100644
index ba2d42f..0000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.HoodieCleanStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.CleanFileInfo;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import scala.Tuple2;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCleanActionExecutor<T extends HoodieRecordPayload> extends
- BaseCleanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> {
-
- private static final Logger LOG =
LogManager.getLogger(SparkCleanActionExecutor.class);
-
- public SparkCleanActionExecutor(HoodieSparkEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime) {
- super(context, config, table, instantTime);
- }
-
- private static PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>,
String, PartitionCleanStat>
- deleteFilesFunc(HoodieTable table) {
- return (PairFlatMapFunction<Iterator<Tuple2<String, CleanFileInfo>>,
String, PartitionCleanStat>) iter -> {
- Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
- FileSystem fs = table.getMetaClient().getFs();
- while (iter.hasNext()) {
- Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
- String partitionPath = partitionDelFileTuple._1();
- Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
- String deletePathStr = deletePath.toString();
- Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
- if (!partitionCleanStatMap.containsKey(partitionPath)) {
- partitionCleanStatMap.put(partitionPath, new
PartitionCleanStat(partitionPath));
- }
- boolean isBootstrapBasePathFile =
partitionDelFileTuple._2().isBootstrapBaseFile();
- PartitionCleanStat partitionCleanStat =
partitionCleanStatMap.get(partitionPath);
- if (isBootstrapBasePathFile) {
- // For Bootstrap Base file deletions, store the full file path.
- partitionCleanStat.addDeleteFilePatterns(deletePath.toString(),
true);
- partitionCleanStat.addDeletedFileResult(deletePath.toString(),
deletedFileResult, true);
- } else {
- partitionCleanStat.addDeleteFilePatterns(deletePath.getName(),
false);
- partitionCleanStat.addDeletedFileResult(deletePath.getName(),
deletedFileResult, false);
- }
- }
- return partitionCleanStatMap.entrySet().stream().map(e -> new
Tuple2<>(e.getKey(), e.getValue()))
- .collect(Collectors.toList()).iterator();
- };
- }
-
- @Override
- List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan
cleanerPlan) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- int cleanerParallelism = Math.min(
- (int)
(cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
- config.getCleanerParallelism());
- LOG.info("Using cleanerParallelism: " + cleanerParallelism);
-
- context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of
partitions");
- List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
-
.parallelize(cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
- .flatMap(x -> x.getValue().stream().map(y -> new
Tuple2<>(x.getKey(),
- new CleanFileInfo(y.getFilePath(),
y.getIsBootstrapBaseFile()))))
- .collect(Collectors.toList()), cleanerParallelism)
- .mapPartitionsToPair(deleteFilesFunc(table))
- .reduceByKey(PartitionCleanStat::merge).collect();
-
- Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats.stream()
- .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
-
- // Return PartitionCleanStat for each partition passed.
- return
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
-> {
- PartitionCleanStat partitionCleanStat =
partitionCleanStatsMap.containsKey(partitionPath)
- ? partitionCleanStatsMap.get(partitionPath)
- : new PartitionCleanStat(partitionPath);
- HoodieActionInstant actionInstant =
cleanerPlan.getEarliestInstantToRetain();
- return
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
- .withEarliestCommitRetained(Option.ofNullable(
- actionInstant != null
- ? new
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
- actionInstant.getAction(), actionInstant.getTimestamp())
- : null))
- .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
- .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
- .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
-
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
-
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
-
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
- .build();
- }).collect(Collectors.toList());
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
deleted file mode 100644
index f5529a8..0000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanPlanActionExecutor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.clean;
-
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
-
-import java.util.Map;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCleanPlanActionExecutor<T extends HoodieRecordPayload>
extends
- BaseCleanPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
- private static final Logger LOG =
LogManager.getLogger(SparkCleanPlanActionExecutor.class);
-
- public SparkCleanPlanActionExecutor(HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String instantTime,
- Option<Map<String, String>>
extraMetadata) {
- super(context, config, table, instantTime, extraMetadata);
- }
-
- @Override
- protected Option<HoodieCleanerPlan> createCleanerPlan() {
- return super.execute();
- }
-
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
deleted file mode 100644
index 7d60b28..0000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.restore;
-
-import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieRollbackException;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
-
-import org.apache.spark.api.java.JavaRDD;
-
-@SuppressWarnings("checkstyle:LineLength")
-public class SparkCopyOnWriteRestoreActionExecutor<T extends
HoodieRecordPayload> extends
- BaseRestoreActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> {
-
- public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext
context,
- HoodieWriteConfig config,
- HoodieTable table,
- String instantTime,
- String restoreInstantTime) {
- super(context, config, table, instantTime, restoreInstantTime);
- }
-
- @Override
- protected HoodieRollbackMetadata rollbackInstant(HoodieInstant
instantToRollback) {
- if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
- &&
!instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- throw new HoodieRollbackException("Unsupported action in rollback
instant:" + instantToRollback);
- }
- table.getMetaClient().reloadActiveTimeline();
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
- table.scheduleRollback(context, instantTime, instantToRollback, false);
- table.getMetaClient().reloadActiveTimeline();
- CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new
CopyOnWriteRollbackActionExecutor(
- (HoodieSparkEngineContext) context,
- config,
- table,
- instantTime,
- instantToRollback,
- true,
- true,
- false);
- return rollbackActionExecutor.execute();
- }
-}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 10c7ced..fde34b6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -22,10 +22,13 @@ import
org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -61,6 +64,10 @@ public abstract class HoodieEngineContext {
public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+ public abstract <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
+ Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
+ SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+
public abstract <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
parallelism);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 1c935ff..ca032e7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -19,15 +19,19 @@
package org.apache.hudi.common.engine;
import org.apache.hadoop.conf.Configuration;
+
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -35,6 +39,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
+import static
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapToPairWrapper;
import static
org.apache.hudi.common.function.FunctionWrapper.throwingFlatMapWrapper;
import static
org.apache.hudi.common.function.FunctionWrapper.throwingForeachWrapper;
import static
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
@@ -69,6 +74,17 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
}
@Override
+ public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
+ Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
+ SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ return
throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+ .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+ .map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
+
Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null)))
+ .filter(Objects::nonNull);
+ }
+
+ @Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
parallelism) {
return data.stream().parallel()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
index b729e48..40e1a9d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/function/FunctionWrapper.java
@@ -72,6 +72,17 @@ public class FunctionWrapper {
};
}
+ public static <I, K, V> Function<I, Stream<Pair<K, V>>>
throwingFlatMapToPairWrapper(
+ SerializablePairFlatMapFunction<I, K, V> throwingPairFlatMapFunction) {
+ return v1 -> {
+ try {
+ return throwingPairFlatMapFunction.call(v1);
+ } catch (Exception e) {
+ throw new HoodieException("Error occurs when executing mapToPair", e);
+ }
+ };
+ }
+
public static <V> BinaryOperator<V>
throwingReduceWrapper(SerializableBiFunction<V, V, V> throwingReduceFunction) {
return (v1, v2) -> {
try {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
new file mode 100644
index 0000000..4cc34ce
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/function/SerializablePairFlatMapFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.function;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.stream.Stream;
+
+/**
+ * A function that returns a stream of key-value pairs (Pair<K, V>).
+ */
+@FunctionalInterface
+public interface SerializablePairFlatMapFunction<I, K, V> extends Serializable
{
+ Stream<Pair<K, V>> call(I t) throws Exception;
+}
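[Editorial note, not part of the commit] The new SerializablePairFlatMapFunction
above is consumed through the throwingFlatMapToPairWrapper helper added to
FunctionWrapper earlier in this diff. A small hedged sketch showing the interface
implemented as a lambda and adapted into a plain java.util.function.Function; the
class name PairFlatMapFunctionSketch and the sample keys are hypothetical:

    import org.apache.hudi.common.function.FunctionWrapper;
    import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
    import org.apache.hudi.common.util.collection.Pair;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class PairFlatMapFunctionSketch {

      public static void main(String[] args) {
        // The interface is a serializable, exception-throwing function from I to Stream<Pair<K, V>>;
        // a lambda satisfies its single abstract method. Sample partition paths are hypothetical.
        SerializablePairFlatMapFunction<Iterator<String>, String, Integer> onePerPartition =
            iter -> {
              List<Pair<String, Integer>> out = new ArrayList<>();
              while (iter.hasNext()) {
                out.add(Pair.of(iter.next(), 1));
              }
              return out.stream();
            };

        // throwingFlatMapToPairWrapper adapts the throwing function into a plain
        // java.util.function.Function, rethrowing checked exceptions as HoodieException,
        // so it can be used inside ordinary stream pipelines.
        Function<Iterator<String>, Stream<Pair<String, Integer>>> wrapped =
            FunctionWrapper.throwingFlatMapToPairWrapper(onePerPartition);

        String result = wrapped.apply(Arrays.asList("2021/09/30", "2021/10/01").iterator())
            .map(p -> p.getKey() + "=" + p.getValue())
            .collect(Collectors.joining(", "));
        System.out.println(result);  // prints: 2021/09/30=1, 2021/10/01=1
      }
    }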