the-other-tim-brown commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1955305945


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -394,4 +423,12 @@ public static <T extends SpecificRecordBase> byte[] convertCommitMetadataToJsonB
       throw new HoodieIOException("Failed to convert to JSON.", e);
     }
   }
+
+  public static boolean isEmptyStream(InputStream inputStream) throws IOException {

Review Comment:
   It looks like this method is unused?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +190,21 @@ private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, action, pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
+    if (timeline.isEmpty(requestedInstant)) {

Review Comment:
   Can we move this to the error handling?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantReader.java:
##########
@@ -31,21 +31,23 @@
 public interface HoodieInstantReader {
   /**
    * Reads the provided instant's content into a stream for parsing.
+   *
    * @param instant the instant to read
    * @return an InputStream with the content
    */
-  default InputStream getContentStream(HoodieInstant instant) {
+  default Option<InputStream> getContentStream(HoodieInstant instant) {
     throw new RuntimeException("Not implemented");
   }
 
   /**
+   * dd
    * Reads the provided instant's content into a byte array for parsing.
    * @param instant the instant to read
    * @return an InputStream with the details
    */
   @Deprecated
   default Option<byte[]> getInstantDetails(HoodieInstant instant) {
-    try (InputStream inputStream = getContentStream(instant)) {
+    try (InputStream inputStream = getContentStream(instant).get()) {

Review Comment:
   Should this first check if the Option is present?
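   A minimal sketch of the guarded pattern being suggested, using `java.util.Optional` as a stand-in for Hudi's `Option`; the `getContentStream` stub below is illustrative only, not Hudi's API:
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.UncheckedIOException;
   import java.util.Optional;

   public class GuardedInstantRead {
     // Stub: returns empty when there is no content, mirroring the new
     // Option-returning getContentStream signature in the diff.
     static Optional<InputStream> getContentStream(byte[] content) {
       return content.length == 0 ? Optional.empty() : Optional.of(new ByteArrayInputStream(content));
     }

     static Optional<byte[]> getInstantDetails(byte[] content) {
       Optional<InputStream> streamOpt = getContentStream(content);
       if (!streamOpt.isPresent()) {
         // Guard before calling get(), avoiding NoSuchElementException on empty instants.
         return Optional.empty();
       }
       try (InputStream in = streamOpt.get()) {
         return Optional.of(in.readAllBytes());
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
     }
   }
   ```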



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -378,6 +394,19 @@ private static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertRep
     return JsonUtils.getObjectMapper().convertValue(replaceCommitMetadata, org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class);
   }
 
+  /**
+   * Convert replacecommit metadata from json to avro.
+   */
+  public static HoodieReplaceCommitMetadata convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata replaceCommitMetadata) {
+    if (replaceCommitMetadata.getPartitionToWriteStats() != null) {
+      replaceCommitMetadata.getPartitionToWriteStats().remove(null);
+    }
+    if (replaceCommitMetadata.getPartitionToReplaceFileIds() != null) {
+      replaceCommitMetadata.getPartitionToReplaceFileIds().remove(null);
+    }
+    return JsonUtils.getObjectMapper().convertValue(replaceCommitMetadata, HoodieReplaceCommitMetadata.class);
+  }
+
+

Review Comment:
   The method below this one now shows as unused; can it be cleaned up?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -193,10 +193,9 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
         activeTimeline.deleteEmptyInstantIfExists(cleanInstant);
         HoodieInstant cleanPlanInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), cleanInstant.requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
         try {
-          Option<byte[]> content = activeTimeline.getInstantDetails(cleanPlanInstant);
           // Deserialize plan if it is non-empty
-          if (content.map(bytes -> bytes.length > 0).orElse(false)) {
-            return Option.of(TimelineMetadataUtils.deserializeCleanerPlan(content.get()));
+          if (!activeTimeline.isEmpty(cleanPlanInstant)) {

Review Comment:
   Can we try to parse the plan first, and then check whether the instant is empty during the exception handling?
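   A rough sketch of that "parse first, check emptiness only on failure" shape; the plan type and deserializer below are simplified stand-ins (`String` / `byte[]`), not Hudi's `HoodieCleanerPlan` or `TimelineMetadataUtils`:
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.Optional;

   public class ParseFirstClean {
     // Stand-in deserializer: fails on empty content, like an Avro reader would.
     static String deserializeCleanerPlan(InputStream in) throws IOException {
       byte[] bytes = in.readAllBytes();
       if (bytes.length == 0) {
         throw new IOException("no content to parse");
       }
       return new String(bytes);
     }

     static Optional<String> requestClean(byte[] content) {
       try {
         // Happy path: parse directly, no separate emptiness probe beforehand.
         return Optional.of(deserializeCleanerPlan(new ByteArrayInputStream(content)));
       } catch (IOException e) {
         // Failure path: an empty plan is expected and yields empty; anything
         // else is a genuine error.
         if (content.length == 0) {
           return Optional.empty();
         }
         throw new RuntimeException("Failed to parse cleaner plan", e);
       }
     }
   }
   ```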



##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java:
##########
@@ -202,7 +202,7 @@ private Long getLatestCleanTimeTakenInMillis() throws IOException {
     HoodieInstant clean = timeline.getReverseOrderedInstants().findFirst().orElse(null);
     if (clean != null) {
       HoodieCleanMetadata cleanMetadata =
-          TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+          TimelineMetadataUtils.deserializeHoodieCleanMetadataLegacy(timeline.getInstantDetails(clean).get());

Review Comment:
   Can this use the content stream instead?
   ```suggestion
          TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(clean));
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +182,75 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
     archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
     archivedMetaWrapper.setStateTransitionTime(completionTime);
     String actionType = lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
-    HoodieInstant instant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, actionType, instantTime, completionTime);
+    HoodieInstant hoodieInstant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, actionType, instantTime, completionTime);
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION: {
         archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, instantDetails.get()));
-        archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, planBytes.get()));
+        archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlanLegacy(metaClient, planBytes.get()));

Review Comment:
   We could replace all the "legacy" methods by wrapping the bytes in an input stream. This seems to be the main non-test class where these methods are used, so we can also try to remove those methods after cleaning this up. That would help avoid confusion for future developers.
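   The wrapping idea can be sketched like this: a call site that still holds a `byte[]` feeds the stream-based API via `ByteArrayInputStream`, so no separate `*Legacy` overload is needed. The deserializer below is an illustrative stand-in, not Hudi's `CleanerUtils`:
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.IOException;
   import java.io.InputStream;

   public class LegacyBridge {
     // Stream-based entry point — the only variant worth keeping.
     static String deserialize(InputStream in) throws IOException {
       return new String(in.readAllBytes());
     }

     // A byte[] caller simply wraps its bytes instead of using a legacy method.
     static String deserializeFromBytes(byte[] bytes) throws IOException {
       return deserialize(new ByteArrayInputStream(bytes));
     }
   }
   ```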



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +159,88 @@ public static Option<byte[]> serializeCommitMetadata(CommitMetadataSerDe commitM
   public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
       throws IOException {
     DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
-    DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    fileWriter.create(metadata.getSchema(), baos);
-    fileWriter.append(metadata);
-    fileWriter.flush();
-    return Option.of(baos.toByteArray());
+    try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      fileWriter.create(metadata.getSchema(), baos);
+      fileWriter.append(metadata);
+      fileWriter.flush();
+      return Option.of(baos.toByteArray());
+    }
+  }
+
+  public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+  }
+
+  public static HoodieCompactionPlan deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
+  }
+
+  public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[] bytes) throws IOException {
+    return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
   }
 
-  public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+  public static HoodieCleanMetadata deserializeHoodieCleanMetadataLegacy(byte[] bytes) throws IOException {

Review Comment:
   There are only 2 callers of this and it looks like they can both be migrated 
to use the content stream. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java:
##########
@@ -103,8 +104,8 @@ private Pair<HoodieInstant, Option<byte[]>> readInstant(GenericRecord record) {
           // should be json bytes.
           try {
             HoodieInstant instant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, action, instantTime, stateTransitionTime);
-            org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata = new CommitMetadataSerDeV1().deserialize(instant, getUTF8Bytes(actionData.toString()),
-                org.apache.hudi.common.model.HoodieCommitMetadata.class);
+            org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata = new CommitMetadataSerDeV1().deserialize(
+                instant, Option.of(new ByteArrayInputStream(getUTF8Bytes(actionData.toString()))), org.apache.hudi.common.model.HoodieCommitMetadata.class);

Review Comment:
   Is this part of the code just doing the conversion from the avro to the 
pojo? If so, will we be able to use that other method you wrote?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -604,4 +612,33 @@ public static boolean isDeletePartition(WriteOperationType operation) {
         || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
         || operation == WriteOperationType.INSERT_OVERWRITE;
   }
+
+  public static boolean isEmpty(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    try {
+      return metaClient.getStorage()
+          .getPathInfo(new StoragePath(metaClient.getTimelinePath(), metaClient.getInstantFileNameGenerator().getFileName(instant)))
+          .getLength() == 0;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to check emptiness of instant " + instant, e);
+    }
+  }
+
+  public static Option<InputStream> getInputStreamOptionLegacy(HoodieTimeline timeline, HoodieInstant instant) {
+    Option<byte[]> bytes = timeline.getInstantDetails(instant);
+    if (bytes.isEmpty() || bytes.get().length == 0) {
+      return Option.empty();
+    }
+    return Option.of(new ByteArrayInputStream(bytes.get()));
+  }
+
+  public static Option<HoodieInstant> getInstantFromTimeline(HoodieInstant instant, HoodieActiveTimeline timeline, Option<HoodieInstant> actualInstant) {

Review Comment:
   Can you explain why we need this? In the old `getInstantDetails`, I don't see similar logic.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -186,16 +187,36 @@ public static HoodieCompactionPlan getLogCompactionPlan(HoodieTableMetaClient me
    * Util method to fetch both compaction and log compaction plan from requestedInstant.
    */
   private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, HoodieInstant requestedInstant) {
-    return getCompactionPlan(metaClient, metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant));
+    return getCompactionPlan(metaClient, metaClient.getActiveTimeline().getInstantContentStream(requestedInstant));
   }
 
   /**
    * Util method to fetch both compaction and log compaction plan from requestedInstant.
    */
-  public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, Option<byte[]> planContent) {
+  public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, Option<InputStream> planContent) {
     CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
     try {
-      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planContent.get());
+      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(planContent);
+      return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public static HoodieCompactionPlan getCompactionPlanLegacy(HoodieTableMetaClient metaClient, byte[] bytes) {
+    CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
+    try {
+      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlanLegacy(bytes);
+      return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public static HoodieCompactionPlan getCompactionPlanFromInputStream(HoodieTableMetaClient metaClient, Option<InputStream> in) {

Review Comment:
   This seems similar to the method at line 196



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -258,4 +258,9 @@ public HoodieArchivedTimeline reload() {
   public HoodieArchivedTimeline reload(String startTs) {
     return new ArchivedTimelineV1(metaClient, startTs);
   }
+
+  @Override
+  public boolean isEmpty(HoodieInstant instant) {
+    return TimelineUtils.isEmpty(metaClient, instant);

Review Comment:
   For the archived timeline, I think we want to check the `readCommits` 
directly instead of going to the file system
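   A simplified sketch of consulting the already-loaded map instead of the file system; the `readCommits` name comes from the diff above, but this class is an illustration, not `ArchivedTimelineV1`/`V2`:
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class ArchivedEmptinessCheck {
     // instant time -> serialized content, populated when the archived timeline is loaded
     private final Map<String, byte[]> readCommits = new HashMap<>();

     void load(String instantTime, byte[] content) {
       readCommits.put(instantTime, content);
     }

     // Answer emptiness from memory; no storage round-trip per instant.
     boolean isEmpty(String instantTime) {
       byte[] content = readCommits.get(instantTime);
       return content == null || content.length == 0;
     }
   }
   ```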



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1094,7 +1094,7 @@ private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTabl
     HoodieInstant requested = factory.getRollbackRequestedInstant(rollbackInstant);
     try {
       HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata(
-          dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);

Review Comment:
   `readRollbackInfoAsBytes` looks like it is no longer used in the codebase 
and can be removed



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -249,4 +246,9 @@ public HoodieTimeline getWriteTimeline() {
             readCommits.containsKey(i.requestedTime()))
         .filter(s -> validActions.contains(s.getAction())), instantReader);
   }
+
+  @Override
+  public boolean isEmpty(HoodieInstant instant) {
+    return TimelineUtils.isEmpty(metaClient, instant);

Review Comment:
   Similarly here, do we want to reference the `readCommits`?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -125,6 +126,20 @@ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaC
     return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
   }
 
+  public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaClient, Option<InputStream> details)
+      throws IOException {
+    CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
+    HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(details);
+    return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
+  }
+
+  public static HoodieCleanMetadata getCleanerMetadataFromInputStream(HoodieTableMetaClient metaClient, Option<InputStream> in)
+      throws IOException {
+    CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
+    HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(in);
+    return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
+  }

Review Comment:
   These two methods look like they are the same?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
