nsivabalan commented on code in PR #12646:
URL: https://github.com/apache/hudi/pull/12646#discussion_r1925890902
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java:
##########
@@ -91,18 +133,252 @@ public void testRecreateSchemaWhenDropPartitionColumns() {
@Test
public void testReadSchemaFromLogFile() throws IOException,
URISyntaxException, InterruptedException {
- String testDir = initTestDir("read_schema_from_log_file");
- StoragePath partitionPath = new StoragePath(testDir, "partition1");
+ initPath("read_schema_from_log_file");
+ StoragePath partitionPath = new StoragePath(basePath, "partition1");
Schema expectedSchema = getSimpleSchema();
StoragePath logFilePath = writeLogFile(partitionPath, expectedSchema);
assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(new
HoodieHadoopStorage(
logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()),
logFilePath));
}
- private String initTestDir(String folderName) throws IOException {
- java.nio.file.Path basePath = tempDir.resolve(folderName);
- java.nio.file.Files.createDirectories(basePath);
- return basePath.toString();
+ @Test
+ public void testGetTableSchemaFromLatestCommitMetadataV2() throws Exception {
+ Schema originalSchema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ String commitTime1 = "001";
+ HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION,
commitTime1, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+ activeTimeline.createNewInstant(instant1);
+
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY,
originalSchema.toString());
+
+ activeTimeline.saveAsComplete(instant1,
+ Option.of(getCommitMetadata(basePath, "partition1", commitTime1, 2,
extraMetadata)));
+ metaClient.reloadActiveTimeline();
+
+ TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+ Option<Schema> schemaOption =
resolver.getTableAvroSchemaIfPresentV2(false);
+ assertTrue(schemaOption.isPresent());
+ assertEquals(originalSchema, schemaOption.get());
+ }
+
+ @Test
+ public void testGetTableCreateSchemaWithMetadata() throws IOException {
+ Schema originalSchema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+ // Set table create schema in table config
+ // Create commit metadata without schema information, which should be
ignored.
+ Map<String, String> emptyMetadata = new HashMap<>();
+
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ String commitTime1 = "001";
+ HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION,
commitTime1, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+ activeTimeline.createNewInstant(instant1);
+ activeTimeline.saveAsComplete(instant1,
Option.of(getCommitMetadata(basePath, "partition1", commitTime1, 2,
emptyMetadata)));
+ metaClient.reloadActiveTimeline();
+ metaClient.getTableConfig().setValue(HoodieTableConfig.CREATE_SCHEMA,
originalSchema.toString());
+
+ TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+ Option<Schema> schemaOption =
resolver.getTableAvroSchemaIfPresentV2(false);
+ assertTrue(schemaOption.isPresent());
+ assertEquals(originalSchema, schemaOption.get());
+ }
+
+ @Test
+ public void testHandlePartitionColumnsIfNeeded() {
+ Schema originalSchema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+ String[] partitionFields = new String[] {"partition_path"};
+ metaClient.getTableConfig().setValue(PARTITION_FIELDS, String.join(",",
partitionFields));
+
metaClient.getTableConfig().setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS,
"true");
+ metaClient.getTableConfig().setValue(HoodieTableConfig.CREATE_SCHEMA,
originalSchema.toString());
+
+ TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+ Option<Schema> schemaOption =
resolver.getTableAvroSchemaIfPresentV2(false);
+ assertTrue(schemaOption.isPresent());
+
+ Schema resultSchema = schemaOption.get();
+ assertTrue(resultSchema.getFields().stream()
+ .anyMatch(f -> f.name().equals("partition_path")));
+ }
+
+ static class SchemaEvolutionTestCase {
+ private final String name;
+ private final HoodieTableType tableType;
+ private final List<HoodieInstant> inputInstants;
+ private final List<HoodieInstant> expectedInstants;
+ private final Option<HoodieInstant> clusteringInstants;
+
+ public SchemaEvolutionTestCase(
+ String name,
+ HoodieTableType tableType,
+ List<HoodieInstant> inputInstants,
+ List<HoodieInstant> expectedInstants) {
+ this.name = name;
+ this.tableType = tableType;
+ this.inputInstants = inputInstants;
+ this.expectedInstants = expectedInstants;
+ this.clusteringInstants = Option.empty();
+ }
+
+ public SchemaEvolutionTestCase(
+ String name,
+ HoodieTableType tableType,
+ List<HoodieInstant> inputInstants,
+ List<HoodieInstant> expectedInstants,
+ HoodieInstant clusteringInstants
+ ) {
+ this.name = name;
+ this.tableType = tableType;
+ this.inputInstants = inputInstants;
+ this.expectedInstants = expectedInstants;
+ this.clusteringInstants = Option.of(clusteringInstants);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s (%s)", name, tableType);
+ }
+ }
+
+ static Stream<SchemaEvolutionTestCase> schemaEvolutionTestCases() {
+ return Stream.of(
+ // Empty timeline case
+ new SchemaEvolutionTestCase(
+ "Empty Timeline",
+ HoodieTableType.COPY_ON_WRITE,
+ new ArrayList<>(),
+ new ArrayList<>()
+ ),
+
+ // Empty timeline case
+ new SchemaEvolutionTestCase(
+ "Empty Timeline",
+ HoodieTableType.MERGE_ON_READ,
+ new ArrayList<>(),
+ new ArrayList<>()
+ ),
+
+ // Mixed actions case
+ new SchemaEvolutionTestCase(
+ "Mixed Actions",
+ HoodieTableType.COPY_ON_WRITE,
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "001",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "002",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "003",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ // Table service actions that should be filtered out
+ new HoodieInstant(REQUESTED, CLEAN_ACTION, "004",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(REQUESTED, ROLLBACK_ACTION, "005",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(REQUESTED, CLUSTERING_ACTION, "006",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(INFLIGHT, CLEAN_ACTION, "004",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "005",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(INFLIGHT, CLUSTERING_ACTION, "006",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, CLEAN_ACTION, "004",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "005",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, CLUSTERING_ACTION, "006",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "010",
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR) // clustering commits
Review Comment:
Do we have replace commits that are not clustering instants? If not, can we
add them, please, and ensure the schema evolves with them and getTableSchema
returns the right one?
Can we also test the case where the latest instant is a clustering instant
with a different schema, but getTableSchema still returns the right one,
matching the previous valid instant's schema?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +244,50 @@ private Option<Schema>
getTableSchemaFromLatestCommitMetadata(boolean includeMet
}
}
+ public Option<Schema> getTableAvroSchemaIfPresentV2(boolean
includeMetadataFields) {
Review Comment:
What's the necessity of adding a new API? Can you help me understand why we
can't fix the existing implementation instead?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +244,50 @@ private Option<Schema>
getTableSchemaFromLatestCommitMetadata(boolean includeMet
}
}
+ public Option<Schema> getTableAvroSchemaIfPresentV2(boolean
includeMetadataFields) {
Review Comment:
Also, we don't need the "IfPresent" clause. Since we already return
Option<Schema>, we can just name this method
```
getTableAvroSchema
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema
dataSchema, Option<String[]>
private Supplier<Exception> schemaNotFoundError() {
return () -> new HoodieSchemaNotFoundException("No schema found for table
at " + metaClient.getBasePath());
}
+
+ /**
+ * WARNING: This method should be only used as part of HUDI-8438 before the
jira owner fully accommodate all existing use
+ * cases.
+ *
+ * Get timeline in REVERSE order that only contains completed instants which
POTENTIALLY evolve the table schema.
+ * For types of instants that are included and not reflecting table schema
at their instant completion time please refer
+ * comments inside the code.
+ * */
+ HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+ final Set<String> actions;
+ switch (metaClient.getTableType()) {
+ case COPY_ON_WRITE: {
+ actions = new HashSet<>(Arrays.asList(COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ case MERGE_ON_READ: {
+ actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ default:
+ throw new HoodieException("Unsupported table type :" +
metaClient.getTableType());
+ }
+
+ // We only care committed instant when it comes to table schema.
+ TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+ Comparator<HoodieInstant> reversedComparator =
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+ // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which
might not contain a valid schema
+ // field in their commit metadata.
+ // Since the operations of filtering them out are expensive, we should do
on-demand stream based
+ // filtering when we actually need the table schema.
+ Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+ // Only focuses on those who could potentially evolve the table schema.
+ .filter(s -> actions.contains(s.getAction()))
+ // Further filtering out clustering operations as it does not evolve
table schema.
+ .filter(s -> isaBoolean(s, timeline))
+ .filter(HoodieInstant::isCompleted)
+ // We reverse the order as the operation against this timeline would
be very efficient if
+ // we always start from the tail.
+ .sorted(reversedComparator);
+ return timelineLayout.getTimelineFactory().createDefaultTimeline(
+ reversedTimelineWithTableSchema,
+ metaClient.getActiveTimeline()::getInstantDetails);
+ }
+
+ private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {
Review Comment:
isClusteringInstant would be a better name.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema
dataSchema, Option<String[]>
private Supplier<Exception> schemaNotFoundError() {
return () -> new HoodieSchemaNotFoundException("No schema found for table
at " + metaClient.getBasePath());
}
+
+ /**
+ * WARNING: This method should be only used as part of HUDI-8438 before the
jira owner fully accommodate all existing use
+ * cases.
+ *
+ * Get timeline in REVERSE order that only contains completed instants which
POTENTIALLY evolve the table schema.
+ * For types of instants that are included and not reflecting table schema
at their instant completion time please refer
+ * comments inside the code.
+ * */
+ HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+ final Set<String> actions;
+ switch (metaClient.getTableType()) {
+ case COPY_ON_WRITE: {
+ actions = new HashSet<>(Arrays.asList(COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ case MERGE_ON_READ: {
+ actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ default:
+ throw new HoodieException("Unsupported table type :" +
metaClient.getTableType());
+ }
+
+ // We only care committed instant when it comes to table schema.
+ TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+ Comparator<HoodieInstant> reversedComparator =
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+ // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which
might not contain a valid schema
+ // field in their commit metadata.
+ // Since the operations of filtering them out are expensive, we should do
on-demand stream based
+ // filtering when we actually need the table schema.
+ Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+ // Only focuses on those who could potentially evolve the table schema.
+ .filter(s -> actions.contains(s.getAction()))
+ // Further filtering out clustering operations as it does not evolve
table schema.
+ .filter(s -> isaBoolean(s, timeline))
+ .filter(HoodieInstant::isCompleted)
+ // We reverse the order as the operation against this timeline would
be very efficient if
+ // we always start from the tail.
+ .sorted(reversedComparator);
+ return timelineLayout.getTimelineFactory().createDefaultTimeline(
+ reversedTimelineWithTableSchema,
+ metaClient.getActiveTimeline()::getInstantDetails);
+ }
+
+ private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {
+ return !s.getAction().equals(REPLACE_COMMIT_ACTION)
+ || !ClusteringUtils.isClusteringInstant(timeline, s,
metaClient.getInstantGenerator());
Review Comment:
In the case of 1.0, we can identify a clustering instant just by looking up
the corresponding requested file; the action will be "CLUSTERING".
In the case of 0.x, we have to deserialize the requested file to find
whether there is a clustering plan.
We have two different Timeline implementations, so let's ensure our
deduction works for both 0.x (i.e., table version 6) and 1.x (i.e., table
version 8).
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema
dataSchema, Option<String[]>
private Supplier<Exception> schemaNotFoundError() {
return () -> new HoodieSchemaNotFoundException("No schema found for table
at " + metaClient.getBasePath());
}
+
+ /**
+ * WARNING: This method should be only used as part of HUDI-8438 before the
jira owner fully accommodate all existing use
+ * cases.
+ *
+ * Get timeline in REVERSE order that only contains completed instants which
POTENTIALLY evolve the table schema.
+ * For types of instants that are included and not reflecting table schema
at their instant completion time please refer
+ * comments inside the code.
+ * */
+ HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+ final Set<String> actions;
+ switch (metaClient.getTableType()) {
+ case COPY_ON_WRITE: {
+ actions = new HashSet<>(Arrays.asList(COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ case MERGE_ON_READ: {
+ actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION,
REPLACE_COMMIT_ACTION));
+ break;
+ }
+ default:
+ throw new HoodieException("Unsupported table type :" +
metaClient.getTableType());
+ }
+
+ // We only care committed instant when it comes to table schema.
+ TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+ Comparator<HoodieInstant> reversedComparator =
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+ // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which
might not contain a valid schema
+ // field in their commit metadata.
+ // Since the operations of filtering them out are expensive, we should do
on-demand stream based
+ // filtering when we actually need the table schema.
+ Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+ // Only focuses on those who could potentially evolve the table schema.
+ .filter(s -> actions.contains(s.getAction()))
+ // Further filtering out clustering operations as it does not evolve
table schema.
+ .filter(s -> isaBoolean(s, timeline))
+ .filter(HoodieInstant::isCompleted)
+ // We reverse the order as the operation against this timeline would
be very efficient if
+ // we always start from the tail.
+ .sorted(reversedComparator);
+ return timelineLayout.getTimelineFactory().createDefaultTimeline(
+ reversedTimelineWithTableSchema,
+ metaClient.getActiveTimeline()::getInstantDetails);
+ }
+
+ private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {
Review Comment:
Also, let's avoid single-letter variable names:
```
instant or hoodieInstant for HoodieInstant
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]