This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f96ba7a [HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090)
f96ba7a is described below
commit f96ba7abf043246e98f550f858e047411a12c550
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Mar 24 00:43:02 2022 +0530
[HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090)
---
.../client/transaction/ConcurrentOperation.java | 48 ++++++++++++++++------
.../org/apache/hudi/common/util/CommitUtils.java | 2 +-
2 files changed, 37 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index e78a157..40da7dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client.transaction;
-import java.io.IOException;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -27,15 +26,18 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.hudi.common.util.Option;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static
org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;
/**
* This class is used to hold all information used to identify how to resolve
conflicts between instants.
@@ -52,7 +54,7 @@ public class ConcurrentOperation {
private final String instantTime;
private Set<String> mutatedFileIds = Collections.EMPTY_SET;
- public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient
metaClient) throws IOException {
+ public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient
metaClient) throws IOException {
this.metadataWrapper = new
HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant,
metaClient));
this.commitMetadataOption = Option.empty();
this.actionState = instant.getState().name();
@@ -106,24 +108,37 @@ public class ConcurrentOperation {
break;
case COMMIT_ACTION:
case DELTA_COMMIT_ACTION:
- this.mutatedFileIds =
CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
+ this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
.getPartitionToWriteStats()).keySet();
this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
break;
case REPLACE_COMMIT_ACTION:
if (instant.isCompleted()) {
- this.mutatedFileIds =
CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
+ this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet();
this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
} else {
+ // we need to have different handling for requested and inflight replacecommit because
+ // for requested replacecommit, clustering will generate a plan and HoodieRequestedReplaceMetadata will not be empty, but insert_overwrite/insert_overwrite_table could have empty content
+ // for inflight replacecommit, clustering will have no content in metadata, but insert_overwrite/insert_overwrite_table will have some commit metadata
HoodieRequestedReplaceMetadata requestedReplaceMetadata =
this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
- this.mutatedFileIds = requestedReplaceMetadata
- .getClusteringPlan().getInputGroups()
- .stream()
- .flatMap(ig -> ig.getSlices().stream())
- .map(file -> file.getFileId())
- .collect(Collectors.toSet());
- this.operationType = WriteOperationType.CLUSTER;
+ org.apache.hudi.avro.model.HoodieCommitMetadata
inflightCommitMetadata =
this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
+ if (instant.isRequested()) {
+ if (requestedReplaceMetadata != null) {
+ this.mutatedFileIds =
getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+ this.operationType = WriteOperationType.CLUSTER;
+ }
+ } else {
+ if (inflightCommitMetadata != null) {
+ this.mutatedFileIds =
getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
+ this.operationType =
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+ } else if (requestedReplaceMetadata != null) {
+ // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
+ this.mutatedFileIds =
getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+ this.operationType = WriteOperationType.CLUSTER;
+ }
+ // NOTE: it cannot be the case that instant is inflight, and both the requested and inflight replacecommit metadata are empty
+ }
}
break;
default:
@@ -142,6 +157,15 @@ public class ConcurrentOperation {
}
}
+ private static Set<String>
getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata
requestedReplaceMetadata) {
+ return requestedReplaceMetadata
+ .getClusteringPlan().getInputGroups()
+ .stream()
+ .flatMap(ig -> ig.getSlices().stream())
+ .map(file -> file.getFileId())
+ .collect(Collectors.toSet());
+ }
+
@Override
public String toString() {
return "{"
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index 9970687..08b775f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -97,7 +97,7 @@ public class CommitUtils {
String
commitActionType,
WriteOperationType operationType) {
final HoodieCommitMetadata commitMetadata;
- if (commitActionType == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+ if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(commitActionType)) {
HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
commitMetadata = replaceMetadata;