This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5fab5ef3 [HUDI-5089] Refactor HoodieCommitMetadata deserialization (#7055)
fb5fab5ef3 is described below

commit fb5fab5ef39c83ac4443381d4175ff612484e44b
Author: Danny Chan <[email protected]>
AuthorDate: Wed Oct 26 20:03:45 2022 +0800

    [HUDI-5089] Refactor HoodieCommitMetadata deserialization (#7055)
---
 .../org/apache/hudi/cli/commands/CommitsCommand.java  | 10 ++--------
 .../hudi/client/utils/MetadataConversionUtils.java    | 14 +++-----------
 .../apache/hudi/client/utils/TransactionUtils.java    | 17 +++--------------
 .../hudi/common/table/cdc/HoodieCDCExtractor.java     | 12 +++---------
 .../hudi/common/table/timeline/TimelineUtils.java     | 18 ++++++++++++++++++
 .../hudi/sink/partitioner/profile/WriteProfiles.java  |  4 ++--
 .../realtime/HoodieMergeOnReadTableInputFormat.java   |  3 ++-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java     | 19 -------------------
 .../apache/hudi/MergeOnReadIncrementalRelation.scala  |  3 ++-
 9 files changed, 35 insertions(+), 65 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index e269f8da0c..75f7b9a8b8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -23,13 +23,13 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -413,14 +413,8 @@ public class CommitsCommand {
 
   private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline 
timeline, Option<HoodieInstant> hoodieInstant) throws IOException {
     if (hoodieInstant.isPresent()) {
-      if 
(hoodieInstant.get().getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-        return 
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
-            HoodieReplaceCommitMetadata.class));
-      }
-      return 
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
-          HoodieCommitMetadata.class));
+      return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant.get(), 
timeline));
     }
-
     return Option.empty();
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 08d9d34ba2..1e1dea5d84 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -33,10 +33,10 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
@@ -193,16 +193,8 @@ public class MetadataConversionUtils {
   }
 
   public static Option<HoodieCommitMetadata> 
getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
hoodieInstant) throws IOException {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-
-    if 
(hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      return 
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-          HoodieReplaceCommitMetadata.class));
-    }
-    return 
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
-        HoodieCommitMetadata.class));
-
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    return Option.of(TimelineUtils.getCommitMetadata(hoodieInstant, timeline));
   }
 
   public static org.apache.hudi.avro.model.HoodieCommitMetadata 
convertCommitMetadata(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index ec15effdc4..01d37eaf32 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -21,10 +21,10 @@ package org.apache.hudi.client.utils;
 import org.apache.hudi.client.transaction.ConcurrentOperation;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -107,19 +107,8 @@ public class TransactionUtils {
         .filterCompletedInstants().lastInstant();
     try {
       if (hoodieInstantOption.isPresent()) {
-        switch (hoodieInstantOption.get().getAction()) {
-          case HoodieTimeline.REPLACE_COMMIT_ACTION:
-            HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieReplaceCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
replaceCommitMetadata.getExtraMetadata()));
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(),
 HoodieCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), 
commitMetadata.getExtraMetadata()));
-          default:
-            throw new IllegalArgumentException("Unknown instant action" + 
hoodieInstantOption.get().getAction());
-        }
+        HoodieCommitMetadata commitMetadata = 
TimelineUtils.getCommitMetadata(hoodieInstantOption.get(), 
metaClient.getActiveTimeline());
+        return Option.of(Pair.of(hoodieInstantOption.get(), 
commitMetadata.getExtraMetadata()));
       } else {
         return Option.empty();
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 59f4a8779a..f4cb5fec4a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -219,15 +219,9 @@ public class HoodieCDCExtractor {
                   && instantRange.isInRange(instant.getTimestamp())
                   && 
requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT))
           ).map(instant -> {
-            HoodieCommitMetadata commitMetadata;
+            final HoodieCommitMetadata commitMetadata;
             try {
-              if 
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-                commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
-                    activeTimeLine.getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
-              } else {
-                commitMetadata = HoodieCommitMetadata.fromBytes(
-                    activeTimeLine.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-              }
+              commitMetadata = TimelineUtils.getCommitMetadata(instant, 
activeTimeLine);
             } catch (IOException e) {
               throw new HoodieIOException(e.getMessage());
             }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b46..a2e88b8009 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -209,4 +209,22 @@ public class TimelineUtils {
     }
     return activeTimeline;
   }
+
+  /**
+   * Returns the commit metadata of the given instant.
+   *
+   * @param instant   The hoodie instant
+   * @param timeline  The timeline
+   * @return the commit metadata
+   */
+  public static HoodieCommitMetadata getCommitMetadata(
+      HoodieInstant instant,
+      HoodieTimeline timeline) throws IOException {
+    byte[] data = timeline.getInstantDetails(instant).get();
+    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return HoodieReplaceCommitMetadata.fromBytes(data, 
HoodieReplaceCommitMetadata.class);
+    } else {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    }
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 90c58687db..eaaa1a5a08 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.core.fs.Path;
@@ -233,7 +233,7 @@ public class WriteProfiles {
       HoodieInstant instant,
       HoodieTimeline timeline) {
     try {
-      return HoodieInputFormatUtils.getCommitMetadata(instant, timeline);
+      return TimelineUtils.getCommitMetadata(instant, timeline);
     } catch (IOException e) {
       LOG.error("Get write metadata for table {} with instant {} and path: {} 
error",
           tableName, instant.getTimestamp(), basePath);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 64fc54392a..b5c7303709 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
@@ -156,7 +157,7 @@ public class HoodieMergeOnReadTableInputFormat extends 
HoodieCopyOnWriteTableInp
     List<HoodieCommitMetadata> metadataList = commitsToCheck
         .get().stream().map(instant -> {
           try {
-            return HoodieInputFormatUtils.getCommitMetadata(instant, 
commitsTimelineToReturn);
+            return TimelineUtils.getCommitMetadata(instant, 
commitsTimelineToReturn);
           } catch (IOException e) {
             throw new HoodieException(String.format("cannot get metadata for 
instant: %s", instant));
           }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 7c5e76b2bd..1af2ef3050 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -37,7 +37,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -498,22 +497,4 @@ public class HoodieInputFormatUtils {
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
-
-  /**
-   * Returns the commit metadata of the given instant.
-   *
-   * @param instant   The hoodie instant
-   * @param timeline  The timeline
-   * @return the commit metadata
-   */
-  public static HoodieCommitMetadata getCommitMetadata(
-      HoodieInstant instant,
-      HoodieTimeline timeline) throws IOException {
-    byte[] data = timeline.getInstantDetails(instant).get();
-    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      return HoodieReplaceCommitMetadata.fromBytes(data, 
HoodieReplaceCommitMetadata.class);
-    } else {
-      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
-    }
-  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 446c806b18..0b310c8e28 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -21,11 +21,12 @@ import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineUtils.getCommitMetadata
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, 
getWritePartitionPaths, listAffectedFilesForCommits}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getWritePartitionPaths, 
listAffectedFilesForCommits}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow

Reply via email to