yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1971003281


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +158,77 @@ public static Option<byte[]> 
serializeCommitMetadata(CommitMetadataSerDe commitM
   public static <T extends SpecificRecordBase> Option<byte[]> 
serializeAvroMetadata(T metadata, Class<T> clazz)
       throws IOException {
     DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
-    DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    fileWriter.create(metadata.getSchema(), baos);
-    fileWriter.append(metadata);
-    fileWriter.flush();
-    return Option.of(baos.toByteArray());
+    try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      fileWriter.create(metadata.getSchema(), baos);
+      fileWriter.append(metadata);
+      fileWriter.flush();
+      return Option.of(baos.toByteArray());
+    }
+  }
+
+  public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream> 
in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+  }
+
+  public static HoodieCompactionPlan 
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
   }
 
-  public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws 
IOException {
-    return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+  public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[] 
bytes) throws IOException {
+    return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
   }
 
-  public static HoodieCompactionPlan deserializeCompactionPlan(byte[] bytes) 
throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
+  public static HoodieCleanMetadata 
deserializeHoodieCleanMetadata(Option<InputStream> inputStream) throws 
IOException {
+    return deserializeAvroMetadata(inputStream, HoodieCleanMetadata.class);
   }
 
   public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[] 
bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+    return deserializeAvroMetadataLegacy(bytes, HoodieCleanMetadata.class);
   }
 
-  public static HoodieRollbackMetadata 
deserializeHoodieRollbackMetadata(byte[] bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieRollbackMetadata.class);
+  public static HoodieRollbackMetadata 
deserializeHoodieRollbackMetadata(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieRollbackMetadata.class);
   }
 
-  public static HoodieRestoreMetadata deserializeHoodieRestoreMetadata(byte[] 
bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieRestoreMetadata.class);
+  public static HoodieRestoreMetadata 
deserializeHoodieRestoreMetadata(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieRestoreMetadata.class);
   }
 
-  public static HoodieSavepointMetadata 
deserializeHoodieSavepointMetadata(byte[] bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
+  public static HoodieSavepointMetadata 
deserializeHoodieSavepointMetadata(Option<InputStream> instantStream) throws 
IOException {
+    return deserializeAvroMetadata(instantStream, 
HoodieSavepointMetadata.class);
   }
 
-  public static HoodieRequestedReplaceMetadata 
deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, 
HoodieRequestedReplaceMetadata.class);
+  public static HoodieRequestedReplaceMetadata 
deserializeRequestedReplaceMetadata(Option<InputStream> in) throws IOException {
+    return deserializeAvroMetadata(in, HoodieRequestedReplaceMetadata.class);
   }
 
-  public static HoodieIndexPlan deserializeIndexPlan(byte[] bytes) throws 
IOException {
-    return deserializeAvroMetadata(bytes, HoodieIndexPlan.class);
+  public static HoodieIndexPlan deserializeIndexPlan(Option<InputStream> in) 
throws IOException {
+    return deserializeAvroMetadata(in, HoodieIndexPlan.class);
   }
 
-  public static HoodieCommitMetadata deserializeCommitMetadata(byte[] bytes) 
throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieCommitMetadata.class);
+  public static HoodieCommitMetadata 
deserializeCommitMetadata(Option<InputStream> instantStream) throws IOException 
{
+    return deserializeAvroMetadata(instantStream, HoodieCommitMetadata.class);
   }
 
-  public static HoodieReplaceCommitMetadata 
deserializeReplaceCommitMetadata(byte[] bytes) throws IOException {
-    return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+  public static HoodieReplaceCommitMetadata 
deserializeReplaceCommitMetadata(Option<InputStream> instantStream) throws 
IOException {
+    return deserializeAvroMetadata(instantStream, 
HoodieReplaceCommitMetadata.class);
   }
 
-  public static <T extends SpecificRecordBase> T 
deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
+  public static <T extends SpecificRecordBase> T 
deserializeAvroMetadataLegacy(byte[] bytes, Class<T> clazz)
       throws IOException {
     DatumReader<T> reader = new SpecificDatumReader<>(clazz);
     FileReader<T> fileReader = DataFileReader.openReader(new 
SeekableByteArrayInput(bytes), reader);
     ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize 
metadata of type " + clazz);
     return fileReader.next();
   }
+
+  public static <T extends SpecificRecordBase> T 
deserializeAvroMetadata(Option<InputStream> instantStream, Class<T> clazz)
+      throws IOException {
+    DatumReader<T> reader = new SpecificDatumReader<>(clazz);
+    try (DataFileStream<T> fileReader = new 
DataFileStream<>(instantStream.get(), reader)) {

Review Comment:
   I don't see a place where `Option.empty()` is returned or checked, so 
`Option` is redundant? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to