Davis-Zhang-Onehouse commented on code in PR #12900:
URL: https://github.com/apache/hudi/pull/12900#discussion_r1978487089
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -232,12 +230,9 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
this.txnManager.beginTransaction(Option.of(inflightInstant),
Option.empty());
}
writeTableMetadata(metadata, inflightInstant.requestedTime());
- table.getActiveTimeline().transitionCleanInflightToComplete(false,
- inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
+ table.getActiveTimeline().transitionCleanInflightToComplete(false,
inflightInstant, Option.of(metadata));
LOG.info("Marked clean started on " + inflightInstant.requestedTime() +
" as complete");
return metadata;
- } catch (IOException e) {
- throw new HoodieIOException("Failed to clean up after commit", e);
Review Comment:
yes
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -232,12 +230,9 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K,
O> table, HoodieInstan
this.txnManager.beginTransaction(Option.of(inflightInstant),
Option.empty());
}
writeTableMetadata(metadata, inflightInstant.requestedTime());
- table.getActiveTimeline().transitionCleanInflightToComplete(false,
- inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
+ table.getActiveTimeline().transitionCleanInflightToComplete(false,
inflightInstant, Option.of(metadata));
LOG.info("Marked clean started on " + inflightInstant.requestedTime() +
" as complete");
return metadata;
- } catch (IOException e) {
- throw new HoodieIOException("Failed to clean up after commit", e);
Review Comment:
Yes, it already throws the exception with the right type, so there is no need
to catch and convert it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -63,6 +58,19 @@
public class TimelineMetadataUtils {
private static final Integer DEFAULT_VERSION = 1;
+ private static final Map<Class<?>, DatumWriter<?>> DATUM_WRITERS = new
ConcurrentHashMap<>();
+
+ // Legacy method to handle cases where byte [] is still used as opposed to
leveraging HoodieInstantWriter.
+ @Deprecated
+ public static <T> byte[] convertMetadataToBytArray(T metadata,
CommitMetadataSerDe serDe) {
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java:
##########
@@ -93,23 +91,10 @@ public <T> T deserialize(HoodieInstant instant, InputStream
inputStream, Boolean
}
@Override
- public Option<byte[]>
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata)
throws IOException {
- if (commitMetadata instanceof
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
- return
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
HoodieReplaceCommitMetadata.class);
+ public <T> Option<HoodieInstantWriter> getInstantWriter(T metadata) {
+ if (metadata instanceof org.apache.hudi.common.model.HoodieCommitMetadata)
{
Review Comment:
yes
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java:
##########
@@ -1895,18 +1889,18 @@ public void
testPendingCompactionWithDuplicateFileIdsAcrossPartitions(boolean pr
assertTrue(fileIdsInCompaction.contains(fileId));
}
- private void saveAsComplete(HoodieActiveTimeline timeline, HoodieInstant
inflight, Option<byte[]> data) {
+ private void saveAsComplete(HoodieActiveTimeline timeline, HoodieInstant
inflight, HoodieCommitMetadata metadata) {
if (inflight.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
- timeline.transitionCompactionInflightToComplete(true, inflight, data);
+ timeline.transitionCompactionInflightToComplete(true, inflight,
metadata);
} else {
HoodieInstant requested =
INSTANT_GENERATOR.createNewInstant(State.REQUESTED, inflight.getAction(),
inflight.requestedTime());
timeline.createNewInstant(requested);
timeline.transitionRequestedToInflight(requested, Option.empty());
- timeline.saveAsComplete(inflight, data);
+ timeline.saveAsComplete(inflight, Option.of(metadata));
}
}
- private void saveAsCompleteCluster(HoodieActiveTimeline timeline,
HoodieInstant inflight, Option<byte[]> data) {
+ private <T> void saveAsCompleteCluster(HoodieActiveTimeline timeline,
HoodieInstant inflight, HoodieCommitMetadata metadata) {
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ActiveTimelineV1.java:
##########
@@ -138,25 +147,24 @@ public void
createRequestedCommitWithReplaceMetadata(String instantTime, String
HoodieInstant instant =
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime);
LOG.info("Creating a new instant " + instant);
// Create the request replace file
- createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
- TimelineMetadataUtils.serializeRequestedReplaceMetadata(new
HoodieRequestedReplaceMetadata()), false);
- } catch (IOException e) {
- throw new HoodieIOException("Error create requested replace commit ", e);
+ createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
Option.of(new HoodieRequestedReplaceMetadata()), false);
+ } catch (HoodieIOException e) {
+ throw e;
}
Review Comment:
done
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanActionExecutor.java:
##########
@@ -138,7 +138,7 @@ void testPartialCleanFailure(CleanFailureType failureType)
throws IOException {
when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
when(cleanTimeline.getInstants()).thenReturn(Collections.singletonList(cleanInstant));
when(activeTimeline.readCleanerPlan(cleanInstant)).thenReturn(cleanerPlan);
-
when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
+
when(activeTimeline.readCleanerInfoAsBytes(cleanInstant)).thenReturn(Option.of(convertMetadataToBytArray(cleanerPlan)));
Review Comment:
Tracking JIRA: https://issues.apache.org/jira/browse/HUDI-9099
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -181,4 +140,17 @@ public static <T extends SpecificRecordBase> T
deserializeAvroMetadata(InputStre
return fileReader.next();
}
}
+
+ public static <T extends SpecificRecordBase> Option<HoodieInstantWriter>
getInstantWriter(Option<T> metadata) {
+ if (metadata.isEmpty()) {
+ return Option.empty();
+ }
+ return Option.of(outputStream -> {
+ DatumWriter<T> datumWriter = (DatumWriter<T>)
DATUM_WRITERS.computeIfAbsent(metadata.get().getClass(),
SpecificDatumWriter::new);
Review Comment:
fixed
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtilsLegacy.java:
##########
@@ -83,11 +81,18 @@ private static void createMetaFile(String basePath, String
instantTime, String s
}
private static void createMetaFile(String basePath, String instantTime,
String suffix) throws IOException {
- createMetaFile(basePath, instantTime, suffix, getUTF8Bytes(""));
+ createMetaFile(basePath, instantTime, suffix, Option.empty());
}
- private static void createMetaFile(String basePath, String instantTime,
String suffix, byte[] content) throws IOException {
- createMetaFile(getTimelinePath(new
StoragePath(basePath)).toUri().getPath(), instantTime,
InProcessTimeGenerator::createNewInstantTime, suffix, content);
+ private static <T> void createMetaFile(String basePath, String instantTime,
String suffix, Option<T> metadata) throws IOException {
+ createMetaFile(getTimelinePath(new
StoragePath(basePath)).toUri().getPath(), instantTime,
+ InProcessTimeGenerator::createNewInstantTime, suffix,
+ metadata.isEmpty() ? Option.empty() :
COMMIT_METADATA_SER_DE.getInstantWriter(metadata.get()));
Review Comment:
done
##########
hudi-common/pom.xml:
##########
@@ -354,6 +354,12 @@
<artifactId>tally-core</artifactId>
<version>${tally.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.18.0</version>
+ <scope>test</scope>
+ </dependency>
Review Comment:
Removed, since the dependency is already declared in the same pom file.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV2.java:
##########
@@ -100,7 +101,7 @@ private void writeArchivedTimeline(int batchSize, long
startTs) throws Exception
String completionTime = String.valueOf(instantTimeTs + 10);
HoodieInstant instant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit",
instantTime, completionTime);
HoodieCommitMetadata metadata =
testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT,
Arrays.asList("par1", "par2"), 10, false);
- byte[] serializedMetadata =
TimelineMetadataUtils.serializeCommitMetadata(metaClient.getCommitMetadataSerDe(),
metadata).get();
+ byte[] serializedMetadata = convertMetadataToBytArray(metadata);
Review Comment:
This is for test usage only. If a test wants to be explicit about v1 or v2, it
can call the same underlying method that convertMetadataToBytArray uses.
Fixed the typo.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java:
##########
@@ -143,25 +152,24 @@ public void
createRequestedCommitWithReplaceMetadata(String instantTime, String
HoodieInstant instant =
instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime);
LOG.info("Creating a new instant " + instant);
// Create the request replace file
- createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
- TimelineMetadataUtils.serializeRequestedReplaceMetadata(new
HoodieRequestedReplaceMetadata()), false);
- } catch (IOException e) {
- throw new HoodieIOException("Error create requested replace commit ", e);
+ createFileInMetaPath(instantFileNameGenerator.getFileName(instant),
Option.of(new HoodieRequestedReplaceMetadata()), false);
+ } catch (HoodieIOException e) {
+ throw e;
Review Comment:
done
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java:
##########
@@ -79,4 +83,14 @@ public static <T> List<T> randomSelect(List<T> items, int n)
{
public static List<HoodieKey> randomSelectAsHoodieKeys(List<HoodieRecord>
records, int n) {
return randomSelect(recordsToHoodieKeys(records), n);
}
+
+ public static byte[] writeInstantContentToBytes(HoodieInstantWriter writer) {
Review Comment:
Done, and tracked in https://issues.apache.org/jira/browse/HUDI-9099
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java:
##########
@@ -93,23 +91,10 @@ public <T> T deserialize(HoodieInstant instant, InputStream
inputStream, Boolean
}
@Override
- public Option<byte[]>
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata)
throws IOException {
- if (commitMetadata instanceof
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
- return
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
HoodieReplaceCommitMetadata.class);
+ public <T> Option<HoodieInstantWriter> getInstantWriter(T metadata) {
+ if (metadata instanceof org.apache.hudi.common.model.HoodieCommitMetadata)
{
+ return
TimelineMetadataUtils.getInstantWriter(Option.of(MetadataConversionUtils.convertCommitMetadataToAvro((HoodieCommitMetadata)
metadata)));
Review Comment:
moved
##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/v2/TestCommitMetadataSerDeV2.java:
##########
@@ -65,9 +64,8 @@ public void testLegacyInstant() throws Exception {
// Serialize and deserialize
- Option<byte[]> serialized = serDeV1.serialize(metadata);
- assertTrue(serialized.isPresent());
- HoodieCommitMetadata deserialized = getSerDe().deserialize(instant, new
ByteArrayInputStream(serialized.get()), () -> false,
HoodieCommitMetadata.class);
+ byte[] serialized = convertMetadataToBytArray(metadata, serDeV1);
Review Comment:
done
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -180,7 +181,7 @@ public void testCommitMetadataSerde() throws Exception {
// Case: Reading 0.x written commit metadata
HoodieInstant legacyInstant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit",
"1", "1", true);
CommitMetadataSerDe v1SerDe = new CommitMetadataSerDeV1();
- byte[] v1Bytes = v1SerDe.serialize(commitMetadata1).get();
+ byte[] v1Bytes = convertMetadataToBytArray(commitMetadata1, v1SerDe);
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -623,4 +624,15 @@ public static Option<InputStream>
getInputStreamOptionLegacy(HoodieTimeline time
}
return Option.of(new ByteArrayInputStream(bytes.get()));
}
+
+ // TODO[HUDI-9094]: work around when caller needs to write byte array in
raw. This method should be removed.
+ public static <T> Option<HoodieInstantWriter>
getHoodieInstantWriterOption(HoodieTimeline timeline, Option<T> metadata) {
+ Option<HoodieInstantWriter> writerOption;
+ if (metadata.isPresent() && metadata.get() instanceof HoodieInstantWriter)
{
+ writerOption = (Option<HoodieInstantWriter>) metadata;
+ } else {
+ writerOption = timeline.getInstantWriter(metadata);
+ }
+ return writerOption;
+ }
Review Comment:
Created a JIRA to track this.
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java:
##########
@@ -392,6 +393,10 @@ public static StoragePath
getCompleteInstantPath(HoodieStorage storage, StorageP
return getCompleteInstantFileInfo(storage, parent, instantTime,
action).getPath();
}
+ public static <T> byte[] convertMetadataToBytArray(T metadata) {
Review Comment:
done
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtilsLegacy.java:
##########
@@ -149,30 +152,26 @@ public static void createInflightDeltaCommit(String
basePath, String instantTime
public static void createReplaceCommit(CommitMetadataSerDe
commitMetadataSerDe, String basePath,
String instantTime,
HoodieReplaceCommitMetadata metadata) throws IOException {
- createMetaFile(basePath, instantTime,
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
- serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REPLACE_COMMIT_EXTENSION, Option.of(metadata));
}
public static void createRequestedClusterCommit(String basePath, String
instantTime,
-
HoodieRequestedReplaceMetadata requestedReplaceMetadata)
- throws IOException {
- createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION,
- serializeRequestedReplaceMetadata(requestedReplaceMetadata).get());
+
HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION,
Option.of(requestedReplaceMetadata));
}
public static void createRequestedCompactionCommit(String basePath, String
instantTime,
- HoodieCompactionPlan
requestedCompactionPlan)
- throws IOException {
- createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_COMPACTION_EXTENSION,
- serializeCompactionPlan(requestedCompactionPlan).get());
+ HoodieCompactionPlan
requestedCompactionPlan) throws IOException {
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_COMPACTION_EXTENSION,
Option.of(requestedCompactionPlan));
}
public static void createRequestedRollbackFile(String basePath, String
instantTime, HoodieRollbackPlan plan) throws IOException {
- createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, Option.of(plan));
}
public static void createRequestedRollbackFile(String basePath, String
instantTime, byte[] content) throws IOException {
- createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content);
+ createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION,
Review Comment:
https://issues.apache.org/jira/browse/HUDI-9099
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java:
##########
@@ -185,29 +173,28 @@ public static void
createInflightReplaceCommit(HoodieTableMetaClient metaClient,
public static void createReplaceCommit(HoodieTableMetaClient metaClient,
CommitMetadataSerDe commitMetadataSerDe,
String instantTime,
HoodieReplaceCommitMetadata metadata) throws IOException {
createMetaFile(metaClient, instantTime,
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
- serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+ Option.of(metadata));
}
public static void createReplaceCommit(HoodieTableMetaClient metaClient,
CommitMetadataSerDe commitMetadataSerDe,
String instantTime, String
completionTime, HoodieReplaceCommitMetadata metadata) throws IOException {
createMetaFileInTimelinePath(
metaClient, instantTime, () -> completionTime,
HoodieTimeline.REPLACE_COMMIT_EXTENSION,
- serializeCommitMetadata(commitMetadataSerDe, metadata).get());
+ metaClient.getCommitMetadataSerDe().getInstantWriter(metadata));
}
public static void createRequestedClusterCommit(HoodieTableMetaClient
metaClient, String instantTime,
HoodieRequestedReplaceMetadata requestedReplaceMetadata)
throws IOException {
createMetaFile(metaClient, instantTime,
HoodieTimeline.REQUESTED_CLUSTERING_COMMIT_EXTENSION,
- serializeRequestedReplaceMetadata(requestedReplaceMetadata).get());
+ Option.of(requestedReplaceMetadata));
}
- public static void createInflightClusterCommit(HoodieTableMetaClient
metaClient, CommitMetadataSerDe commitMetadataSerDe,
- String instantTime,
Option<HoodieCommitMetadata> inflightReplaceMetadata)
+ public static <T> void createInflightClusterCommit(HoodieTableMetaClient
metaClient, CommitMetadataSerDe commitMetadataSerDe,
+ String instantTime,
Option<T> inflightReplaceMetadata)
Review Comment:
revised.
##########
hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/timeline/HoodieMetaserverBasedTimeline.java:
##########
@@ -56,23 +60,26 @@ protected void deleteInstantFile(HoodieInstant instant) {
}
@Override
- protected void transitionStateToComplete(boolean shouldLock, HoodieInstant
fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
+ protected <T> void transitionStateToComplete(boolean shouldLock,
HoodieInstant fromInstant, HoodieInstant toInstant, Option<T> metadata) {
ValidationUtils.checkArgument(fromInstant.requestedTime().equals(toInstant.requestedTime()));
- metaserverClient.transitionInstantState(databaseName, tableName,
fromInstant, toInstant, data);
+ metaserverClient.transitionInstantState(databaseName, tableName,
fromInstant, toInstant,
+ metadata.map(m -> convertMetadataToBytArray(m, metadataSerDeV2)));
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]