alexeykudinkin commented on code in PR #5733:
URL: https://github.com/apache/hudi/pull/5733#discussion_r889403945
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -626,4 +499,128 @@ public Option<String>
getTableHistorySchemaStrFromCommitMetadata() {
String result = manager.getHistorySchemaStr();
return result.isEmpty() ? Option.empty() : Option.of(result);
}
+
+ /**
+ * NOTE: This method could only be used in tests
+ *
+ * @VisibleForTesting
+ */
+ public boolean hasOperationField() {
Review Comment:
Method did not change
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -267,43 +265,47 @@ public Option<byte[]> getInstantDetails(HoodieInstant
instant) {
}
/**
- * Get the last instant with valid schema, and convert this to
HoodieCommitMetadata
+ * Returns most recent instant having valid schema in its {@link
HoodieCommitMetadata}
*/
public Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLastCommitMetadataWithValidSchema() {
- List<HoodieInstant> completed =
getCommitsTimeline().filterCompletedInstants().getInstants()
-
.sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList());
- for (HoodieInstant instant : completed) {
- try {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
- getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
{
- return Option.of(Pair.of(instant, commitMetadata));
- }
- } catch (IOException e) {
- LOG.warn("Failed to convert instant to HoodieCommitMetadata: " +
instant.toString());
- }
- }
- return Option.empty();
+ return Option.fromJavaOptional(
+ getCommitMetadataStream()
+ .filter(instantCommitMetadataPair ->
+
!StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
+ .findFirst()
+ );
}
/**
* Get the last instant with valid data, and convert this to
HoodieCommitMetadata
*/
public Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLastCommitMetadataWithValidData() {
- List<HoodieInstant> completed =
getCommitsTimeline().filterCompletedInstants().getInstants()
-
.sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList());
- for (HoodieInstant instant : completed) {
- try {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
- getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- if (!commitMetadata.getFileIdAndRelativePaths().isEmpty()) {
- return Option.of(Pair.of(instant, commitMetadata));
- }
- } catch (IOException e) {
- LOG.warn("Failed to convert instant to HoodieCommitMetadata: " +
instant.toString());
- }
- }
- return Option.empty();
+ return Option.fromJavaOptional(
+ getCommitMetadataStream()
+ .filter(instantCommitMetadataPair ->
+
!instantCommitMetadataPair.getValue().getFileIdAndRelativePaths().isEmpty())
+ .findFirst()
+ );
+ }
+
+ /**
+ * Returns stream of {@link HoodieCommitMetadata} in order reverse to
chronological (ie most
+ * recent metadata being the first element)
+ */
+ private Stream<Pair<HoodieInstant, HoodieCommitMetadata>>
getCommitMetadataStream() {
+ // NOTE: Streams are lazy
Review Comment:
Yes, streams are lazy, so the chain is only evaluated as far as needed to
produce a single object (i.e. until `findFirst()` finds the first match).
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -626,4 +499,128 @@ public Option<String>
getTableHistorySchemaStrFromCommitMetadata() {
String result = manager.getHistorySchemaStr();
return result.isEmpty() ? Option.empty() : Option.of(result);
}
+
+ /**
+ * NOTE: This method could only be used in tests
+ *
+ * @VisibleForTesting
+ */
+ public boolean hasOperationField() {
+ try {
+ Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
+ return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)
!= null;
+ } catch (Exception e) {
+ LOG.info(String.format("Failed to read operation field from avro schema
(%s)", e.getMessage()));
+ return false;
+ }
+ }
+
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithValidSchema() {
+ if (latestCommitWithValidSchema == null) {
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>>
instantAndCommitMetadata =
+
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema();
+ if (instantAndCommitMetadata.isPresent()) {
+ HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
+ HoodieCommitMetadata metadata =
instantAndCommitMetadata.get().getRight();
+ synchronized (this) {
+ if (latestCommitWithValidSchema == null) {
+ latestCommitWithValidSchema = instant;
+ }
+ commitMetadataCache.get().putIfAbsent(instant, metadata);
+ }
+ }
+ }
+
+ return Option.ofNullable(latestCommitWithValidSchema)
+ .map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
+ }
+
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithValidData() {
+ if (latestCommitWithValidData == null) {
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>>
instantAndCommitMetadata =
+ metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
+ if (instantAndCommitMetadata.isPresent()) {
+ HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
+ HoodieCommitMetadata metadata =
instantAndCommitMetadata.get().getRight();
+ synchronized (this) {
+ if (latestCommitWithValidData == null) {
+ latestCommitWithValidData = instant;
+ }
+ commitMetadataCache.get().putIfAbsent(instant, metadata);
+ }
+ }
+ }
+
+ return Option.ofNullable(latestCommitWithValidData)
+ .map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
+ }
+
+ private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
+ return commitMetadataCache.get()
+ .computeIfAbsent(instant, (missingInstant) -> {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ byte[] data = timeline.getInstantDetails(missingInstant).get();
+ try {
+ return HoodieCommitMetadata.fromBytes(data,
HoodieCommitMetadata.class);
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to fetch
HoodieCommitMetadata for instant (%s)", missingInstant), e);
+ }
+ });
+ }
+
+ private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws
IOException {
+ MessageType type = null;
+ while (filePaths.hasNext() && type == null) {
+ String filePath = filePaths.next();
+ if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
+ // this is a log file
+ type = readSchemaFromLogFile(new Path(filePath));
+ } else {
+ type = readSchemaFromBaseFile(filePath);
+ }
+ }
+ return type;
+ }
+
+ private MessageType readSchemaFromBaseFile(String filePath) throws
IOException {
Review Comment:
Method did not change
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -626,4 +499,128 @@ public Option<String>
getTableHistorySchemaStrFromCommitMetadata() {
String result = manager.getHistorySchemaStr();
return result.isEmpty() ? Option.empty() : Option.of(result);
}
+
+ /**
+ * NOTE: This method could only be used in tests
+ *
+ * @VisibleForTesting
+ */
+ public boolean hasOperationField() {
+ try {
+ Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
+ return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)
!= null;
+ } catch (Exception e) {
+ LOG.info(String.format("Failed to read operation field from avro schema
(%s)", e.getMessage()));
+ return false;
+ }
+ }
+
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithValidSchema() {
+ if (latestCommitWithValidSchema == null) {
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>>
instantAndCommitMetadata =
+
metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema();
+ if (instantAndCommitMetadata.isPresent()) {
+ HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
+ HoodieCommitMetadata metadata =
instantAndCommitMetadata.get().getRight();
+ synchronized (this) {
+ if (latestCommitWithValidSchema == null) {
+ latestCommitWithValidSchema = instant;
+ }
+ commitMetadataCache.get().putIfAbsent(instant, metadata);
+ }
+ }
+ }
+
+ return Option.ofNullable(latestCommitWithValidSchema)
+ .map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
+ }
+
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithValidData() {
+ if (latestCommitWithValidData == null) {
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>>
instantAndCommitMetadata =
+ metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
+ if (instantAndCommitMetadata.isPresent()) {
+ HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
+ HoodieCommitMetadata metadata =
instantAndCommitMetadata.get().getRight();
+ synchronized (this) {
+ if (latestCommitWithValidData == null) {
+ latestCommitWithValidData = instant;
+ }
+ commitMetadataCache.get().putIfAbsent(instant, metadata);
+ }
+ }
+ }
+
+ return Option.ofNullable(latestCommitWithValidData)
+ .map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
+ }
+
+ private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
+ return commitMetadataCache.get()
+ .computeIfAbsent(instant, (missingInstant) -> {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ byte[] data = timeline.getInstantDetails(missingInstant).get();
+ try {
+ return HoodieCommitMetadata.fromBytes(data,
HoodieCommitMetadata.class);
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to fetch
HoodieCommitMetadata for instant (%s)", missingInstant), e);
+ }
+ });
+ }
+
+ private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws
IOException {
Review Comment:
Method did not change
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]