Davis-Zhang-Onehouse commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1975933782
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java:
##########
@@ -77,8 +75,7 @@ public String showCleans(
List<HoodieInstant> cleans =
timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant clean : cleans) {
- HoodieCleanMetadata cleanMetadata =
-
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+ HoodieCleanMetadata cleanMetadata =
timeline.loadHoodieCleanMetadata(clean);
Review Comment:
As discussed, that API can only return the Object type, and the caller needs to cast
explicitly. We chose not to go this route.
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java:
##########
@@ -125,9 +124,7 @@ public String compactionShow(
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
InstantGenerator instantGenerator = client.getInstantGenerator();
- HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(
- activeTimeline.readCompactionPlanAsBytes(
-
instantGenerator.getCompactionRequestedInstant(compactionInstantTime)).get());
+ HoodieCompactionPlan compactionPlan =
activeTimeline.loadCompactionPlan(instantGenerator.getCompactionRequestedInstant(compactionInstantTime));
Review Comment:
same as above
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RestoresCommand.java:
##########
@@ -98,9 +96,7 @@ public String showRestore(
private void addDetailsOfCompletedRestore(HoodieActiveTimeline
activeTimeline, List<Comparable[]> rows,
HoodieInstant restoreInstant)
throws IOException {
HoodieRestoreMetadata instantMetadata;
- Option<byte[]> instantDetails =
activeTimeline.getInstantDetails(restoreInstant);
- instantMetadata = TimelineMetadataUtils
- .deserializeAvroMetadata(instantDetails.get(),
HoodieRestoreMetadata.class);
+ instantMetadata = activeTimeline.loadInstantContent(restoreInstant,
HoodieRestoreMetadata.class);
Review Comment:
done
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java:
##########
@@ -97,8 +95,8 @@ public String showRollback(
throws IOException {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
final List<Comparable[]> rows = new ArrayList<>();
- HoodieRollbackMetadata metadata =
TimelineMetadataUtils.deserializeAvroMetadata(
-
activeTimeline.getInstantDetails(HoodieCLI.getTableMetaClient().createNewInstant(State.COMPLETED,
ROLLBACK_ACTION, rollbackInstant)).get(),
+ HoodieRollbackMetadata metadata = activeTimeline.loadInstantContent(
Review Comment:
done
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java:
##########
@@ -77,8 +77,7 @@ public String writeAmplificationStats(
TimelineLayout layout =
TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());
for (HoodieInstant instantTime : timeline.getInstants()) {
String waf = "0";
- HoodieCommitMetadata commit =
layout.getCommitMetadataSerDe().deserialize(instantTime,
activeTimeline.getInstantDetails(instantTime).get(),
- HoodieCommitMetadata.class);
+ HoodieCommitMetadata commit =
activeTimeline.loadInstantContent(instantTime, HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -289,7 +289,7 @@ static boolean
rewriteTimelineV1InstantFileToV2Format(HoodieInstant instant, Hoo
boolean success = true;
if (instant.getAction().equals(COMMIT_ACTION) ||
instant.getAction().equals(DELTA_COMMIT_ACTION) ||
(instant.getAction().equals(REPLACE_COMMIT_ACTION) && instant.isCompleted())) {
Class<? extends HoodieCommitMetadata> clazz =
instant.getAction().equals(REPLACE_COMMIT_ACTION) ?
HoodieReplaceCommitMetadata.class : HoodieCommitMetadata.class;
- HoodieCommitMetadata commitMetadata =
commitMetadataSerDeV1.deserialize(instant,
metaClient.getActiveTimeline().getInstantDetails(instant).get(), clazz);
+ HoodieCommitMetadata commitMetadata =
metaClient.getActiveTimeline().loadInstantContent(instant,clazz);
Review Comment:
need to add test
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -912,9 +906,8 @@ protected void testMetadataStatsOnCommit(boolean
populateMetaFields, Function tr
// Read from commit file
HoodieInstant instant = INSTANT_GENERATOR.createNewInstant(COMPLETED,
COMMIT_ACTION, instantTime0);
- HoodieCommitMetadata metadata =
metaClient.getCommitMetadataSerDe().deserialize(instant,
-
createMetaClient().reloadActiveTimeline().getInstantDetails(instant).get(),
- HoodieCommitMetadata.class);
+ HoodieActiveTimeline activeTimeline =
createMetaClient().reloadActiveTimeline();
+ HoodieCommitMetadata metadata = activeTimeline.loadInstantContent(instant,
HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java:
##########
@@ -587,49 +584,35 @@ private static HoodieFileGroup
buildFileGroup(List<String> baseFileCommitTimes,
return group;
}
- private static Option<byte[]> getSavepointBytes(String partition,
List<String> paths) {
- try {
- Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
- List<String> fileNames = paths.stream().map(path ->
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
- partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, fileNames));
- HoodieSavepointMetadata savepointMetadata =
- new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
- return
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata);
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
+ private static HoodieSavepointMetadata getSavepointBytes(String partition,
List<String> paths) {
+ Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
+ List<String> fileNames = paths.stream().map(path ->
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
+ partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, fileNames));
+ return new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
}
- private static Pair<HoodieCleanMetadata, Option<byte[]>>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
+ private static Pair<HoodieCleanMetadata, HoodieCleanMetadata>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
String lastCompletedTime, Set<String> savepointsToTrack, Option<String>
earliestCommitToNotArchive) {
- try {
- Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
- partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
- Map<String, String> extraMetadata = new HashMap<>();
- extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
- HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
- CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP,
extraMetadata.isEmpty() ? null : extraMetadata);
- return Pair.of(cleanMetadata,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
+ Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
+ partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+ CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP,
extraMetadata.isEmpty() ? null : extraMetadata);
+ return Pair.of(cleanMetadata, cleanMetadata);
}
- private static Pair<HoodieSavepointMetadata, Option<byte[]>>
getSavepointMetadata(List<String> partitions) {
- try {
- Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
- partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
- HoodieSavepointMetadata savepointMetadata =
- new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
- return Pair.of(savepointMetadata,
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata));
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
+ private static Pair<HoodieSavepointMetadata, HoodieSavepointMetadata>
getSavepointMetadata(List<String> partitions) {
+ Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
+ partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
+ HoodieSavepointMetadata savepointMetadata =
+ new HoodieSavepointMetadata("user", 1L, "comments", partitionMetadata,
1);
+ return Pair.of(savepointMetadata, savepointMetadata);
Review Comment:
fixed
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -194,8 +197,8 @@ public void testArchivedClean() throws Exception {
HoodieLSMTimelineInstant archived =
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
metaClient);
assertEquals(newCommitTime, archived.getInstantTime());
assertEquals(HoodieTimeline.CLEAN_ACTION, archived.getAction());
- assertDoesNotThrow(() -> CleanerUtils.getCleanerMetadata(metaClient,
archived.getMetadata().array()));
- assertDoesNotThrow(() ->
TimelineMetadataUtils.deserializeCleanerPlan(archived.getPlan().array()));
+ assertDoesNotThrow(() -> CleanerUtils.getCleanerMetadata(metaClient, new
ByteArrayInputStream(archived.getMetadata().array())));
+ assertDoesNotThrow(() -> deserializeAvroMetadata(new
ByteArrayInputStream(archived.getPlan().array()), HoodieCleanerPlan.class));
Review Comment:
This has to be kept as is: since the byte[] is what we are given, we have to use the
low-level deserialize API instead of going through the Hoodie timeline.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java:
##########
@@ -587,49 +584,35 @@ private static HoodieFileGroup
buildFileGroup(List<String> baseFileCommitTimes,
return group;
}
- private static Option<byte[]> getSavepointBytes(String partition,
List<String> paths) {
- try {
- Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
- List<String> fileNames = paths.stream().map(path ->
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
- partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, fileNames));
- HoodieSavepointMetadata savepointMetadata =
- new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
- return
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata);
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
+ private static HoodieSavepointMetadata getSavepointBytes(String partition,
List<String> paths) {
+ Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new
HashMap<>();
+ List<String> fileNames = paths.stream().map(path ->
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
+ partitionMetadata.put(partition, new
HoodieSavepointPartitionMetadata(partition, fileNames));
+ return new HoodieSavepointMetadata("user", 1L, "comments",
partitionMetadata, 1);
}
- private static Pair<HoodieCleanMetadata, Option<byte[]>>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
+ private static Pair<HoodieCleanMetadata, HoodieCleanMetadata>
getCleanCommitMetadata(List<String> partitions, String instantTime, String
earliestCommitToRetain,
String lastCompletedTime, Set<String> savepointsToTrack, Option<String>
earliestCommitToNotArchive) {
- try {
- Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
- partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
- Map<String, String> extraMetadata = new HashMap<>();
- extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
- HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
- CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP,
extraMetadata.isEmpty() ? null : extraMetadata);
- return Pair.of(cleanMetadata,
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
+ Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new
HashMap<>();
+ partitions.forEach(partition -> partitionMetadata.put(partition, new
HoodieCleanPartitionMetadata(partition,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), false)));
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(SAVEPOINTED_TIMESTAMPS,
savepointsToTrack.stream().collect(Collectors.joining(",")));
+ HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime,
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+ CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP,
extraMetadata.isEmpty() ? null : extraMetadata);
+ return Pair.of(cleanMetadata, cleanMetadata);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -933,9 +926,8 @@ protected void testMetadataStatsOnCommit(boolean
populateMetaFields, Function tr
metaClient = createMetaClient();
instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, COMMIT_ACTION,
instantTime1);
// Read from commit file
- metadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
- metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
- HoodieCommitMetadata.class);
+ HoodieActiveTimeline activeTimeline1 = metaClient.reloadActiveTimeline();
+ metadata = activeTimeline1.loadInstantContent(instant,
HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -244,17 +244,15 @@ public void testArchivedInsertOverwriteWithClustering()
throws Exception {
HoodieLSMTimelineInstant archived =
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
metaClient);
assertEquals(newCommitTime, archived.getInstantTime());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
- assertDoesNotThrow(() ->
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(),
HoodieReplaceCommitMetadata.class));
- assertDoesNotThrow(() ->
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+ assertDoesNotThrow(() -> deserializeAvroMetadata(new
ByteArrayInputStream(archived.getPlan().array()),
HoodieRequestedReplaceMetadata.class));
Review Comment:
same
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -244,17 +244,15 @@ public void testArchivedInsertOverwriteWithClustering()
throws Exception {
HoodieLSMTimelineInstant archived =
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
metaClient);
assertEquals(newCommitTime, archived.getInstantTime());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
- assertDoesNotThrow(() ->
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(),
HoodieReplaceCommitMetadata.class));
- assertDoesNotThrow(() ->
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+ assertDoesNotThrow(() -> deserializeAvroMetadata(new
ByteArrayInputStream(archived.getPlan().array()),
HoodieRequestedReplaceMetadata.class));
String newCommitTime2 = HoodieTestTable.makeNewCommitTime();
createReplace(newCommitTime2, WriteOperationType.INSERT_OVERWRITE_TABLE,
true);
// test conversion to archived instant
HoodieLSMTimelineInstant archived2 =
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime2),
metaClient);
assertEquals(newCommitTime2, archived2.getInstantTime());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived2.getAction());
- assertDoesNotThrow(() ->
HoodieReplaceCommitMetadata.fromBytes(archived2.getMetadata().array(),
HoodieReplaceCommitMetadata.class));
- assertDoesNotThrow(() ->
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived2.getPlan().array()));
+ assertDoesNotThrow(() -> deserializeAvroMetadata(new
ByteArrayInputStream(archived2.getPlan().array()),
HoodieRequestedReplaceMetadata.class));
Review Comment:
same
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -395,19 +387,15 @@ public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataToPojo(org
}
/**
- * Convert commit metadata from avro to json.
+ * Convert replacecommit metadata from avro to pojo.
*/
- public static <T extends SpecificRecordBase> byte[]
convertCommitMetadataToJsonBytes(T avroMetaData, Class<T> clazz) {
- Schema avroSchema = clazz ==
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class ?
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.getClassSchema() :
- org.apache.hudi.avro.model.HoodieCommitMetadata.getClassSchema();
- try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- JsonEncoder jsonEncoder = new JsonEncoder(avroSchema, outputStream);
- DatumWriter<GenericRecord> writer = avroMetaData instanceof
SpecificRecord ? new SpecificDatumWriter<>(avroSchema) : new
GenericDatumWriter<>(avroSchema);
- writer.write(avroMetaData, jsonEncoder);
- jsonEncoder.flush();
- return outputStream.toByteArray();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to convert to JSON.", e);
+ public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
replaceCommitMetadata) {
Review Comment:
duplicated code, removed.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +175,73 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
archivedMetaWrapper.setStateTransitionTime(completionTime);
String actionType =
lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
- HoodieInstant instant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
+ HoodieInstant hoodieInstant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
-
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
planBytes.get()));
+
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
new ByteArrayInputStream(instantDetails.get())));
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
new ByteArrayInputStream(planBytes.get())));
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
case HoodieTimeline.COMMIT_ACTION: {
- HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(),
HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
archivedMetaWrapper.setActionType(ActionType.commit.name());
if (planBytes.isPresent()) {
// this should be a compaction
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, new
ByteArrayInputStream(planBytes.get()));
archivedMetaWrapper.setHoodieCompactionPlan(plan);
}
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION: {
- HoodieCommitMetadata deltaCommitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(),
HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
if (planBytes.isPresent()) {
// this should be a log compaction
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, new
ByteArrayInputStream(planBytes.get()));
archivedMetaWrapper.setHoodieCompactionPlan(plan);
}
break;
}
case HoodieTimeline.REPLACE_COMMIT_ACTION:
case HoodieTimeline.CLUSTERING_ACTION: {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(),
HoodieReplaceCommitMetadata.class);
-
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieReplaceCommitMetadata.class)
+ .ifPresent(replaceCommitMetadata ->
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
// inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
// so we could re-use it without further creating an inflight
extension.
// Or inflight replacecommit files are empty under clustering
circumstance
- Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightCommitMetadata(metaClient, instant, instantDetails);
+ Option<HoodieCommitMetadata> inflightCommitMetadata =
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
if (inflightCommitMetadata.isPresent()) {
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
}
- archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+ archivedMetaWrapper.setActionType(
+
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
Review Comment:
done. we discussed offline and figured out the right way
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -107,21 +109,18 @@ public static HoodieCleanMetadata
convertCleanMetadata(String startCleanTime,
*/
public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient
metaClient, HoodieInstant cleanInstant)
throws IOException {
- CleanMetadataMigrator metadataMigrator = new
CleanMetadataMigrator(metaClient);
- HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get());
- return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
+ HoodieCleanMetadata cleanMetadata =
metaClient.getActiveTimeline().loadHoodieCleanMetadata(cleanInstant);
+ return upgradeCleanMetadata(metaClient, cleanMetadata);
}
- /**
- * Get Latest Version of Hoodie Cleaner Metadata - Output of cleaner
operation.
- * @return Latest version of Clean metadata corresponding to clean instant
- * @throws IOException
- */
- public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient
metaClient, byte[] details)
+ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient
metaClient, InputStream inputStream)
throws IOException {
+ HoodieCleanMetadata cleanMetadata = deserializeAvroMetadata(inputStream,
HoodieCleanMetadata.class);
+ return upgradeCleanMetadata(metaClient, cleanMetadata);
+ }
Review Comment:
We need this because some callers hand-craft cleaner metadata bytes instead of
reading from the timeline, so we need to expose an API to handle this usage.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -156,7 +155,11 @@ public Option<byte[]> getInstantDetails(HoodieInstant
instant) {
@Override
public InputStream getContentStream(HoodieInstant instant) {
- return new ByteArrayInputStream(getInstantDetails(instant).orElseGet(() ->
new byte[0]));
+ Option<InputStream> stream = getInputStreamOptionLegacy(this, instant);
+ if (stream.isEmpty()) {
+ return new ByteArrayInputStream(new byte[]{});
+ }
+ return stream.get();
Review Comment:
This would require a consumer-side code change; for the archived timeline we don't use an
input stream, as it follows different logic.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -166,7 +168,11 @@ public Option<byte[]> getInstantDetails(HoodieInstant
instant) {
@Override
public InputStream getContentStream(HoodieInstant instant) {
- return new ByteArrayInputStream(getInstantDetails(instant).orElseGet(() ->
new byte[0]));
+ Option<InputStream> stream = getInputStreamOptionLegacy(this, instant);
+ if (stream.isEmpty()) {
+ return new ByteArrayInputStream(new byte[]{});
+ }
+ return stream.get();
Review Comment:
No, we need to keep the same behavior as day 1 because the archived timeline
also uses it. Supporting stream IO for the archived timeline would take extra effort.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -332,24 +335,13 @@ public static HoodieArchivedMetaEntry
createMetaWrapperForEmptyInstant(HoodieIns
return archivedMetaWrapper;
}
- private static Option<HoodieCommitMetadata>
getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant
instant,
-
Option<byte[]> inflightContent) throws IOException {
- if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
- // inflight files can be empty in some certain cases, e.g. when users
opt in clustering
+ private static <T extends HoodieCommitMetadata> Option<T>
getCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant,
Class<T> clazz) throws IOException {
+ T commitMetadata =
metaClient.getActiveTimeline().loadInstantContent(instant, clazz);
+ // an empty file will return the default instance with an UNKNOWN
operation type and in that case we return an empty option
+ if (commitMetadata.getOperationType() == WriteOperationType.UNKNOWN) {
Review Comment:
In the real world this should not happen — a valid write operation should always set a
valid op type; tests can inject this.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -169,22 +168,14 @@ public static Option<HoodieInstant>
getEarliestCommitToRetain(
public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, HoodieInstant cleanInstant)
throws IOException {
CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
- HoodieCleanerPlan cleanerPlan =
TimelineMetadataUtils.deserializeAvroMetadata(
-
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(),
HoodieCleanerPlan.class);
+ HoodieCleanerPlan cleanerPlan =
metaClient.getActiveTimeline().loadCleanerPlan(cleanInstant);
return cleanPlanMigrator.upgradeToLatest(cleanerPlan,
cleanerPlan.getVersion());
}
- /**
- * Get Latest version of cleaner plan corresponding to a clean instant.
- *
- * @param metaClient Hoodie Table Meta Client
- * @return Cleaner plan corresponding to clean instant
- * @throws IOException
- */
- public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, byte[] details)
+ public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, InputStream in)
Review Comment:
Same as above: some callers craft arbitrary metadata and get the bytes for
deserialization, which requires this API; they do not read from the timeline.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -72,7 +85,7 @@ protected void testEmptyMetadataSerDe() throws Exception {
assertTrue(serialized.isPresent());
// Deserialize
- HoodieCommitMetadata deserialized = serDe.deserialize(instant,
serialized.get(), HoodieCommitMetadata.class);
+ HoodieCommitMetadata deserialized = serDe.deserialize(instant, new
ByteArrayInputStream(serialized.get()), () -> true, HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -244,13 +242,11 @@ private boolean isAnySavepointDeleted(HoodieCleanMetadata
cleanMetadata) {
private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
- hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ TimelineLayout layout =
TimelineLayout.fromVersion(hoodieTable.getMetaClient().getTimelineLayoutVersion());
Review Comment:
removed
##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -178,7 +289,7 @@ private HoodieWriteStat createTestWriteStat() {
return writeStat;
}
- private void verifyCommitMetadata(HoodieCommitMetadata metadata) {
+ public void verifyCommitMetadata(HoodieCommitMetadata metadata) {
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -189,18 +188,17 @@ protected Option<HoodieCleanerPlan> requestClean(String
startCleanTime) {
if (lastClean.isPresent()) {
HoodieInstant cleanInstant = lastClean.get();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieInstant cleanPlanInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(),
cleanInstant.requestedTime(),
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
Review Comment:
done
##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -194,7 +305,7 @@ private void
verifyReplaceCommitMetadata(HoodieReplaceCommitMetadata metadata) {
assertEquals("test-value-2",
metadata.getExtraMetadata().get("test-key-2"));
}
- private void verifyWriteStat(HoodieWriteStat stat) {
+ public void verifyWriteStat(HoodieWriteStat stat) {
Review Comment:
done
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -206,8 +209,7 @@ public void testArchivedReplace() throws Exception {
HoodieLSMTimelineInstant archived =
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
metaClient);
assertEquals(newCommitTime, archived.getInstantTime());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
- assertDoesNotThrow(() ->
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(),
HoodieReplaceCommitMetadata.class));
- assertDoesNotThrow(() ->
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+ assertDoesNotThrow(() -> deserializeAvroMetadata(new
ByteArrayInputStream(archived.getPlan().array()),
HoodieRequestedReplaceMetadata.class));
Review Comment:
HoodieReplaceCommitMetadata.fromBytes should not be used at all, so this
coverage is not needed
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -38,11 +38,12 @@
import org.apache.hudi.exception.HoodieTimeTravelException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/CommitMetadataSerDeV1.java:
##########
@@ -24,30 +24,41 @@
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.avro.specific.SpecificRecordBase;
+
import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.BooleanSupplier;
-import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata;
public class CommitMetadataSerDeV1 implements CommitMetadataSerDe {
@Override
- public <T> T deserialize(HoodieInstant instant, byte[] bytes, Class<T>
clazz) throws IOException {
+ public <T> T deserialize(HoodieInstant instant, InputStream inputStream,
BooleanSupplier isEmptyInstant, Class<T> clazz) throws IOException {
try {
- if (bytes.length == 0) {
- return clazz.newInstance();
+ // For commit metadata we need special case handling as they are using
non avro type in memory.
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -56,100 +52,106 @@
*/
public class MetadataConversionUtils {
- public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
- Option<byte[]> instantDetails =
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
- if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
- // in local FS and HDFS, there could be empty completed instants due to
crash.
- // let's add an entry to the archival, even if not for the plan.
- return createMetaWrapperForEmptyInstant(hoodieInstant);
- }
- HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
- archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
- archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
-
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
- switch (hoodieInstant.getAction()) {
- case HoodieTimeline.CLEAN_ACTION: {
- if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
- } else {
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
instantDetails.get()));
+ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant
hoodieInstant, HoodieTableMetaClient metaClient) {
+ try {
+ HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
+ archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
+ archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
+ CommitMetadataSerDe serDe = metaClient.getCommitMetadataSerDe();
+ switch (hoodieInstant.getAction()) {
+ case HoodieTimeline.CLEAN_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
hoodieInstant));
+ } else {
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
hoodieInstant));
+ }
+ archivedMetaWrapper.setActionType(ActionType.clean.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.clean.name());
- break;
- }
- case HoodieTimeline.COMMIT_ACTION: {
- HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant,
instantDetails.get(), HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
- archivedMetaWrapper.setActionType(ActionType.commit.name());
- break;
- }
- case HoodieTimeline.DELTA_COMMIT_ACTION: {
- HoodieCommitMetadata deltaCommitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant,
instantDetails.get(), HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
- archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
- break;
- }
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- case HoodieTimeline.CLUSTERING_ACTION: {
- if (hoodieInstant.isCompleted()) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(),
HoodieReplaceCommitMetadata.class);
-
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
- } else if (hoodieInstant.isInflight()) {
- // inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
- // so we could re-use it without further creating an inflight
extension.
- // Or inflight replacecommit files are empty under clustering
circumstance
- Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails);
- if (inflightCommitMetadata.isPresent()) {
-
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+ case HoodieTimeline.COMMIT_ACTION: {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
+ archivedMetaWrapper.setActionType(ActionType.commit.name());
+ break;
+ }
+ case HoodieTimeline.DELTA_COMMIT_ACTION: {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(deltaCommitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)));
+ archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
+ break;
+ }
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ case HoodieTimeline.CLUSTERING_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieReplaceCommitMetadata.class)
+ .ifPresent(replaceCommitMetadata ->
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
+ } else if (hoodieInstant.isInflight()) {
+ // inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
+ // so we could re-use it without further creating an inflight
extension.
+ // Or inflight replacecommit files are empty under clustering
circumstance
+ Option<HoodieCommitMetadata> inflightCommitMetadata =
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
+ if (inflightCommitMetadata.isPresent()) {
+
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+ }
+ } else {
+ // we may have cases with empty HoodieRequestedReplaceMetadata
e.g. insert_overwrite_table or insert_overwrite
+ // without clustering. However, we should revisit the requested
commit file standardization
+ Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
Option.of(metaClient.getActiveTimeline()
+ .loadRequestedReplaceMetadata(hoodieInstant));
+ if (requestedReplaceMetadata.isPresent()) {
+
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+ }
}
- } else {
- // we may have cases with empty HoodieRequestedReplaceMetadata e.g.
insert_overwrite_table or insert_overwrite
- // without clustering. However, we should revisit the requested
commit file standardization
- Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(instantDetails);
- if (requestedReplaceMetadata.isPresent()) {
-
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+ archivedMetaWrapper.setActionType(
+
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
+ break;
+ }
+ case HoodieTimeline.ROLLBACK_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+
archivedMetaWrapper.setHoodieRollbackMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
HoodieRollbackMetadata.class));
}
+ archivedMetaWrapper.setActionType(ActionType.rollback.name());
+ break;
}
- archivedMetaWrapper.setActionType(
-
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
- break;
- }
- case HoodieTimeline.ROLLBACK_ACTION: {
- if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieRollbackMetadata.class));
+ case HoodieTimeline.SAVEPOINT_ACTION: {
+
archivedMetaWrapper.setHoodieSavePointMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
HoodieSavepointMetadata.class));
+ archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.rollback.name());
- break;
- }
- case HoodieTimeline.SAVEPOINT_ACTION: {
-
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieSavepointMetadata.class));
- archivedMetaWrapper.setActionType(ActionType.savepoint.name());
- break;
- }
- case HoodieTimeline.COMPACTION_ACTION: {
- if (hoodieInstant.isRequested()) {
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ case HoodieTimeline.COMPACTION_ACTION: {
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
+ archivedMetaWrapper.setActionType(ActionType.compaction.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.compaction.name());
- break;
- }
- case HoodieTimeline.LOG_COMPACTION_ACTION: {
- if (hoodieInstant.isRequested()) {
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ case HoodieTimeline.LOG_COMPACTION_ACTION: {
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
+ archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
+ break;
+ }
+ default: {
+ throw new UnsupportedOperationException("Action not fully supported
yet");
}
- archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
- break;
}
- default: {
- throw new UnsupportedOperationException("Action not fully supported
yet");
+ return archivedMetaWrapper;
+ } catch (IOException | HoodieIOException ex) {
+ if (metaClient.getActiveTimeline().isEmpty(hoodieInstant)) {
+ // in local FS and HDFS, there could be empty completed instants due
to crash.
+ // let's add an entry to the archival, even if not for the plan.
+ return createMetaWrapperForEmptyInstant(hoodieInstant);
}
+ throw new HoodieException(ex);
}
- return archivedMetaWrapper;
}
/**
* TODO(reviewers): the new code applies a similar refactoring; please pay close
attention.
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -249,4 +251,9 @@ public HoodieTimeline getWriteTimeline() {
readCommits.containsKey(i.requestedTime()))
.filter(s -> validActions.contains(s.getAction())), instantReader);
}
+
+ @Override
+ public boolean isEmpty(HoodieInstant instant) {
+ return getInstantDetails(instant).isEmpty();
Review Comment:
No — for the archived timeline, extra effort would be needed to support an input stream.
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -168,7 +169,7 @@ public void testCommitMetadataSerde() throws Exception {
HoodieInstant instant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit",
"1");
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata1 =
COMMIT_METADATA_SER_DE.deserialize(instant,
- serializedCommitMetadata,
org.apache.hudi.common.model.HoodieCommitMetadata.class);
+ new ByteArrayInputStream(serializedCommitMetadata), () -> true,
org.apache.hudi.common.model.HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -169,22 +168,14 @@ public static Option<HoodieInstant>
getEarliestCommitToRetain(
public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, HoodieInstant cleanInstant)
throws IOException {
CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
- HoodieCleanerPlan cleanerPlan =
TimelineMetadataUtils.deserializeAvroMetadata(
-
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(),
HoodieCleanerPlan.class);
+ HoodieCleanerPlan cleanerPlan =
metaClient.getActiveTimeline().loadCleanerPlan(cleanInstant);
return cleanPlanMigrator.upgradeToLatest(cleanerPlan,
cleanerPlan.getVersion());
}
- /**
- * Get Latest version of cleaner plan corresponding to a clean instant.
- *
- * @param metaClient Hoodie Table Meta Client
- * @return Cleaner plan corresponding to clean instant
- * @throws IOException
- */
- public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, byte[] details)
+ public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient
metaClient, InputStream in)
throws IOException {
CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
- HoodieCleanerPlan cleanerPlan =
TimelineMetadataUtils.deserializeAvroMetadata(details, HoodieCleanerPlan.class);
+ HoodieCleanerPlan cleanerPlan =
TimelineMetadataUtils.deserializeAvroMetadata(in, HoodieCleanerPlan.class);
Review Comment:
removed unused function
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
try {
- content = timeline.getInstantDetails(requestedInstant);
+ // First assume the instant file is not empty and parse it.
+ return getHoodieRequestedReplaceMetadataOption(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant);
+ } catch (Exception ex) {
+ // If anything goes wrong, check if this is empty file.
+ if (isEmptyReplaceOrClusteringInstant(timeline,
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+ return Option.empty();
+ }
+ // If still no luck, throw the exception.
+ throw ex;
+ }
+ }
+
+ private static Option<HoodieRequestedReplaceMetadata>
getHoodieRequestedReplaceMetadataOption(
Review Comment:
done
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -182,7 +183,7 @@ public void testCommitMetadataSerde() throws Exception {
byte[] v1Bytes = v1SerDe.serialize(commitMetadata1).get();
System.out.println(new String(v1Bytes));
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata2 =
- COMMIT_METADATA_SER_DE.deserialize(legacyInstant, v1Bytes,
org.apache.hudi.common.model.HoodieCommitMetadata.class);
+ COMMIT_METADATA_SER_DE.deserialize(legacyInstant, new
ByteArrayInputStream(v1Bytes), () -> true,
org.apache.hudi.common.model.HoodieCommitMetadata.class);
Review Comment:
done
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java:
##########
@@ -135,9 +136,9 @@ private ConsistentBucketIdentifier
getBucketIdentifier(String partition) {
public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
HoodieTimeline timeline =
writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(lastRefreshInstant);
if (!timeline.empty()) {
+ TimelineLayout layout =
TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]