nsivabalan commented on a change in pull request #4489:
URL: https://github.com/apache/hudi/pull/4489#discussion_r818130802
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -192,9 +192,10 @@ private synchronized FileSystemViewManager getViewManager() {
* @param context HoodieEngineContext
* @param instantTime Instant Time for the action
* @param partitions {@link List} of partition to be deleted
+ * @param purge whether the partition's path should be deleted
* @return HoodieWriteMetadata
*/
- public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
Review comment:
this is a public method. At least, can we introduce another overloaded method that passes in some default value for purge?
@xushiyan : I am not too strong on this comment. What are your thoughts here? It's ok to change this, I guess?
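As an illustrative sketch of the suggested overload (simplified stand-in types and a made-up default; the real method is abstract on HoodieTable and returns HoodieWriteMetadata):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: shows how a backward-compatible overload could
// supply a default value for purge so existing public callers keep compiling.
public class DeletePartitionsOverloadSketch {

  // New signature that carries the purge flag.
  static String deletePartitions(String instantTime, List<String> partitions, boolean purge) {
    return "deletePartitions(" + partitions + ", " + instantTime + ", purge=" + purge + ")";
  }

  // Existing public signature kept as an overload; it delegates with a
  // default purge value (false here, chosen purely for illustration).
  static String deletePartitions(String instantTime, List<String> partitions) {
    return deletePartitions(instantTime, partitions, false);
  }

  public static void main(String[] args) {
    System.out.println(deletePartitions("20220101000000", Arrays.asList("p1", "p2")));
  }
}
```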
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -472,4 +488,43 @@ private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
private boolean isFileGroupInPendingCompaction(HoodieFileGroup fg) {
return fgIdToPendingCompactionOperations.containsKey(fg.getFileGroupId());
}
+
+  public boolean isDeletePartitionOperation(Option<HoodieInstant> instantToRetain) {
+    try {
+      if (instantToRetain.isPresent() && HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instantToRetain.get().getAction())) {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            hoodieTable.getActiveTimeline().getInstantDetails(instantToRetain.get()).get(), HoodieReplaceCommitMetadata.class);
+
+        if (replaceCommitMetadata != null
+            && WriteOperationType.DELETE_PARTITION.equals(replaceCommitMetadata.getOperationType())) {
+          return true;
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
+    }
+    return false;
+  }
+
+  public List<String> getDropPartitionPathsForClean(Option<HoodieInstant> earliestRetainedInstant) {
+    List<String> dropPartitions = new ArrayList<>();
+    if (!earliestRetainedInstant.isPresent()) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return dropPartitions;
+    }
+
+    try {
+      if (isDeletePartitionOperation(earliestRetainedInstant)) {
Review comment:
guess we can't just check the earliestRetainedInstant alone here, for the same reason quoted above. We have to check all instants from the last cleaned-up instant up to this one.
Can we revisit this logic please?
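To illustrate the point, a minimal sketch (stand-in types and hypothetical names; the real logic would walk HoodieTimeline instants and read each replacecommit's metadata) of scanning every instant between the last completed clean and the earliest commit to retain, rather than only the single earliest-retained instant:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: collect partitions dropped by any delete-partition
// replacecommit whose timestamp falls after the last cleaned instant and at or
// before the earliest commit to retain.
public class DroppedPartitionScanSketch {

  // Stand-in for a timeline instant (timestamp, action, partitions it dropped).
  static class Instant {
    final String ts;
    final String action;        // e.g. "commit", "replacecommit"
    final List<String> dropped; // non-empty only for delete-partition commits
    Instant(String ts, String action, List<String> dropped) {
      this.ts = ts; this.action = action; this.dropped = dropped;
    }
  }

  static Set<String> partitionsDroppedInRange(List<Instant> timeline,
                                              String lastCleanedTs,
                                              String earliestToRetainTs) {
    Set<String> result = new LinkedHashSet<>();
    for (Instant i : timeline) {
      boolean inRange = i.ts.compareTo(lastCleanedTs) > 0
          && i.ts.compareTo(earliestToRetainTs) <= 0;
      if (inRange && "replacecommit".equals(i.action)) {
        result.addAll(i.dropped);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("c1", "commit", List.of()),
        new Instant("c3", "replacecommit", List.of("p2")), // delete-partition commit
        new Instant("c6", "commit", List.of()));
    // Checking only the earliest-retained instant (c6) would miss p2 dropped at c3.
    System.out.println(partitionsDroppedInRange(timeline, "c1", "c6"));
  }
}
```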
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -42,27 +43,33 @@
 public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
-  private List<String> partitions;
+  private final List<String> partitions;
+
   public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config,
                                                   HoodieTable table,
-                                                  String instantTime, List<String> partitions) {
-    super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION);
+                                                  String instantTime, List<String> partitions,
+                                                  boolean purge) {
Review comment:
Guess purge is never used anymore. Can we fix that?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -257,9 +256,13 @@ public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records,
   }

   public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
+    return deletePartitions(partitions, instantTime, false);
+  }
+
+  public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime, boolean purge) {
Review comment:
What's the necessity for the last argument? I see only one usage, at L 259.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -368,16 +377,23 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
   private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
     final Stream<HoodieFileGroup> replacedGroups;
-    if (earliestCommitToRetain.isPresent()) {
+    if (earliestCommitToRetain.isPresent() && !isDeletePartitionOperation(earliestCommitToRetain)) {
Review comment:
not sure if we can do this.
Let's say we have C1, C2, ... C4 (clustering in partition P1), ... C6 (delete partition for partition P2), ... and the earliest commit to retain is:
1. C6: we need to delete all file groups in P2 and any other replaced file groups < C6, i.e. fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
2. C7: again, we need to delete all file groups in P2 and any other replaced file groups < C7.
Can you check whether the logic covers both these cases?
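An illustrative sketch of the union both cases need (hypothetical helper; partitions and file group ids modeled as plain strings rather than Hudi types):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: for a given partition, the file groups eligible to
// clean are the groups replaced before the earliest commit to retain, plus
// every group in a partition dropped by an earlier delete-partition commit.
public class ReplacedGroupsUnionSketch {

  static Set<String> groupsToClean(String partition,
                                   Set<String> droppedPartitions,
                                   Set<String> replacedBeforeEarliest,
                                   Set<String> allGroupsInPartition) {
    Set<String> result = new HashSet<>(replacedBeforeEarliest);
    if (droppedPartitions.contains(partition)) {
      // Whole partition was dropped: clean everything in it, regardless of
      // whether the earliest-retained instant itself is the delete-partition.
      result.addAll(allGroupsInPartition);
    }
    return result;
  }

  public static void main(String[] args) {
    // Scenario from the comment: C4 clustered fg1 in P1, C6 dropped P2 entirely.
    Set<String> dropped = Set.of("P2");
    System.out.println(groupsToClean("P1", dropped, Set.of("fg1"), Set.of("fg1", "fg2")));
    System.out.println(groupsToClean("P2", dropped, Set.of(), Set.of("fg3", "fg4")));
  }
}
```

Whether the earliest commit to retain is C6 or C7, the dropped-partitions set keeps P2's groups eligible for cleaning.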
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -149,6 +153,8 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
       return Collections.emptyList();
     }
+    Stream<String> dropPartitionsStream = getDropPartitionPathsForClean(instantToRetain).stream();
Review comment:
nit: rename to partitionsToDeleteStream and getPartitionsToDeleteForClean(..).
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -368,16 +377,23 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
   private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
     final Stream<HoodieFileGroup> replacedGroups;
-    if (earliestCommitToRetain.isPresent()) {
+    if (earliestCommitToRetain.isPresent() && !isDeletePartitionOperation(earliestCommitToRetain)) {
       replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
     } else {
       replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
     }
-    return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
+
+    Stream<CleanFileInfo> replacedGroupsStream = replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
        // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
        .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
-        .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
-        .collect(Collectors.toList());
+        .flatMap(slice -> getCleanFileInfoForSlice(slice).stream());
+
+    Stream<CleanFileInfo> dropPartitionsStream = getDropPartitionPathsForClean(earliestCommitToRetain)
Review comment:
we can do something like this to get the list of partitions to be deleted w/o relying on the earliest instant to retain:
at L 385, we can call getAllFileGroups() and filter out replacedGroups (collected in lines 380 to 384). If the resultant set is empty, we know we need to clean up the entire partition, and hence we add the partition path to the list of paths to be cleaned up.
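A minimal sketch of that check (hypothetical helper; file group ids modeled as strings, assuming the results of getAllFileGroups() and the collected replaced groups are both available as sets):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: if every file group in a partition has been
// replaced, the whole partition can be added to the list of paths to clean,
// without consulting the earliest instant to retain.
public class FullyReplacedPartitionSketch {

  static boolean isPartitionFullyReplaced(Set<String> allFileGroupIds,
                                          Set<String> replacedFileGroupIds) {
    Set<String> remaining = new HashSet<>(allFileGroupIds);
    remaining.removeAll(replacedFileGroupIds); // groups still live after replacement
    return remaining.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(isPartitionFullyReplaced(Set.of("fg1", "fg2"), Set.of("fg1", "fg2"))); // true
    System.out.println(isPartitionFullyReplaced(Set.of("fg1", "fg2"), Set.of("fg1")));        // false
  }
}
```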
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -149,6 +153,8 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
       return Collections.emptyList();
     }
+    Stream<String> dropPartitionsStream = getDropPartitionPathsForClean(instantToRetain).stream();
Review comment:
So, if I understand correctly, we don't support deletion of partitions with the cleaning policy KEEP_LATEST_FILE_VERSIONS?
Is my understanding correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]