This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 698694a [HUDI-1498] Read clustering plan from requested file for
inflight instant (#2389)
698694a is described below
commit 698694a1571cdcc9848fc79aa34c8cbbf9662bc4
Author: satishkotha <[email protected]>
AuthorDate: Mon Jan 4 10:36:44 2021 -0800
[HUDI-1498] Read clustering plan from requested file for inflight instant
(#2389)
---
.../org/apache/hudi/common/util/ClusteringUtils.java | 20 +++++++++++++++-----
.../common/table/view/TestIncrementalFSViewSync.java | 15 +++++++++++++--
.../apache/hudi/common/util/TestClusteringUtils.java | 16 ++++++++++++++++
3 files changed, 44 insertions(+), 7 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5cdb6fc..fcc3274 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -68,21 +69,30 @@ public class ClusteringUtils {
.filter(Option::isPresent).map(Option::get);
}
- public static Option<Pair<HoodieInstant, HoodieClusteringPlan>>
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant
requestedReplaceInstant) {
+ public static Option<Pair<HoodieInstant, HoodieClusteringPlan>>
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant
pendingReplaceInstant) {
try {
- Option<byte[]> content =
metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant);
+ final HoodieInstant requestedInstant;
+ if (!pendingReplaceInstant.isRequested()) {
+ // inflight replacecommit files don't have clustering plan.
+ // This is because replacecommit inflight can have workload profile
for 'insert_overwrite'.
+ // Get the plan from corresponding requested instant.
+ requestedInstant =
HoodieTimeline.getReplaceCommitRequestedInstant(pendingReplaceInstant.getTimestamp());
+ } else {
+ requestedInstant = pendingReplaceInstant;
+ }
+ Option<byte[]> content =
metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
if (!content.isPresent() || content.get().length == 0) {
// few operations create requested file without any content. Assume
these are not clustering
- LOG.warn("No content found in requested file for instant " +
requestedReplaceInstant);
+ LOG.warn("No content found in requested file for instant " +
pendingReplaceInstant);
return Option.empty();
}
HoodieRequestedReplaceMetadata requestedReplaceMetadata =
TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
if
(WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType()))
{
- return Option.of(Pair.of(requestedReplaceInstant,
requestedReplaceMetadata.getClusteringPlan()));
+ return Option.of(Pair.of(pendingReplaceInstant,
requestedReplaceMetadata.getClusteringPlan()));
}
return Option.empty();
} catch (IOException e) {
- throw new HoodieIOException("Error reading clustering plan " +
requestedReplaceInstant.getTimestamp(), e);
+ throw new HoodieIOException("Error reading clustering plan " +
pendingReplaceInstant.getTimestamp(), e);
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index e4933cf..5400dc4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.HoodieCleanStat;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -857,11 +859,20 @@ public class TestIncrementalFSViewSync extends
HoodieCommonTestHarness {
private List<String> addReplaceInstant(HoodieTableMetaClient metaClient,
String instant,
List<Pair<String, HoodieWriteStat>>
writeStats,
Map<String, List<String>>
partitionToReplaceFileIds) throws IOException {
+ // created requested
+ HoodieInstant newRequestedInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.UNKNOWN.name()).build();
+
metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant,
+
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+
+ metaClient.reloadActiveTimeline();
+ // transition to inflight
+ HoodieInstant inflightInstant =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(newRequestedInstant,
Option.empty());
+ // transition to replacecommit
HoodieReplaceCommitMetadata replaceCommitMetadata = new
HoodieReplaceCommitMetadata();
writeStats.forEach(e -> replaceCommitMetadata.addWriteStat(e.getKey(),
e.getValue()));
replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
- HoodieInstant inflightInstant = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
- metaClient.getActiveTimeline().createNewInstant(inflightInstant);
metaClient.getActiveTimeline().saveAsComplete(inflightInstant,
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
return writeStats.stream().map(e ->
e.getValue().getPath()).collect(Collectors.toList());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 5d82bbc..54ca072 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -98,6 +98,22 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
validateClusteringInstant(fileIds3, partitionPath1, clusterTime,
fileGroupToInstantMap);
}
+ // replacecommit.inflight doesn't have clustering plan.
+ // Verify that getClusteringPlan fetches content from corresponding
requested file.
+ @Test
+ public void testClusteringPlanInflight() throws Exception {
+ String partitionPath1 = "partition1";
+ List<String> fileIds1 = new ArrayList<>();
+ fileIds1.add(UUID.randomUUID().toString());
+ fileIds1.add(UUID.randomUUID().toString());
+ String clusterTime1 = "1";
+ HoodieInstant requestedInstant =
createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+ HoodieInstant inflightInstant =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant,
Option.empty());
+ HoodieClusteringPlan requestedClusteringPlan =
ClusteringUtils.getClusteringPlan(metaClient,
requestedInstant).get().getRight();
+ HoodieClusteringPlan inflightClusteringPlan =
ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight();
+ assertEquals(requestedClusteringPlan, inflightClusteringPlan);
+ }
+
private void validateClusteringInstant(List<String> fileIds, String
partitionPath,
String expectedInstantTime,
Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
for (String fileId : fileIds) {