nsivabalan commented on a change in pull request #3651:
URL: https://github.com/apache/hudi/pull/3651#discussion_r706989965
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -417,6 +425,19 @@ public HoodieActiveTimeline getActiveTimeline() {
    */
   public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);
+  /**
+   * Schedule rollback for the instant time.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for scheduling rollback
+   * @param instantToRollback instant to be rolled back
+   * @return HoodieRollbackPlan containing info on rollback.
+   */
+  public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
Review comment:
Naming this scheduleRollback to be in line w/ the existing clean, compaction etc. But all this method does is create a requested instant and serialize the RollbackPlan. Open to naming this differently.
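For context, what that boils down to is roughly the following (a sketch only: buildRollbackPlan is a hypothetical helper, and the timeline/serialization method names are assumptions, not verbatim from this patch):
```
// Sketch, mirroring how clean/compaction are scheduled: build the plan,
// create a requested rollback instant, and serialize the plan into it.
HoodieRollbackPlan plan = buildRollbackPlan(instantToRollback); // hypothetical helper
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
    HoodieTimeline.ROLLBACK_ACTION, instantTime);
// assumed timeline/serialization calls:
getActiveTimeline().saveToRollbackRequested(requested, TimelineMetadataUtils.serializeRollbackPlan(plan));
return Option.of(plan);
```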
##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -298,6 +299,12 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
     return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
   }
+  @Override
+  public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return null;
Review comment:
Yet to fix Flink and Java; will be fixing it in a day or two.
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -85,22 +113,215 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
   public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
     int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
     context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
-    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false);
+    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStatsForUpgrade(context, instantToRollback, rollbackRequests, sparkPartitions, false);
Review comment:
Do we need to fix upgrade as well? As of now, upgrade sends List<ListingBasedRollbackRequest> while the rest of the code base uses HoodieListingBasedRollbackRequest.
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
##########
@@ -48,20 +48,23 @@ public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext context,
   @Override
   protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
+    if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
Review comment:
Moved this code from below to here.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -53,11 +54,6 @@
   private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);
-  interface RollbackStrategy extends Serializable {
Review comment:
Note to reviewer: RollbackStrategy has moved to the plan action executor, and the interface methods have changed as well.
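For orientation, roughly what the relocated interface might look like in the plan executor (method name and return type are assumptions based on the new plan-based flow, not verbatim from this patch):
```
// Sketch: the strategy now produces rollback requests for the plan,
// instead of executing the rollback and returning stats directly.
interface RollbackStrategy extends Serializable {
  List<HoodieListingBasedRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback);
}
```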
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -85,22 +113,215 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
   public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
     int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
     context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
-    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, sparkPartitions, false);
+    JavaPairRDD<String, HoodieRollbackStat> partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStatsForUpgrade(context, instantToRollback, rollbackRequests, sparkPartitions, false);
     return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect();
   }
   /**
    * May be delete interested files and collect stats or collect stats only.
    *
-   * @param context instance of {@link HoodieEngineContext} to use.
+   * @param context instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param sparkPartitions number of spark partitions to use for parallelism.
+   * @return stats collected with or w/o actual deletions.
+   */
+  JavaRDD<HoodieListingBasedRollbackRequest> getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                                             List<ListingBasedRollbackRequest> rollbackRequests, int sparkPartitions) {
+    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+    return jsc.parallelize(rollbackRequests, sparkPartitions).map(rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
+          return new HoodieListingBasedRollbackRequest(rollbackRequest.getPartitionPath(), "", "", filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
+          return new HoodieListingBasedRollbackRequest(rollbackRequest.getPartitionPath(), "", "", filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+          // collect all log files that is supposed to be deleted with this rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+          Map<String, Long> logFilesToBeDeleted = new HashMap<>();
+          for (Map.Entry<FileStatus, Long> fileToBeDeleted : writtenLogFileSizeMap.entrySet()) {
+            logFilesToBeDeleted.put(fileToBeDeleted.getKey().getPath().toString(), fileToBeDeleted.getValue());
+          }
+          return new HoodieListingBasedRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant,
+              Collections.EMPTY_LIST, logFilesToBeDeleted);
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
+      }
+    });
+  }
+
+  /**
+   * May be delete interested files and collect stats or collect stats only.
+   *
+   * @param context instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param sparkPartitions number of spark partitions to use for parallelism.
+   * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  private List<Tuple2<String, HoodieRollbackStat>> maybeDeleteAndCollectStatsSequential(HoodieEngineContext context, HoodieInstant instantToRollback,
Review comment:
Due to the issues reported in L 94, doing it sequentially here. Once I crack this, I will update the patch and remove the other method.
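Roughly, the sequential fallback looks like this (a sketch; the per-request helper is assumed to mirror the logic inside the parallel version's map()):
```
// Sketch: iterate on the driver instead of jsc.parallelize(...), so nothing
// in the per-request logic needs to be Spark-serializable.
private List<Tuple2<String, HoodieRollbackStat>> maybeDeleteAndCollectStatsSequential(
    HoodieEngineContext context, HoodieInstant instantToRollback,
    List<ListingBasedRollbackRequest> rollbackRequests, boolean doDelete) {
  List<Tuple2<String, HoodieRollbackStat>> stats = new ArrayList<>();
  for (ListingBasedRollbackRequest rollbackRequest : rollbackRequests) {
    // maybeDeleteAndCollectStats(...) stands in for the same delete/collect
    // logic the parallel version runs per request.
    stats.add(maybeDeleteAndCollectStats(instantToRollback, rollbackRequest, doDelete));
  }
  return stats;
}
```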
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieListingBasedRollbackRequest;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class BaseRollbackPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieRollbackPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(BaseRollbackPlanActionExecutor.class);
+
+  protected final HoodieInstant instantToRollback;
+  private final boolean skipTimelinePublish;
+
+  public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
+  public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
Review comment:
I got inspired by CleanPlanner:
```
public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
```
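Presumably the version constant then gets stamped onto the plan when it is built, something like the following with the Avro-generated builder (field and setter names assumed from the schema, not verbatim):
```
// Sketch: record which instant is being rolled back, the requests, and the plan version.
HoodieRollbackPlan rollbackPlan = HoodieRollbackPlan.newBuilder()
    .setInstantToRollback(new HoodieInstantInfo(instantToRollback.getTimestamp(), instantToRollback.getAction()))
    .setRollbackRequests(rollbackRequests)
    .setVersion(LATEST_ROLLBACK_PLAN_VERSION)
    .build();
```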
##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java
##########
@@ -54,19 +55,25 @@ public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
   }
   @Override
-  protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
+  List<HoodieRollbackStat> rollbackAndGetStats(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan) {
+    return null;
+  }
+
+  /*
+  @Override
Review comment:
Yet to fix Java.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -316,6 +317,13 @@ public HoodieTimeline getCleanTimeline() {
     return getActiveTimeline().getCleanerTimeline();
   }
+  /**
+   * Get rollback timeline.
+   */
+  public HoodieTimeline getRollbackTimeline() {
Review comment:
Yes, we do have equivalents for the other actions:
getCompletedCleanTimeline
getCleanTimeline
getCompletedSavepointTimeline
getSavepoints
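Following that pattern, the new getter is presumably a one-liner over the active timeline (the filter method name here is an assumption, mirroring getCleanerTimeline()):
```
/**
 * Get only the rollback actions (requested, inflight, completed) from the active timeline.
 */
public HoodieTimeline getRollbackTimeline() {
  return getActiveTimeline().getRollbackTimeline(); // assumed timeline filter method
}
```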
##########
File path: hudi-common/src/main/avro/HoodieListingBasedRollbackRequest.avsc
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "HoodieListingBasedRollbackRequest",
Review comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]