Davis-Zhang-Onehouse commented on code in PR #12900:
URL: https://github.com/apache/hudi/pull/12900#discussion_r1978487089


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -232,12 +230,9 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, 
O> table, HoodieInstan
         this.txnManager.beginTransaction(Option.of(inflightInstant), 
Option.empty());
       }
       writeTableMetadata(metadata, inflightInstant.requestedTime());
-      table.getActiveTimeline().transitionCleanInflightToComplete(false,
-          inflightInstant, 
TimelineMetadataUtils.serializeCleanMetadata(metadata));
+      table.getActiveTimeline().transitionCleanInflightToComplete(false, 
inflightInstant, Option.of(metadata));
       LOG.info("Marked clean started on " + inflightInstant.requestedTime() + 
" as complete");
       return metadata;
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to clean up after commit", e);

Review Comment:
   yes



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -232,12 +230,9 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, 
O> table, HoodieInstan
         this.txnManager.beginTransaction(Option.of(inflightInstant), 
Option.empty());
       }
       writeTableMetadata(metadata, inflightInstant.requestedTime());
-      table.getActiveTimeline().transitionCleanInflightToComplete(false,
-          inflightInstant, 
TimelineMetadataUtils.serializeCleanMetadata(metadata));
+      table.getActiveTimeline().transitionCleanInflightToComplete(false, 
inflightInstant, Option.of(metadata));
       LOG.info("Marked clean started on " + inflightInstant.requestedTime() + 
" as complete");
       return metadata;
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to clean up after commit", e);

Review Comment:
   Yes, it already throws the exception with the right type, so there is no 
need to catch and convert it.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -63,6 +58,19 @@
 public class TimelineMetadataUtils {
 
   private static final Integer DEFAULT_VERSION = 1;
+  private static final Map<Class<?>, DatumWriter<?>> DATUM_WRITERS = new 
ConcurrentHashMap<>();
+
+  // Legacy method to handle cases where byte [] is still used as opposed to 
leveraging HoodieInstantWriter.
+  @Deprecated
+  public static <T> byte[] convertMetadataToBytArray(T metadata, 
CommitMetadataSerDe serDe) {

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java:
##########
@@ -93,23 +91,10 @@ public <T> T deserialize(HoodieInstant instant, InputStream 
inputStream, Boolean
   }
 
   @Override
-  public Option<byte[]> 
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata) 
throws IOException {
-    if (commitMetadata instanceof 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
-      return 
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
 HoodieReplaceCommitMetadata.class);
+  public <T> Option<HoodieInstantWriter> getInstantWriter(T metadata) {
+    if (metadata instanceof org.apache.hudi.common.model.HoodieCommitMetadata) 
{

Review Comment:
   yes



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java:
##########
@@ -1895,18 +1889,18 @@ public void 
testPendingCompactionWithDuplicateFileIdsAcrossPartitions(boolean pr
     assertTrue(fileIdsInCompaction.contains(fileId));
   }
 
-  private void saveAsComplete(HoodieActiveTimeline timeline, HoodieInstant 
inflight, Option<byte[]> data) {
+  private void saveAsComplete(HoodieActiveTimeline timeline, HoodieInstant 
inflight, HoodieCommitMetadata metadata) {
     if (inflight.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
-      timeline.transitionCompactionInflightToComplete(true, inflight, data);
+      timeline.transitionCompactionInflightToComplete(true, inflight, 
metadata);
     } else {
       HoodieInstant requested = 
INSTANT_GENERATOR.createNewInstant(State.REQUESTED, inflight.getAction(), 
inflight.requestedTime());
       timeline.createNewInstant(requested);
       timeline.transitionRequestedToInflight(requested, Option.empty());
-      timeline.saveAsComplete(inflight, data);
+      timeline.saveAsComplete(inflight, Option.of(metadata));
     }
   }
 
-  private void saveAsCompleteCluster(HoodieActiveTimeline timeline, 
HoodieInstant inflight, Option<byte[]> data) {
+  private <T> void saveAsCompleteCluster(HoodieActiveTimeline timeline, 
HoodieInstant inflight, HoodieCommitMetadata metadata) {

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -138,25 +147,24 @@ public void 
createRequestedCommitWithReplaceMetadata(String instantTime, String
       HoodieInstant instant = 
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, actionType, 
instantTime);
       LOG.info("Creating a new instant " + instant);
       // Create the request replace file
-      createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
-          TimelineMetadataUtils.serializeRequestedReplaceMetadata(new 
HoodieRequestedReplaceMetadata()), false);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error create requested replace commit ", e);
+      createFileInMetaPath(instantFileNameGenerator.getFileName(instant), 
Option.of(new HoodieRequestedReplaceMetadata()), false);
+    } catch (HoodieIOException e) {
+      throw e;
     }

Review Comment:
   done



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java:
##########
@@ -138,7 +138,7 @@ void testPartialCleanFailure(CleanFailureType failureType) 
throws IOException {
     when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
     
when(cleanTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
     when(activeTimeline.readCleanerPlan(cleanInstant)).thenReturn(cleanerPlan);
-    
when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+    
when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(Option.of(convertMetadataToBytArray(cleanerPlan)));

Review Comment:
   Tracking Jira: https://issues.apache.org/jira/browse/HUDI-9099



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -181,4 +140,17 @@ public static <T extends SpecificRecordBase> T 
deserializeAvroMetadata(InputStre
       return fileReader.next();
     }
   }
+
+  public static <T extends SpecificRecordBase> Option<HoodieInstantWriter> 
getInstantWriter(Option<T> metadata) {
+    if (metadata.isEmpty()) {
+      return Option.empty();
+    }
+    return Option.of(outputStream -> {
+      DatumWriter<T> datumWriter = (DatumWriter<T>) 
DATUM_WRITERS.computeIfAbsent(metadata.get().getClass(), 
SpecificDatumWriter::new);

Review Comment:
   fixed



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtilsLegacy.java:
##########
@@ -83,11 +81,18 @@ private static void createMetaFile(String basePath, String 
instantTime, String s
   }
 
   private static void createMetaFile(String basePath, String instantTime, 
String suffix) throws IOException {
-    createMetaFile(basePath, instantTime, suffix, getUTF8Bytes(""));
+    createMetaFile(basePath, instantTime, suffix, Option.empty());
   }
 
-  private static void createMetaFile(String basePath, String instantTime, 
String suffix, byte[] content) throws IOException {
-    createMetaFile(getTimelinePath(new 
StoragePath(basePath)).toUri().getPath(), instantTime, 
InProcessTimeGenerator::createNewInstantTime, suffix, content);
+  private static <T> void createMetaFile(String basePath, String instantTime, 
String suffix, Option<T> metadata) throws IOException {
+    createMetaFile(getTimelinePath(new 
StoragePath(basePath)).toUri().getPath(), instantTime,
+        InProcessTimeGenerator::createNewInstantTime, suffix,
+        metadata.isEmpty() ? Option.empty() : 
COMMIT_METADATA_SER_DE.getInstantWriter(metadata.get()));

Review Comment:
   done



##########
hudi-common/pom.xml:
##########
@@ -354,6 +354,12 @@
       <artifactId>tally-core</artifactId>
       <version>${tally.version}</version>
     </dependency>
+      <dependency>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+          <version>2.18.0</version>
+          <scope>test</scope>
+      </dependency>

Review Comment:
   Removed, as the dependency is already declared in the same pom file.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV2.java:
##########
@@ -100,7 +101,7 @@ private void writeArchivedTimeline(int batchSize, long 
startTs) throws Exception
       String completionTime = String.valueOf(instantTimeTs + 10);
       HoodieInstant instant = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", 
instantTime, completionTime);
       HoodieCommitMetadata metadata  = 
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, 
Arrays.asList("par1", "par2"), 10, false);
-      byte[] serializedMetadata = 
TimelineMetadataUtils.serializeCommitMetadata(metaClient.getCommitMetadataSerDe(),
 metadata).get();
+      byte[] serializedMetadata = convertMetadataToBytArray(metadata);

Review Comment:
   This is for test usage. If callers want to be explicit about v1 or v2, they 
can use the same underlying method as convertMetadataToBytArray.
   
   Fixed the typo.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java:
##########
@@ -143,25 +152,24 @@ public void 
createRequestedCommitWithReplaceMetadata(String instantTime, String
       HoodieInstant instant = 
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, actionType, 
instantTime);
       LOG.info("Creating a new instant " + instant);
       // Create the request replace file
-      createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
-          TimelineMetadataUtils.serializeRequestedReplaceMetadata(new 
HoodieRequestedReplaceMetadata()), false);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error create requested replace commit ", e);
+      createFileInMetaPath(instantFileNameGenerator.getFileName(instant), 
Option.of(new HoodieRequestedReplaceMetadata()), false);
+    } catch (HoodieIOException e) {
+      throw e;

Review Comment:
   done



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java:
##########
@@ -79,4 +83,14 @@ public static <T> List<T> randomSelect(List<T> items, int n) 
{
   public static List<HoodieKey> randomSelectAsHoodieKeys(List<HoodieRecord> 
records, int n) {
     return randomSelect(recordsToHoodieKeys(records), n);
   }
+
+  public static byte[] writeInstantContentToBytes(HoodieInstantWriter writer) {

Review Comment:
   done and tracked in https://issues.apache.org/jira/browse/HUDI-9099



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java:
##########
@@ -93,23 +91,10 @@ public <T> T deserialize(HoodieInstant instant, InputStream 
inputStream, Boolean
   }
 
   @Override
-  public Option<byte[]> 
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata) 
throws IOException {
-    if (commitMetadata instanceof 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
-      return 
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
 HoodieReplaceCommitMetadata.class);
+  public <T> Option<HoodieInstantWriter> getInstantWriter(T metadata) {
+    if (metadata instanceof org.apache.hudi.common.model.HoodieCommitMetadata) 
{
+      return 
TimelineMetadataUtils.getInstantWriter(Option.of(MetadataConversionUtils.convertCommitMetadataToAvro((HoodieCommitMetadata)
 metadata)));

Review Comment:
   moved



##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/v2/TestCommitMetadataSerDeV2.java:
##########
@@ -65,9 +64,8 @@ public void testLegacyInstant() throws Exception {
 
 
     // Serialize and deserialize
-    Option<byte[]> serialized = serDeV1.serialize(metadata);
-    assertTrue(serialized.isPresent());
-    HoodieCommitMetadata deserialized = getSerDe().deserialize(instant, new 
ByteArrayInputStream(serialized.get()), () -> false, 
HoodieCommitMetadata.class);
+    byte[] serialized = convertMetadataToBytArray(metadata, serDeV1);

Review Comment:
   done



##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -180,7 +181,7 @@ public void testCommitMetadataSerde() throws Exception {
     // Case: Reading 0.x written commit metadata
     HoodieInstant legacyInstant = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", 
"1", "1", true);
     CommitMetadataSerDe v1SerDe = new CommitMetadataSerDeV1();
-    byte[] v1Bytes = v1SerDe.serialize(commitMetadata1).get();
+    byte[] v1Bytes = convertMetadataToBytArray(commitMetadata1, v1SerDe);

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -623,4 +624,15 @@ public static Option<InputStream> 
getInputStreamOptionLegacy(HoodieTimeline time
     }
     return Option.of(new ByteArrayInputStream(bytes.get()));
   }
+
+  // TODO[HUDI-9094]: work around when caller needs to write byte array in 
raw. This method should be removed.
+  public static <T> Option<HoodieInstantWriter> 
getHoodieInstantWriterOption(HoodieTimeline timeline, Option<T> metadata) {
+    Option<HoodieInstantWriter> writerOption;
+    if (metadata.isPresent() && metadata.get() instanceof HoodieInstantWriter) 
{
+      writerOption = (Option<HoodieInstantWriter>) metadata;
+    } else {
+      writerOption = timeline.getInstantWriter(metadata);
+    }
+    return writerOption;
+  }

Review Comment:
   create jira



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java:
##########
@@ -392,6 +393,10 @@ public static StoragePath 
getCompleteInstantPath(HoodieStorage storage, StorageP
     return getCompleteInstantFileInfo(storage, parent, instantTime, 
action).getPath();
   }
 
+  public static <T> byte[] convertMetadataToBytArray(T metadata) {

Review Comment:
   done



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtilsLegacy.java:
##########
@@ -149,30 +152,26 @@ public static void createInflightDeltaCommit(String 
basePath, String instantTime
 
   public static void createReplaceCommit(CommitMetadataSerDe 
commitMetadataSerDe, String basePath,
                                          String instantTime, 
HoodieReplaceCommitMetadata metadata) throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
-        serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REPLACE_COMMIT_EXTENSION, Option.of(metadata));
   }
 
   public static void createRequestedClusterCommit(String basePath, String 
instantTime,
-                                                  
HoodieRequestedReplaceMetadata requestedReplaceMetadata)
-      throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION,
-        serializeRequestedReplaceMetadata(requestedReplaceMetadata).get());
+                                                  
HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException {
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION, 
Option.of(requestedReplaceMetadata));
   }
 
   public static void createRequestedCompactionCommit(String basePath, String 
instantTime,
-                                                     HoodieCompactionPlan 
requestedCompactionPlan)
-      throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_COMPACTION_EXTENSION,
-        serializeCompactionPlan(requestedCompactionPlan).get());
+                                                     HoodieCompactionPlan 
requestedCompactionPlan) throws IOException {
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_COMPACTION_EXTENSION, 
Option.of(requestedCompactionPlan));
   }
 
   public static void createRequestedRollbackFile(String basePath, String 
instantTime, HoodieRollbackPlan plan) throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, Option.of(plan));
   }
 
   public static void createRequestedRollbackFile(String basePath, String 
instantTime, byte[] content) throws IOException {
-    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content);
+    createMetaFile(basePath, instantTime, 
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION,

Review Comment:
   https://issues.apache.org/jira/browse/HUDI-9099



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java:
##########
@@ -185,29 +173,28 @@ public static void 
createInflightReplaceCommit(HoodieTableMetaClient metaClient,
   public static void createReplaceCommit(HoodieTableMetaClient metaClient, 
CommitMetadataSerDe commitMetadataSerDe,
                                          String instantTime, 
HoodieReplaceCommitMetadata metadata) throws IOException {
     createMetaFile(metaClient, instantTime, 
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
-        serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+        Option.of(metadata));
   }
 
   public static void createReplaceCommit(HoodieTableMetaClient metaClient, 
CommitMetadataSerDe commitMetadataSerDe,
                                          String instantTime, String 
completionTime, HoodieReplaceCommitMetadata metadata) throws IOException {
     createMetaFileInTimelinePath(
         metaClient, instantTime, () -> completionTime, 
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
-        serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+        metaClient.getCommitMetadataSerDe().getInstantWriter(metadata));
   }
 
   public static void createRequestedClusterCommit(HoodieTableMetaClient 
metaClient, String instantTime,
                                                   
HoodieRequestedReplaceMetadata requestedReplaceMetadata)
       throws IOException {
     createMetaFile(metaClient, instantTime, 
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION,
-        serializeRequestedReplaceMetadata(requestedReplaceMetadata).get());
+        Option.of(requestedReplaceMetadata));
   }
 
-  public static void createInflightClusterCommit(HoodieTableMetaClient 
metaClient, CommitMetadataSerDe commitMetadataSerDe,
-                                                 String instantTime, 
Option<HoodieCommitMetadata> inflightReplaceMetadata)
+  public static <T> void createInflightClusterCommit(HoodieTableMetaClient 
metaClient, CommitMetadataSerDe commitMetadataSerDe,
+                                                     String instantTime, 
Option<T> inflightReplaceMetadata)

Review Comment:
   revised.



##########
hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/timeline/HoodieMetaserverBasedTimeline.java:
##########
@@ -56,23 +60,26 @@ protected void deleteInstantFile(HoodieInstant instant) {
   }
 
   @Override
-  protected void transitionStateToComplete(boolean shouldLock, HoodieInstant 
fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
+  protected <T> void transitionStateToComplete(boolean shouldLock, 
HoodieInstant fromInstant, HoodieInstant toInstant, Option<T> metadata) {
     
ValidationUtils.checkArgument(fromInstant.requestedTime().equals(toInstant.requestedTime()));
-    metaserverClient.transitionInstantState(databaseName, tableName, 
fromInstant, toInstant, data);
+    metaserverClient.transitionInstantState(databaseName, tableName, 
fromInstant, toInstant,
+        metadata.map(m -> convertMetadataToBytArray(m, metadataSerDeV2)));

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to