the-other-tim-brown commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1955305945
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -394,4 +423,12 @@ public static <T extends SpecificRecordBase> byte[]
convertCommitMetadataToJsonB
throw new HoodieIOException("Failed to convert to JSON.", e);
}
}
+
+ public static boolean isEmptyStream(InputStream inputStream) throws
IOException {
Review Comment:
It looks like this method is unused?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +190,21 @@ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadat
String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
- Option<byte[]> content = Option.empty();
+ if (timeline.isEmpty(requestedInstant)) {
Review Comment:
Can we move this to the error handling?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantReader.java:
##########
@@ -31,21 +31,23 @@
public interface HoodieInstantReader {
/**
* Reads the provided instant's content into a stream for parsing.
+ *
* @param instant the instant to read
* @return an InputStream with the content
*/
- default InputStream getContentStream(HoodieInstant instant) {
+ default Option<InputStream> getContentStream(HoodieInstant instant) {
throw new RuntimeException("Not implemented");
}
/**
+ * dd
* Reads the provided instant's content into a byte array for parsing.
* @param instant the instant to read
* @return an InputStream with the details
*/
@Deprecated
default Option<byte[]> getInstantDetails(HoodieInstant instant) {
- try (InputStream inputStream = getContentStream(instant)) {
+ try (InputStream inputStream = getContentStream(instant).get()) {
Review Comment:
Should this first check if the Option is present?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -378,6 +394,19 @@ private static
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertRep
return JsonUtils.getObjectMapper().convertValue(replaceCommitMetadata,
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class);
}
+ /**
+ * Convert replacecommit metadata from json to avro.
+ */
+ public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
replaceCommitMetadata) {
+ if (replaceCommitMetadata.getPartitionToWriteStats() != null) {
+ replaceCommitMetadata.getPartitionToWriteStats().remove(null);
+ }
+ if (replaceCommitMetadata.getPartitionToReplaceFileIds() != null) {
+ replaceCommitMetadata.getPartitionToReplaceFileIds().remove(null);
+ }
+ return JsonUtils.getObjectMapper().convertValue(replaceCommitMetadata,
HoodieReplaceCommitMetadata.class);
+ }
+
Review Comment:
The method below this one is now showing as unused, can it be cleaned up?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -193,10 +193,9 @@ protected Option<HoodieCleanerPlan> requestClean(String
startCleanTime) {
activeTimeline.deleteEmptyInstantIfExists(cleanInstant);
HoodieInstant cleanPlanInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(),
cleanInstant.requestedTime(),
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
try {
- Option<byte[]> content =
activeTimeline.getInstantDetails(cleanPlanInstant);
// Deserialize plan if it is non-empty
- if (content.map(bytes -> bytes.length > 0).orElse(false)) {
- return
Option.of(TimelineMetadataUtils.deserializeCleanerPlan(content.get()));
+ if (!activeTimeline.isEmpty(cleanPlanInstant)) {
Review Comment:
   Can we try to parse the plan first and then, in the exception handling,
   check whether it is empty?
##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java:
##########
@@ -202,7 +202,7 @@ private Long getLatestCleanTimeTakenInMillis() throws
IOException {
HoodieInstant clean =
timeline.getReverseOrderedInstants().findFirst().orElse(null);
if (clean != null) {
HoodieCleanMetadata cleanMetadata =
-
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+
TimelineMetadataUtils.deserializeHoodieCleanMetadataLegacy(timeline.getInstantDetails(clean).get());
Review Comment:
   Can this be simplified to the following?
```suggestion
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantContentStream(clean));
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +182,75 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
archivedMetaWrapper.setStateTransitionTime(completionTime);
String actionType =
lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
- HoodieInstant instant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
+ HoodieInstant hoodieInstant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
planBytes.get()));
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlanLegacy(metaClient,
planBytes.get()));
Review Comment:
We could replace all the "legacy" methods by wrapping the bytes in an input
stream. This seems to be the main non-test class where these methods are used
so we can also try to remove those methods after cleaning this up. This could
help avoid confusion for future developers.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java:
##########
@@ -156,59 +159,88 @@ public static Option<byte[]>
serializeCommitMetadata(CommitMetadataSerDe commitM
public static <T extends SpecificRecordBase> Option<byte[]>
serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
- DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- fileWriter.create(metadata.getSchema(), baos);
- fileWriter.append(metadata);
- fileWriter.flush();
- return Option.of(baos.toByteArray());
+ try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ fileWriter.create(metadata.getSchema(), baos);
+ fileWriter.append(metadata);
+ fileWriter.flush();
+ return Option.of(baos.toByteArray());
+ }
+ }
+
+ public static HoodieCleanerPlan deserializeCleanerPlan(Option<InputStream>
in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCleanerPlan.class);
+ }
+
+ public static HoodieCompactionPlan
deserializeCompactionPlan(Option<InputStream> in) throws IOException {
+ return deserializeAvroMetadata(in, HoodieCompactionPlan.class);
+ }
+
+ public static HoodieCompactionPlan deserializeCompactionPlanLegacy(byte[]
bytes) throws IOException {
+ return deserializeAvroMetadataLegacy(bytes, HoodieCompactionPlan.class);
}
- public static HoodieCleanerPlan deserializeCleanerPlan(byte[] bytes) throws
IOException {
- return deserializeAvroMetadata(bytes, HoodieCleanerPlan.class);
+ public static HoodieCleanMetadata
deserializeHoodieCleanMetadataLegacy(byte[] bytes) throws IOException {
Review Comment:
There are only 2 callers of this and it looks like they can both be migrated
to use the content stream.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java:
##########
@@ -103,8 +104,8 @@ private Pair<HoodieInstant, Option<byte[]>>
readInstant(GenericRecord record) {
// should be json bytes.
try {
HoodieInstant instant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
action, instantTime, stateTransitionTime);
- org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata =
new CommitMetadataSerDeV1().deserialize(instant,
getUTF8Bytes(actionData.toString()),
- org.apache.hudi.common.model.HoodieCommitMetadata.class);
+ org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata =
new CommitMetadataSerDeV1().deserialize(
+ instant, Option.of(new
ByteArrayInputStream(getUTF8Bytes(actionData.toString()))),
org.apache.hudi.common.model.HoodieCommitMetadata.class);
Review Comment:
Is this part of the code just doing the conversion from the avro to the
pojo? If so, will we be able to use that other method you wrote?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -604,4 +612,33 @@ public static boolean isDeletePartition(WriteOperationType
operation) {
|| operation == WriteOperationType.INSERT_OVERWRITE_TABLE
|| operation == WriteOperationType.INSERT_OVERWRITE;
}
+
+ public static boolean isEmpty(HoodieTableMetaClient metaClient,
HoodieInstant instant) {
+ try {
+ return metaClient.getStorage()
+ .getPathInfo(new StoragePath(metaClient.getTimelinePath(),
metaClient.getInstantFileNameGenerator().getFileName(instant)))
+ .getLength() == 0;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to check emptiness of instant " +
instant, e);
+ }
+ }
+
+ public static Option<InputStream> getInputStreamOptionLegacy(HoodieTimeline
timeline, HoodieInstant instant) {
+ Option<byte[]> bytes = timeline.getInstantDetails(instant);
+ if (bytes.isEmpty() || bytes.get().length == 0) {
+ return Option.empty();
+ }
+ return Option.of(new ByteArrayInputStream(bytes.get()));
+ }
+
+ public static Option<HoodieInstant> getInstantFromTimeline(HoodieInstant
instant, HoodieActiveTimeline timeline, Option<HoodieInstant> actualInstant) {
Review Comment:
   Can you explain why we need this? In the old `getInstantDetails`, I don't
   see similar logic.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -186,16 +187,36 @@ public static HoodieCompactionPlan
getLogCompactionPlan(HoodieTableMetaClient me
* Util method to fetch both compaction and log compaction plan from
requestedInstant.
*/
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, HoodieInstant requestedInstant) {
- return getCompactionPlan(metaClient,
metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant));
+ return getCompactionPlan(metaClient,
metaClient.getActiveTimeline().getInstantContentStream(requestedInstant));
}
/**
* Util method to fetch both compaction and log compaction plan from
requestedInstant.
*/
- public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, Option<byte[]> planContent) {
+ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, Option<InputStream> planContent) {
CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
try {
- HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(planContent.get());
+ HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlan(planContent);
+ return migrator.upgradeToLatest(compactionPlan,
compactionPlan.getVersion());
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ public static HoodieCompactionPlan
getCompactionPlanLegacy(HoodieTableMetaClient metaClient, byte[] bytes) {
+ CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
+ try {
+ HoodieCompactionPlan compactionPlan =
TimelineMetadataUtils.deserializeCompactionPlanLegacy(bytes);
+ return migrator.upgradeToLatest(compactionPlan,
compactionPlan.getVersion());
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ public static HoodieCompactionPlan
getCompactionPlanFromInputStream(HoodieTableMetaClient metaClient,
Option<InputStream> in) {
Review Comment:
This seems similar to the method at line 196
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -258,4 +258,9 @@ public HoodieArchivedTimeline reload() {
public HoodieArchivedTimeline reload(String startTs) {
return new ArchivedTimelineV1(metaClient, startTs);
}
+
+ @Override
+ public boolean isEmpty(HoodieInstant instant) {
+ return TimelineUtils.isEmpty(metaClient, instant);
Review Comment:
For the archived timeline, I think we want to check the `readCommits`
directly instead of going to the file system
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1094,7 +1094,7 @@ private static void
reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTabl
HoodieInstant requested =
factory.getRollbackRequestedInstant(rollbackInstant);
try {
HoodieRollbackPlan rollbackPlan =
TimelineMetadataUtils.deserializeAvroMetadata(
-
dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(),
HoodieRollbackPlan.class);
Review Comment:
`readRollbackInfoAsBytes` looks like it is no longer used in the codebase
and can be removed
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -249,4 +246,9 @@ public HoodieTimeline getWriteTimeline() {
readCommits.containsKey(i.requestedTime()))
.filter(s -> validActions.contains(s.getAction())), instantReader);
}
+
+ @Override
+ public boolean isEmpty(HoodieInstant instant) {
+ return TimelineUtils.isEmpty(metaClient, instant);
Review Comment:
Similarly here, do we want to reference the `readCommits`?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -125,6 +126,20 @@ public static HoodieCleanMetadata
getCleanerMetadata(HoodieTableMetaClient metaC
return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
}
+ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient
metaClient, Option<InputStream> details)
+ throws IOException {
+ CleanMetadataMigrator metadataMigrator = new
CleanMetadataMigrator(metaClient);
+ HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(details);
+ return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
+ }
+
+ public static HoodieCleanMetadata
getCleanerMetadataFromInputStream(HoodieTableMetaClient metaClient,
Option<InputStream> in)
+ throws IOException {
+ CleanMetadataMigrator metadataMigrator = new
CleanMetadataMigrator(metaClient);
+ HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(in);
+ return metadataMigrator.upgradeToLatest(cleanMetadata,
cleanMetadata.getVersion());
+ }
Review Comment:
   These two methods look identical — can one of them be removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]