This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fb5fab5ef3 [HUDI-5089] Refactor HoodieCommitMetadata deserialization (#7055)
fb5fab5ef3 is described below
commit fb5fab5ef39c83ac4443381d4175ff612484e44b
Author: Danny Chan <[email protected]>
AuthorDate: Wed Oct 26 20:03:45 2022 +0800
[HUDI-5089] Refactor HoodieCommitMetadata deserialization (#7055)
---
.../org/apache/hudi/cli/commands/CommitsCommand.java | 10 ++--------
.../hudi/client/utils/MetadataConversionUtils.java | 14 +++-----------
.../apache/hudi/client/utils/TransactionUtils.java | 17 +++--------------
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 12 +++---------
.../hudi/common/table/timeline/TimelineUtils.java | 18 ++++++++++++++++++
.../hudi/sink/partitioner/profile/WriteProfiles.java | 4 ++--
.../realtime/HoodieMergeOnReadTableInputFormat.java | 3 ++-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 19 -------------------
.../apache/hudi/MergeOnReadIncrementalRelation.scala | 3 ++-
9 files changed, 35 insertions(+), 65 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index e269f8da0c..75f7b9a8b8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -23,13 +23,13 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -413,14 +413,8 @@ public class CommitsCommand {
private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline
timeline, Option<HoodieInstant> hoodieInstant) throws IOException {
if (hoodieInstant.isPresent()) {
- if
(hoodieInstant.get().getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
- HoodieReplaceCommitMetadata.class));
- }
- return
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
- HoodieCommitMetadata.class));
+ return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant.get(),
timeline));
}
-
return Option.empty();
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 08d9d34ba2..1e1dea5d84 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -33,10 +33,10 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -193,16 +193,8 @@ public class MetadataConversionUtils {
}
public static Option<HoodieCommitMetadata>
getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant
hoodieInstant) throws IOException {
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
- HoodieTimeline timeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-
- if
(hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
- HoodieReplaceCommitMetadata.class));
- }
- return
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
- HoodieCommitMetadata.class));
-
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant, timeline));
}
public static org.apache.hudi.avro.model.HoodieCommitMetadata
convertCommitMetadata(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index ec15effdc4..01d37eaf32 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -21,10 +21,10 @@ package org.apache.hudi.client.utils;
import org.apache.hudi.client.transaction.ConcurrentOperation;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -107,19 +107,8 @@ public class TransactionUtils {
.filterCompletedInstants().lastInstant();
try {
if (hoodieInstantOption.isPresent()) {
- switch (hoodieInstantOption.get().getAction()) {
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieReplaceCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
replaceCommitMetadata.getExtraMetadata()));
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- case HoodieTimeline.COMMIT_ACTION:
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
HoodieCommitMetadata.class);
- return Option.of(Pair.of(hoodieInstantOption.get(),
commitMetadata.getExtraMetadata()));
- default:
- throw new IllegalArgumentException("Unknown instant action" +
hoodieInstantOption.get().getAction());
- }
+ HoodieCommitMetadata commitMetadata =
TimelineUtils.getCommitMetadata(hoodieInstantOption.get(),
metaClient.getActiveTimeline());
+ return Option.of(Pair.of(hoodieInstantOption.get(),
commitMetadata.getExtraMetadata()));
} else {
return Option.empty();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 59f4a8779a..f4cb5fec4a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
@@ -219,15 +219,9 @@ public class HoodieCDCExtractor {
&& instantRange.isInRange(instant.getTimestamp())
&&
requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT))
).map(instant -> {
- HoodieCommitMetadata commitMetadata;
+ final HoodieCommitMetadata commitMetadata;
try {
- if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
- activeTimeLine.getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- } else {
- commitMetadata = HoodieCommitMetadata.fromBytes(
- activeTimeLine.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- }
+ commitMetadata = TimelineUtils.getCommitMetadata(instant,
activeTimeLine);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b46..a2e88b8009 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -209,4 +209,22 @@ public class TimelineUtils {
}
return activeTimeline;
}
+
+ /**
+ * Returns the commit metadata of the given instant.
+ *
+ * @param instant The hoodie instant
+ * @param timeline The timeline
+ * @return the commit metadata
+ */
+ public static HoodieCommitMetadata getCommitMetadata(
+ HoodieInstant instant,
+ HoodieTimeline timeline) throws IOException {
+ byte[] data = timeline.getInstantDetails(instant).get();
+ if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return HoodieReplaceCommitMetadata.fromBytes(data,
HoodieReplaceCommitMetadata.class);
+ } else {
+ return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+ }
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 90c58687db..eaaa1a5a08 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.core.fs.Path;
@@ -233,7 +233,7 @@ public class WriteProfiles {
HoodieInstant instant,
HoodieTimeline timeline) {
try {
- return HoodieInputFormatUtils.getCommitMetadata(instant, timeline);
+ return TimelineUtils.getCommitMetadata(instant, timeline);
} catch (IOException e) {
LOG.error("Get write metadata for table {} with instant {} and path: {}
error",
tableName, instant.getTimestamp(), basePath);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 64fc54392a..b5c7303709 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
@@ -156,7 +157,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
List<HoodieCommitMetadata> metadataList = commitsToCheck
.get().stream().map(instant -> {
try {
- return HoodieInputFormatUtils.getCommitMetadata(instant,
commitsTimelineToReturn);
+ return TimelineUtils.getCommitMetadata(instant,
commitsTimelineToReturn);
} catch (IOException e) {
throw new HoodieException(String.format("cannot get metadata for
instant: %s", instant));
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 7c5e76b2bd..1af2ef3050 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -37,7 +37,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -498,22 +497,4 @@ public class HoodieInputFormatUtils {
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
-
- /**
- * Returns the commit metadata of the given instant.
- *
- * @param instant The hoodie instant
- * @param timeline The timeline
- * @return the commit metadata
- */
- public static HoodieCommitMetadata getCommitMetadata(
- HoodieInstant instant,
- HoodieTimeline timeline) throws IOException {
- byte[] data = timeline.getInstantDetails(instant).get();
- if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return HoodieReplaceCommitMetadata.fromBytes(data,
HoodieReplaceCommitMetadata.class);
- } else {
- return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
- }
- }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 446c806b18..0b310c8e28 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -21,11 +21,12 @@ import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata,
getWritePartitionPaths, listAffectedFilesForCommits}
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getWritePartitionPaths,
listAffectedFilesForCommits}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow