yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1974311163
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends
HoodieInstantReader, Serializable {
*/
HoodieTimeline getWriteTimeline();
+ /**
+ * Load and deserialize the content of an instant into the specified class
type.
+ *
+ * @param instant The instant to load content from
+ * @param clazz The target class to deserialize into
+ * @return Deserialized instant content
+ * @throws IOException when reading instant content fails
+ */
+ default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz)
throws IOException {
Review Comment:
nit: `readInstantContent` would be a better name
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends
HoodieInstantReader, Serializable {
*/
HoodieTimeline getWriteTimeline();
+ /**
+ * Load and deserialize the content of an instant into the specified class
type.
+ *
+ * @param instant The instant to load content from
+ * @param clazz The target class to deserialize into
+ * @return Deserialized instant content
+ * @throws IOException when reading instant content fails
+ */
+ default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz)
throws IOException {
+ TimelineLayout layout =
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+ return layout.getCommitMetadataSerDe().deserialize(instant,
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+ }
+
+ /**
+ * Load and deserialize cleaner plan from an instant.
+ *
+ * @param instant The instant containing the cleaner plan
+ * @return Deserialized HoodieCleanerPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws
IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCleanerPlan.class);
+ }
+
+ /**
+ * Load and deserialize compaction plan from an instant.
+ *
+ * @param instant The instant containing the compaction plan
+ * @return Deserialized HoodieCompactionPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCompactionPlan.class);
+ }
+
+ /**
+ * Load and deserialize clean metadata from an instant.
+ *
+ * @param instant The instant containing the clean metadata
+ * @return Deserialized HoodieCleanMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCleanMetadata.class);
+ }
+
+ /**
+ * Load and deserialize rollback metadata from an instant.
+ *
+ * @param instant The instant containing the rollback metadata
+ * @return Deserialized HoodieRollbackMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRollbackMetadata loadHoodieRollbackMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRollbackMetadata.class);
+ }
+
+ /**
+ * Load and deserialize restore metadata from an instant.
+ *
+ * @param instant The instant containing the restore metadata
+ * @return Deserialized HoodieRestoreMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRestoreMetadata loadHoodieRestoreMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRestoreMetadata.class);
+ }
+
+ /**
+ * Load and deserialize savepoint metadata from an instant.
+ *
+ * @param instant The instant containing the savepoint metadata
+ * @return Deserialized HoodieSavepointMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieSavepointMetadata loadHoodieSavepointMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieSavepointMetadata.class);
+ }
+
+ /**
+ * Load and deserialize requested replace metadata from an instant.
+ *
+ * @param instant The instant containing the requested replace metadata
+ * @return Deserialized HoodieRequestedReplaceMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRequestedReplaceMetadata
loadRequestedReplaceMetadata(HoodieInstant instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRequestedReplaceMetadata.class);
+ }
+
+ /**
+ * Load and deserialize index plan from an instant.
+ *
+ * @param instant The instant containing the index plan
+ * @return Deserialized HoodieIndexPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieIndexPlan loadIndexPlan(HoodieInstant instant) throws
IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieIndexPlan.class);
+ }
+
+ /**
+ * Load and deserialize commit metadata in Avro format from an instant.
+ *
+ * @param instant The instant containing the commit metadata
+ * @return Deserialized HoodieCommitMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCommitMetadata loadCommitMetadataAvro(HoodieInstant instant)
throws IOException {
Review Comment:
```suggestion
default HoodieCommitMetadata readCommitMetadataToAvro(HoodieInstant
instant) throws IOException {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends
HoodieInstantReader, Serializable {
*/
HoodieTimeline getWriteTimeline();
+ /**
+ * Load and deserialize the content of an instant into the specified class
type.
+ *
+ * @param instant The instant to load content from
+ * @param clazz The target class to deserialize into
+ * @return Deserialized instant content
+ * @throws IOException when reading instant content fails
+ */
+ default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz)
throws IOException {
+ TimelineLayout layout =
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+ return layout.getCommitMetadataSerDe().deserialize(instant,
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+ }
+
+ /**
+ * Load and deserialize cleaner plan from an instant.
+ *
+ * @param instant The instant containing the cleaner plan
+ * @return Deserialized HoodieCleanerPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws
IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCleanerPlan.class);
+ }
+
+ /**
+ * Load and deserialize compaction plan from an instant.
+ *
+ * @param instant The instant containing the compaction plan
+ * @return Deserialized HoodieCompactionPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCompactionPlan.class);
+ }
+
+ /**
+ * Load and deserialize clean metadata from an instant.
+ *
+ * @param instant The instant containing the clean metadata
+ * @return Deserialized HoodieCleanMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCleanMetadata.class);
+ }
+
+ /**
+ * Load and deserialize rollback metadata from an instant.
+ *
+ * @param instant The instant containing the rollback metadata
+ * @return Deserialized HoodieRollbackMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRollbackMetadata loadHoodieRollbackMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRollbackMetadata.class);
+ }
+
+ /**
+ * Load and deserialize restore metadata from an instant.
+ *
+ * @param instant The instant containing the restore metadata
+ * @return Deserialized HoodieRestoreMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRestoreMetadata loadHoodieRestoreMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRestoreMetadata.class);
+ }
+
+ /**
+ * Load and deserialize savepoint metadata from an instant.
+ *
+ * @param instant The instant containing the savepoint metadata
+ * @return Deserialized HoodieSavepointMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieSavepointMetadata loadHoodieSavepointMetadata(HoodieInstant
instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieSavepointMetadata.class);
+ }
+
+ /**
+ * Load and deserialize requested replace metadata from an instant.
+ *
+ * @param instant The instant containing the requested replace metadata
+ * @return Deserialized HoodieRequestedReplaceMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieRequestedReplaceMetadata
loadRequestedReplaceMetadata(HoodieInstant instant) throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieRequestedReplaceMetadata.class);
+ }
+
+ /**
+ * Load and deserialize index plan from an instant.
+ *
+ * @param instant The instant containing the index plan
+ * @return Deserialized HoodieIndexPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieIndexPlan loadIndexPlan(HoodieInstant instant) throws
IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieIndexPlan.class);
+ }
+
+ /**
+ * Load and deserialize commit metadata in Avro format from an instant.
+ *
+ * @param instant The instant containing the commit metadata
+ * @return Deserialized HoodieCommitMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCommitMetadata loadCommitMetadataAvro(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCommitMetadata.class);
+ }
+
+ /**
+ * Load and deserialize replace commit metadata in Avro format from an
instant.
+ *
+ * @param instant The instant containing the replace commit metadata
+ * @return Deserialized HoodieReplaceCommitMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieReplaceCommitMetadata
loadReplaceCommitMetadataAvro(HoodieInstant instant) throws IOException {
Review Comment:
```suggestion
default HoodieReplaceCommitMetadata
readReplaceCommitMetadataToAvro(HoodieInstant instant) throws IOException {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends
HoodieInstantReader, Serializable {
*/
HoodieTimeline getWriteTimeline();
+ /**
+ * Load and deserialize the content of an instant into the specified class
type.
+ *
+ * @param instant The instant to load content from
+ * @param clazz The target class to deserialize into
+ * @return Deserialized instant content
+ * @throws IOException when reading instant content fails
+ */
+ default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz)
throws IOException {
+ TimelineLayout layout =
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+ return layout.getCommitMetadataSerDe().deserialize(instant,
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+ }
+
+ /**
+ * Load and deserialize cleaner plan from an instant.
+ *
+ * @param instant The instant containing the cleaner plan
+ * @return Deserialized HoodieCleanerPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws
IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCleanerPlan.class);
+ }
+
+ /**
+ * Load and deserialize compaction plan from an instant.
+ *
+ * @param instant The instant containing the compaction plan
+ * @return Deserialized HoodieCompactionPlan
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant)
throws IOException {
+ return deserializeAvroMetadata(getInstantContentStream(instant),
HoodieCompactionPlan.class);
+ }
+
+ /**
+ * Load and deserialize clean metadata from an instant.
+ *
+ * @param instant The instant containing the clean metadata
+ * @return Deserialized HoodieCleanMetadata
+ * @throws IOException when reading instant content fails
+ */
+ default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant)
throws IOException {
Review Comment:
We can get rid of `Hoodie` in the method name.
```suggestion
default HoodieCleanMetadata loadCleanMetadata(HoodieInstant instant)
throws IOException {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -332,24 +335,13 @@ public static HoodieArchivedMetaEntry
createMetaWrapperForEmptyInstant(HoodieIns
return archivedMetaWrapper;
}
- private static Option<HoodieCommitMetadata>
getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant
instant,
-
Option<byte[]> inflightContent) throws IOException {
- if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
- // inflight files can be empty in some certain cases, e.g. when users
opt in clustering
+ private static <T extends HoodieCommitMetadata> Option<T>
getCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant,
Class<T> clazz) throws IOException {
+ T commitMetadata =
metaClient.getActiveTimeline().loadInstantContent(instant, clazz);
+ // an empty file will return the default instance with an UNKNOWN
operation type and in that case we return an empty option
+ if (commitMetadata.getOperationType() == WriteOperationType.UNKNOWN) {
Review Comment:
Could non-empty commit metadata also have an `UNKNOWN` write operation?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +175,73 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
archivedMetaWrapper.setStateTransitionTime(completionTime);
String actionType =
lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
- HoodieInstant instant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
+ HoodieInstant hoodieInstant =
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
actionType, instantTime, completionTime);
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
-
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
planBytes.get()));
+
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
new ByteArrayInputStream(instantDetails.get())));
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
new ByteArrayInputStream(planBytes.get())));
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
case HoodieTimeline.COMMIT_ACTION: {
- HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(),
HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
archivedMetaWrapper.setActionType(ActionType.commit.name());
if (planBytes.isPresent()) {
// this should be a compaction
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, new
ByteArrayInputStream(planBytes.get()));
archivedMetaWrapper.setHoodieCompactionPlan(plan);
}
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION: {
- HoodieCommitMetadata deltaCommitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(),
HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
if (planBytes.isPresent()) {
// this should be a log compaction
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, new
ByteArrayInputStream(planBytes.get()));
archivedMetaWrapper.setHoodieCompactionPlan(plan);
}
break;
}
case HoodieTimeline.REPLACE_COMMIT_ACTION:
case HoodieTimeline.CLUSTERING_ACTION: {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(),
HoodieReplaceCommitMetadata.class);
-
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieReplaceCommitMetadata.class)
+ .ifPresent(replaceCommitMetadata ->
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
// inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
// so we could re-use it without further creating an inflight
extension.
// Or inflight replacecommit files are empty under clustering
circumstance
- Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightCommitMetadata(metaClient, instant, instantDetails);
+ Option<HoodieCommitMetadata> inflightCommitMetadata =
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
if (inflightCommitMetadata.isPresent()) {
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
}
- archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+ archivedMetaWrapper.setActionType(
+
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
Review Comment:
This is for V1 so there is no clustering action for requested or inflight.
So let's keep using `ActionType.replacecommit.name()` here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -188,20 +186,20 @@ public static List<String>
getAffectedPartitions(HoodieTimeline timeline) {
}
case HoodieTimeline.CLEAN_ACTION:
try {
- HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(s).get());
+ HoodieCleanMetadata cleanMetadata =
timeline.loadHoodieCleanMetadata(s);
return cleanMetadata.getPartitionMetadata().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions cleaned at "
+ s, e);
}
case HoodieTimeline.ROLLBACK_ACTION:
try {
- return
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+ return
timeline.loadHoodieRollbackMetadata(s).getPartitionMetadata().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions rolledback
at " + s, e);
}
case HoodieTimeline.RESTORE_ACTION:
try {
- HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(s).get(),
HoodieRestoreMetadata.class);
+ HoodieRestoreMetadata restoreMetadata =
timeline.loadInstantContent(s, HoodieRestoreMetadata.class);
Review Comment:
As discussed with @Davis-Zhang-Onehouse, we should have a wrapper method in
the timeline to read specific metadata and avoid calling
`loadInstantContent(instant, clazz)` directly.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -395,19 +387,15 @@ public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataToPojo(org
}
/**
- * Convert commit metadata from avro to json.
+ * Convert replacecommit metadata from avro to pojo.
*/
- public static <T extends SpecificRecordBase> byte[]
convertCommitMetadataToJsonBytes(T avroMetaData, Class<T> clazz) {
- Schema avroSchema = clazz ==
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class ?
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.getClassSchema() :
- org.apache.hudi.avro.model.HoodieCommitMetadata.getClassSchema();
- try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- JsonEncoder jsonEncoder = new JsonEncoder(avroSchema, outputStream);
- DatumWriter<GenericRecord> writer = avroMetaData instanceof
SpecificRecord ? new SpecificDatumWriter<>(avroSchema) : new
GenericDatumWriter<>(avroSchema);
- writer.write(avroMetaData, jsonEncoder);
- jsonEncoder.flush();
- return outputStream.toByteArray();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to convert to JSON.", e);
+ public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
replaceCommitMetadata) {
Review Comment:
```suggestion
public static HoodieReplaceCommitMetadata
convertReplaceCommitMetadataToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
replaceCommitMetadata) {
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -38,11 +38,12 @@
import org.apache.hudi.exception.HoodieTimeTravelException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-
Review Comment:
keep import grouping
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -56,100 +52,106 @@
*/
public class MetadataConversionUtils {
- public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
- Option<byte[]> instantDetails =
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
- if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
- // in local FS and HDFS, there could be empty completed instants due to
crash.
- // let's add an entry to the archival, even if not for the plan.
- return createMetaWrapperForEmptyInstant(hoodieInstant);
- }
- HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
- archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
- archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
-
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
- switch (hoodieInstant.getAction()) {
- case HoodieTimeline.CLEAN_ACTION: {
- if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
instantDetails.get()));
- } else {
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
instantDetails.get()));
+ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant
hoodieInstant, HoodieTableMetaClient metaClient) {
+ try {
+ HoodieArchivedMetaEntry archivedMetaWrapper = new
HoodieArchivedMetaEntry();
+ archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
+ archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
+ CommitMetadataSerDe serDe = metaClient.getCommitMetadataSerDe();
+ switch (hoodieInstant.getAction()) {
+ case HoodieTimeline.CLEAN_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
hoodieInstant));
+ } else {
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
hoodieInstant));
+ }
+ archivedMetaWrapper.setActionType(ActionType.clean.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.clean.name());
- break;
- }
- case HoodieTimeline.COMMIT_ACTION: {
- HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant,
instantDetails.get(), HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
- archivedMetaWrapper.setActionType(ActionType.commit.name());
- break;
- }
- case HoodieTimeline.DELTA_COMMIT_ACTION: {
- HoodieCommitMetadata deltaCommitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant,
instantDetails.get(), HoodieCommitMetadata.class);
-
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
- archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
- break;
- }
- case HoodieTimeline.REPLACE_COMMIT_ACTION:
- case HoodieTimeline.CLUSTERING_ACTION: {
- if (hoodieInstant.isCompleted()) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(),
HoodieReplaceCommitMetadata.class);
-
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
- } else if (hoodieInstant.isInflight()) {
- // inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
- // so we could re-use it without further creating an inflight
extension.
- // Or inflight replacecommit files are empty under clustering
circumstance
- Option<HoodieCommitMetadata> inflightCommitMetadata =
getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails);
- if (inflightCommitMetadata.isPresent()) {
-
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+ case HoodieTimeline.COMMIT_ACTION: {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(commitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
+ archivedMetaWrapper.setActionType(ActionType.commit.name());
+ break;
+ }
+ case HoodieTimeline.DELTA_COMMIT_ACTION: {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieCommitMetadata.class)
+ .ifPresent(deltaCommitMetadata ->
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)));
+ archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
+ break;
+ }
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ case HoodieTimeline.CLUSTERING_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+ getCommitMetadata(metaClient, hoodieInstant,
HoodieReplaceCommitMetadata.class)
+ .ifPresent(replaceCommitMetadata ->
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
+ } else if (hoodieInstant.isInflight()) {
+ // inflight replacecommit files have the same metadata body as
HoodieCommitMetadata
+ // so we could re-use it without further creating an inflight
extension.
+ // Or inflight replacecommit files are empty under clustering
circumstance
+ Option<HoodieCommitMetadata> inflightCommitMetadata =
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
+ if (inflightCommitMetadata.isPresent()) {
+
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+ }
+ } else {
+ // we may have cases with empty HoodieRequestedReplaceMetadata
e.g. insert_overwrite_table or insert_overwrite
+ // without clustering. However, we should revisit the requested
commit file standardization
+ Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
Option.of(metaClient.getActiveTimeline()
+ .loadRequestedReplaceMetadata(hoodieInstant));
+ if (requestedReplaceMetadata.isPresent()) {
+
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+ }
}
- } else {
- // we may have cases with empty HoodieRequestedReplaceMetadata e.g.
insert_overwrite_table or insert_overwrite
- // without clustering. However, we should revisit the requested
commit file standardization
- Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(instantDetails);
- if (requestedReplaceMetadata.isPresent()) {
-
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+ archivedMetaWrapper.setActionType(
+
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
+ break;
+ }
+ case HoodieTimeline.ROLLBACK_ACTION: {
+ if (hoodieInstant.isCompleted()) {
+
archivedMetaWrapper.setHoodieRollbackMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
HoodieRollbackMetadata.class));
}
+ archivedMetaWrapper.setActionType(ActionType.rollback.name());
+ break;
}
- archivedMetaWrapper.setActionType(
-
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ?
ActionType.replacecommit.name() : ActionType.clustering.name());
- break;
- }
- case HoodieTimeline.ROLLBACK_ACTION: {
- if (hoodieInstant.isCompleted()) {
-
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieRollbackMetadata.class));
+ case HoodieTimeline.SAVEPOINT_ACTION: {
+
archivedMetaWrapper.setHoodieSavePointMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
HoodieSavepointMetadata.class));
+ archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.rollback.name());
- break;
- }
- case HoodieTimeline.SAVEPOINT_ACTION: {
-
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
HoodieSavepointMetadata.class));
- archivedMetaWrapper.setActionType(ActionType.savepoint.name());
- break;
- }
- case HoodieTimeline.COMPACTION_ACTION: {
- if (hoodieInstant.isRequested()) {
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ case HoodieTimeline.COMPACTION_ACTION: {
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
+ archivedMetaWrapper.setActionType(ActionType.compaction.name());
+ break;
}
- archivedMetaWrapper.setActionType(ActionType.compaction.name());
- break;
- }
- case HoodieTimeline.LOG_COMPACTION_ACTION: {
- if (hoodieInstant.isRequested()) {
- HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
- archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ case HoodieTimeline.LOG_COMPACTION_ACTION: {
+ if (hoodieInstant.isRequested()) {
+ HoodieCompactionPlan plan =
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+ archivedMetaWrapper.setHoodieCompactionPlan(plan);
+ }
+ archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
+ break;
+ }
+ default: {
+ throw new UnsupportedOperationException("Action not fully supported
yet");
}
- archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
- break;
}
- default: {
- throw new UnsupportedOperationException("Action not fully supported
yet");
+ return archivedMetaWrapper;
+ } catch (IOException | HoodieIOException ex) {
+ if (metaClient.getActiveTimeline().isEmpty(hoodieInstant)) {
+ // in local FS and HDFS, there could be empty completed instants due
to crash.
+ // let's add an entry to the archival, even if not for the plan.
+ return createMetaWrapperForEmptyInstant(hoodieInstant);
}
+ throw new HoodieException(ex);
}
- return archivedMetaWrapper;
}
/**
+ * TODO(reviewers) - new code applied similar refactoring, please pay close
attention.
Review Comment:
remove?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java:
##########
@@ -556,31 +550,14 @@ public Option<HoodieInstant>
getFirstNonSavepointCommitByCompletionTime() {
@Override
public Option<byte[]> getInstantDetails(HoodieInstant instant) {
- return getInstantReader().getInstantDetails(instant);
+ return getInstantReader().getInstantDetails(instant);
Review Comment:
Sounds good.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -604,4 +602,22 @@ public static boolean isDeletePartition(WriteOperationType
operation) {
|| operation == WriteOperationType.INSERT_OVERWRITE_TABLE
|| operation == WriteOperationType.INSERT_OVERWRITE;
}
+
+ public static boolean isEmpty(HoodieTableMetaClient metaClient,
HoodieInstant instant) {
+ try {
+ return metaClient.getStorage()
+ .getPathInfo(new StoragePath(metaClient.getTimelinePath(),
metaClient.getInstantFileNameGenerator().getFileName(instant)))
+ .getLength() == 0;
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to check emptiness of instant " +
instant, e);
+ }
+ }
+
+ public static Option<InputStream> getInputStreamOptionLegacy(HoodieTimeline
timeline, HoodieInstant instant) {
Review Comment:
Let's have a JIRA to remove this in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]