This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 04487e48a5ad6b36fcb8f0116c587efb7cefa269 Author: voon <[email protected]> AuthorDate: Mon Dec 15 20:43:04 2025 +0800 Remove HoodieAvroUtils from hudi-client-common --- .../apache/hudi/cli/commands/TestTableCommand.java | 12 +- .../apache/hudi/client/BaseHoodieWriteClient.java | 10 +- .../bootstrap/HoodieBootstrapSchemaProvider.java | 7 +- ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 8 +- .../SchemaConflictResolutionStrategy.java | 5 +- .../SimpleSchemaConflictResolutionStrategy.java | 26 +- .../apache/hudi/client/utils/TransactionUtils.java | 6 +- .../org/apache/hudi/index/HoodieIndexUtils.java | 66 ++- .../hudi/metadata/HoodieMetadataWriteUtils.java | 10 +- ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 66 +-- ...TestSimpleSchemaConflictResolutionStrategy.java | 29 +- .../apache/hudi/index/TestHoodieIndexUtils.java | 300 ++++++------ .../GenericRecordValidationTestUtils.java | 16 +- .../hudi/testutils/HoodieMergeOnReadTestUtils.java | 36 +- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 - .../apache/hudi/common/schema/HoodieSchema.java | 4 + .../HoodieSchemaComparatorForSchemaEvolution.java | 375 +++++++++++++++ .../hudi/common/schema/HoodieSchemaUtils.java | 4 + ...stHoodieSchemaComparatorForSchemaEvolution.java | 505 +++++++++++++++++++++ .../hive/TestHoodieCombineHiveInputFormat.java | 3 +- .../hudi/hadoop/testutils/InputFormatTestUtil.java | 4 +- ...DataValidationCheckForLogCompactionActions.java | 4 +- .../org/apache/hudi/functional/TestBootstrap.java | 17 +- 23 files changed, 1179 insertions(+), 338 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index 061ba21f2b07..dce2c7064f3c 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -18,7 +18,6 @@ package org.apache.hudi.cli.commands; 
-import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; @@ -27,6 +26,8 @@ import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -36,7 +37,6 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.storage.StoragePath; -import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; @@ -259,10 +259,10 @@ public class TestTableCommand extends CLIFunctionalTestHarness { assertTrue(ShellEvaluationResultUtil.isSuccess(result)); String actualSchemaStr = result.toString().substring(result.toString().indexOf("{")); - Schema actualSchema = new Schema.Parser().parse(actualSchemaStr); + HoodieSchema actualSchema = HoodieSchema.parse(actualSchemaStr); - Schema expectedSchema = new Schema.Parser().parse(schemaStr); - expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema); + HoodieSchema expectedSchema = HoodieSchema.parse(schemaStr); + expectedSchema = HoodieSchemaUtils.addMetadataFields(expectedSchema); assertEquals(actualSchema, expectedSchema); File file = File.createTempFile("temp", null); @@ -270,7 +270,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness { assertTrue(ShellEvaluationResultUtil.isSuccess(result)); actualSchemaStr = getFileContent(file.getAbsolutePath()); - 
actualSchema = new Schema.Parser().parse(actualSchemaStr); + actualSchema = HoodieSchema.parse(actualSchemaStr); assertEquals(actualSchema, expectedSchema); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 86efd7db1e08..4c4a1347903e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -19,7 +19,6 @@ package org.apache.hudi.client; import org.apache.hudi.avro.AvroSchemaUtils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPlan; @@ -46,6 +45,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -341,15 +341,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient()); if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) { InternalSchema internalSchema; - Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField()); + HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(config.getSchema()), 
config.allowOperationMetadataField()); if (historySchemaStr.isEmpty()) { - internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(avroSchema))); + internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(schema)); internalSchema.setSchemaId(Long.parseLong(instantTime)); } else { internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr)); } - InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); + InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); if (evolvedSchema.equals(internalSchema)) { metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema)); //TODO save history schema by metaTable @@ -361,7 +361,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr)); } // update SCHEMA_KEY - metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, avroSchema.getFullName()).toString()); + metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, schema.getFullName()).toString()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java index ce4c42ba1665..99b2dbad1be7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java @@ -18,9 +18,10 @@ package org.apache.hudi.client.bootstrap; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -48,8 +49,8 @@ public abstract class HoodieBootstrapSchemaProvider { public final Schema getBootstrapSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) { if (writeConfig.getSchema() != null) { // Use schema specified by user if set - Schema userSchema = new Schema.Parser().parse(writeConfig.getSchema()); - if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) { + HoodieSchema userSchema = HoodieSchema.parse(writeConfig.getSchema()); + if (!HoodieSchema.create(HoodieSchemaType.NULL).equals(userSchema)) { return userSchema; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java index ab9c0ce426b7..0e5feab55e11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java @@ -89,12 +89,16 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { return schema; } - public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) { + public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields, 
Option<HoodieInstant> instant) { return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline. .map(HoodieSchema::fromAvroSchema) .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config. .map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema)) - .map(this::handlePartitionColumnsIfNeeded) + .map(this::handlePartitionColumnsIfNeeded); + } + + public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) { + return getTableSchemaIfPresent(includeMetadataFields, instant) .map(HoodieSchema::toAvroSchema); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java index 36845c688c78..2dd39fbd0589 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java @@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIClass; import org.apache.hudi.PublicAPIMethod; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; @@ -50,14 +51,14 @@ public interface SchemaConflictResolutionStrategy { * @throws HoodieWriteConflictException if schema conflicts cannot be resolved. 
*/ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - Option<Schema> resolveConcurrentSchemaEvolution( + Option<HoodieSchema> resolveConcurrentSchemaEvolution( HoodieTable table, HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant, Option<HoodieInstant> currTxnOwnerInstant); static void throwConcurrentSchemaEvolutionException( - Option<Schema> tableSchemaAtTxnStart, Option<Schema> tableSchemaAtTxnValidation, Schema writerSchemaOfTxn, + Option<HoodieSchema> tableSchemaAtTxnStart, Option<HoodieSchema> tableSchemaAtTxnValidation, HoodieSchema writerSchemaOfTxn, Option<HoodieInstant> lastCompletedTxnOwnerInstant, Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException { String errMsg = String.format( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java index 688f14c38d42..5f72ce11bdf2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java @@ -18,7 +18,8 @@ package org.apache.hudi.client.transaction; -import org.apache.hudi.avro.AvroSchemaComparatorForSchemaEvolution; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaComparatorForSchemaEvolution; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; @@ -28,11 +29,9 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.stream.Stream; -import static 
org.apache.hudi.avro.HoodieAvroUtils.isSchemaNull; import static org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; @@ -46,7 +45,7 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.compareTim public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy { @Override - public Option<Schema> resolveConcurrentSchemaEvolution( + public Option<HoodieSchema> resolveConcurrentSchemaEvolution( HoodieTable table, HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant, @@ -66,10 +65,10 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes return Option.empty(); } - Schema writerSchemaOfTxn = new Schema.Parser().parse(config.getWriteSchema()); + HoodieSchema writerSchemaOfTxn = HoodieSchema.parse(config.getWriteSchema()); // If a writer does not come with a meaningful schema, skip the schema resolution. ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver = new ConcurrentSchemaEvolutionTableSchemaGetter(table.getMetaClient()); - if (isSchemaNull(writerSchemaOfTxn)) { + if (writerSchemaOfTxn.isSchemaNull()) { return getTableSchemaAtInstant(schemaResolver, currTxnOwnerInstant.get()); } @@ -98,14 +97,14 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes return Option.of(writerSchemaOfTxn); } - Option<Schema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation); + Option<HoodieSchema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation); // If table schema is not defined, it's still case 1. There can be cases where there are commits but they didn't // write any data. 
if (!tableSchemaAtTxnValidation.isPresent()) { return Option.of(writerSchemaOfTxn); } // Case 2, 4, 7: Both writers try to evolve to the same schema or neither evolves schema. - boolean writerSchemaIsCurrentTableSchema = AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get()); + boolean writerSchemaIsCurrentTableSchema = HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get()); if (writerSchemaIsCurrentTableSchema) { return Option.of(writerSchemaOfTxn); } @@ -122,7 +121,7 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes throwConcurrentSchemaEvolutionException( Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant); } - Option<Schema> tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart); + Option<HoodieSchema> tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart); // If no table schema is defined, fall back to case 3. if (!tableSchemaAtTxnStart.isPresent()) { throwConcurrentSchemaEvolutionException( @@ -132,13 +131,13 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes // Case 5: // Table schema has not changed from the start of the transaction till the pre-commit validation // If table schema parsing failed we will blindly go with writer schema. use option.empty - if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) { + if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) { return Option.of(writerSchemaOfTxn); } // Case 6: Current txn does not evolve schema, the tableSchema we saw at validation phase // might be an evolved one, use it. 
- if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) { + if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) { return tableSchemaAtTxnValidation; } @@ -164,9 +163,10 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes .findFirst()); } - private static Option<Schema> getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) { + private static Option<HoodieSchema> getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) { try { - return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant)); + return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant)) + .map(HoodieSchema::fromAvroSchema); } catch (Exception ex) { log.error("Cannot get table schema for instant {}", instant); throw new HoodieException("Unable to get table schema", ex); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 8e26add42992..e8cf8e7f9d28 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -36,7 +37,6 @@ 
import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.io.IOException; import java.util.Map; @@ -78,7 +78,7 @@ public class TransactionUtils { Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants); ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); - Option<Schema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant); + Option<HoodieSchema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant); Stream<HoodieInstant> instantStream = Stream.concat(resolutionStrategy.getCandidateInstants( table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant), @@ -117,7 +117,7 @@ public class TransactionUtils { * @param currentTxnOwnerInstant current instant * @return new table schema after successful schema resolution; empty if nothing to be resolved. 
*/ - public static Option<Schema> resolveSchemaConflictIfNeeded(final HoodieTable table, + public static Option<HoodieSchema> resolveSchemaConflictIfNeeded(final HoodieTable table, final HoodieWriteConfig config, final Option<HoodieInstant> lastCompletedTxnOwnerInstant, final Option<HoodieInstant> currentTxnOwnerInstant) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index dca188c8ed02..55e2b605ba94 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -40,6 +40,8 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -136,10 +138,10 @@ public class HoodieIndexUtils { * @param tableSchema table schema * @return true if each field's data type are supported for secondary index, false otherwise */ - static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, Schema tableSchema) { + static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().allMatch(fieldToIndex -> { - Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, fieldToIndex); - return isSecondaryIndexSupportedType(schema); + Option<Pair<String, HoodieSchemaField>> schema = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex); + return 
isSecondaryIndexSupportedType(schema.get().getRight().schema()); }); } @@ -150,10 +152,11 @@ public class HoodieIndexUtils { * @param tableSchema table schema * @return true if each field's data types are supported, false otherwise */ - public static boolean validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, Schema tableSchema) { + public static boolean validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().anyMatch(fieldToIndex -> { - Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, fieldToIndex); - return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP; + Option<Pair<String, HoodieSchemaField>> nestedFieldOpt = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex); + HoodieSchema fieldSchema = nestedFieldOpt.get().getRight().schema(); + return fieldSchema.getType() != HoodieSchemaType.RECORD && fieldSchema.getType() != HoodieSchemaType.ARRAY && fieldSchema.getType() != HoodieSchemaType.MAP; }); } @@ -161,39 +164,26 @@ public class HoodieIndexUtils { * Check if the given schema type is supported for secondary index. 
* Supported types are: String (including CHAR), Integer types (Int, BigInt, Long, Short), and timestamp */ - private static boolean isSecondaryIndexSupportedType(Schema schema) { + private static boolean isSecondaryIndexSupportedType(HoodieSchema schema) { // Handle union types (nullable fields) - if (schema.getType() == Schema.Type.UNION) { + if (schema.getType() == HoodieSchemaType.UNION) { // For union types, check if any of the types is supported return schema.getTypes().stream() - .anyMatch(s -> s.getType() != Schema.Type.NULL && isSecondaryIndexSupportedType(s)); + .anyMatch(s -> s.getType() != HoodieSchemaType.NULL && isSecondaryIndexSupportedType(s)); } // Check basic types switch (schema.getType()) { case STRING: - // STRING type can have UUID logical type which we don't support - return schema.getLogicalType() == null; // UUID and other string-based logical types are not supported - // Regular STRING (includes CHAR) case INT: - // INT type can represent regular integers or dates/times with logical types - if (schema.getLogicalType() != null) { - // Support date and time-millis logical types - return schema.getLogicalType() == LogicalTypes.date() - || schema.getLogicalType() == LogicalTypes.timeMillis(); - } - return true; // Regular INT case LONG: - // LONG type can represent regular longs or timestamps with logical types - if (schema.getLogicalType() != null) { - // Support timestamp logical types - return schema.getLogicalType() == LogicalTypes.timestampMillis() - || schema.getLogicalType() == LogicalTypes.timestampMicros() - || schema.getLogicalType() == LogicalTypes.timeMicros(); - } - return true; // Regular LONG case DOUBLE: - return true; // Support DOUBLE type + case DATE: + case TIME: + return true; + case TIMESTAMP: + // LOCAL timestamps are not supported + return ((HoodieSchema.Timestamp) schema).isUtcAdjusted(); default: return false; } @@ -721,40 +711,36 @@ public class HoodieIndexUtils { Map<String, String> options, Map<String, Map<String, 
String>> columns, String userIndexName) throws Exception { - Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + HoodieSchema tableSchema = new TableSchemaResolver(metaClient).getTableSchema(); List<String> sourceFields = new ArrayList<>(columns.keySet()); String columnName = sourceFields.get(0); // We know there's only one column from the check above // First check if the field exists - try { - getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); - } catch (Exception e) { + Option<Pair<String, HoodieSchemaField>> fieldSchemaOpt = HoodieSchemaUtils.getNestedField(tableSchema, columnName); + if (fieldSchemaOpt.isEmpty()) { throw new HoodieMetadataIndexException(String.format( "Cannot create %s index '%s': Column '%s' does not exist in the table schema. " - + "Please verify the column name and ensure it exists in the table.", + + "Please verify the column name and ensure it exists in the table.", indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : "expression", userIndexName, columnName)); } + Pair<String, HoodieSchemaField> fieldSchema = fieldSchemaOpt.get(); + // Check for complex types (RECORD, ARRAY, MAP) - not supported for any index type if (!validateDataTypeForSecondaryOrExpressionIndex(sourceFields, tableSchema)) { - Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); throw new HoodieMetadataIndexException(String.format( "Cannot create %s index '%s': Column '%s' has unsupported data type '%s'. " + "Complex types (RECORD, ARRAY, MAP) are not supported for indexing. " + "Please choose a column with a primitive data type.", indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? 
"secondary" : "expression", - userIndexName, columnName, fieldSchema.getType())); + userIndexName, columnName, fieldSchema.getRight().schema().getType())); } // For secondary index, apply stricter data type validation if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) { if (!validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) { - Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); - String actualType = fieldSchema.getType().toString(); - if (fieldSchema.getLogicalType() != null) { - actualType += " with logical type " + fieldSchema.getLogicalType(); - } + String actualType = fieldSchema.getRight().schema().getType().toString(); throw new HoodieMetadataIndexException(String.format( "Cannot create secondary index '%s': Column '%s' has unsupported data type '%s'. " diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index c970e0d1361c..5c138ec3cc81 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -46,6 +46,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -79,7 +80,6 @@ import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy import org.apache.hudi.util.Lazy; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.ArrayList; import 
java.util.Collection; @@ -95,7 +95,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ASYNC_CLEAN; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS; @@ -410,14 +409,15 @@ public class HoodieMetadataWriteUtils { HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig, Option<HoodieRecord.HoodieRecordType> recordTypeOpt, boolean isDeletePartition) { try { - Option<Schema> writerSchema = + Option<HoodieSchema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) .flatMap(writerSchemaStr -> isNullOrEmpty(writerSchemaStr) ? Option.empty() - : Option.of(new Schema.Parser().parse(writerSchemaStr))); + : Option.of(HoodieSchema.parse(writerSchemaStr))); HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); - Option<HoodieSchema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? 
HoodieSchemaUtils.addMetadataFields(schema) : schema); + if (tableSchema.isEmpty()) { return engineContext.emptyHoodieData(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java index 4e02d74d1983..7317cd13050a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java @@ -32,6 +32,8 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; @@ -41,7 +43,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; -import org.apache.avro.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +58,6 @@ import java.util.HashMap; import java.util.Properties; import java.util.stream.Stream; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS; import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION; import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; @@ -122,10 +122,10 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon + " {\"name\":\"partitionColumn\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}" + " ]\n" + "}"; - private static Schema SCHEMA_WITHOUT_METADATA2 = new Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR2); - private static Schema SCHEMA_WITHOUT_METADATA = new Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR); - private static Schema SCHEMA_WITH_METADATA = addMetadataFields(SCHEMA_WITHOUT_METADATA, false); - private static Schema SCHEMA_WITH_PARTITION_COLUMN = new Schema.Parser().parse(SCHEMA_WITH_PARTITION_COLUMN_STR); + private static HoodieSchema SCHEMA_WITHOUT_METADATA2 = HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR2); + private static HoodieSchema SCHEMA_WITHOUT_METADATA = HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR); + private static HoodieSchema SCHEMA_WITH_METADATA = HoodieSchemaUtils.addMetadataFields(SCHEMA_WITHOUT_METADATA); + private static HoodieSchema SCHEMA_WITH_PARTITION_COLUMN = HoodieSchema.parse(SCHEMA_WITH_PARTITION_COLUMN_STR); @BeforeEach public void setUp() throws Exception { @@ -195,7 +195,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(false, Option.empty()); + Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(false, Option.empty()); assertTrue(schemaOption.isPresent()); assertEquals(SCHEMA_WITHOUT_METADATA, schemaOption.get()); } @@ -235,7 +235,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - 
Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(false, Option.empty()); + Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(false, Option.empty()); assertTrue(schemaOption.isPresent()); assertEquals(SCHEMA_WITHOUT_METADATA, schemaOption.get()); } @@ -276,7 +276,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertFalse(schemaOption.isPresent()); } @@ -296,7 +296,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertTrue(schemaOption.isEmpty()); } @@ -383,7 +383,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon @ParameterizedTest @MethodSource("schemaTestParams") - void testGetTableAvroSchema(Schema inputSchema, boolean includeMetadataFields, Schema expectedSchema) throws Exception { + void testGetTableAvroSchema(HoodieSchema inputSchema, boolean includeMetadataFields, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setTableCreateSchema(SCHEMA_WITH_METADATA.toString()) .initTable(getDefaultStorageConf(), basePath); @@ -397,9 +397,9 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon inputSchema.toString(), COMMIT_ACTION))); - 
assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields, Option.empty()).get()); + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields, Option.empty()).get()); HoodieInstant instant = metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( includeMetadataFields, Option.of(instant)).get()); } @@ -412,7 +412,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon @ParameterizedTest @MethodSource("partitionColumnSchemaTestParams") - void testGetTableAvroSchemaAppendPartitionColumn(boolean shouldIncludePartitionColumns, Schema expectedSchema) throws Exception { + void testGetTableSchemaAppendPartitionColumn(boolean shouldIncludePartitionColumns, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setPartitionFields("partitionColumn") .setShouldDropPartitionColumns(shouldIncludePartitionColumns) @@ -427,8 +427,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon SCHEMA_WITHOUT_METADATA.toString(), COMMIT_ACTION))); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(false, Option.empty()).get()); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(false, Option.empty()).get()); + assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( false, Option.of(metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"))).get()); } @@ -442,15 +442,15 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon @ParameterizedTest @MethodSource("createSchemaTestParam") - void testGetTableCreateAvroSchema(boolean includeMetadataFields, Schema expectedSchema) throws Exception { + void testGetTableCreateAvroSchema(boolean includeMetadataFields, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setTableCreateSchema(SCHEMA_WITH_METADATA.toString()) .initTable(getDefaultStorageConf(), basePath); testTable = HoodieTestTable.of(metaClient); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields, Option.empty()).get()); + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields, Option.empty()).get()); // getTableAvroSchemaFromLatestCommit only cares about active timeline, since it is empty, no schema is returned. 
- assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( includeMetadataFields, Option.of(metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"))).get()); } @@ -467,10 +467,10 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertTrue(schemaOption.isPresent()); - Schema resultSchema = schemaOption.get(); + HoodieSchema resultSchema = schemaOption.get(); assertTrue(resultSchema.getFields().stream() .anyMatch(f -> f.name().equals("partition_path"))); } @@ -481,8 +481,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon initMetaClient(false, tableType); testTable = HoodieTestTable.of(metaClient); - Schema schema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - Schema schema2 = new Schema.Parser().parse(TRIP_SCHEMA); + HoodieSchema schema1 = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + HoodieSchema schema2 = HoodieSchema.parse(TRIP_SCHEMA); // Create two commits with different schemas int startCommitTime = 10; @@ -514,7 +514,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon // Test getting schema from first instant String timestamp1 = padWithLeadingZeros(Integer.toString(10), REQUEST_TIME_LENGTH); - Option<Schema> schema1Option = resolver.getTableAvroSchemaIfPresent( + Option<HoodieSchema> schema1Option = resolver.getTableSchemaIfPresent( false, 
Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestamp1, incTimestampStrByOne(timestamp1)))); assertTrue(schema1Option.isPresent()); @@ -522,7 +522,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon // Test getting schema from second instant String timestamp2 = padWithLeadingZeros(Integer.toString(20), REQUEST_TIME_LENGTH); - Option<Schema> schema2Option = resolver.getTableAvroSchemaIfPresent( + Option<HoodieSchema> schema2Option = resolver.getTableSchemaIfPresent( false, Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestamp2, incTimestampStrByOne(timestamp2)))); assertTrue(schema2Option.isPresent()); @@ -534,7 +534,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon for (Integer i = startCommitTime + 10; i <= endCommitTime + 10; i += 10) { String timestampI = padWithLeadingZeros(Integer.toString(i), REQUEST_TIME_LENGTH); - schema2Option = resolver.getTableAvroSchemaIfPresent(false, + schema2Option = resolver.getTableSchemaIfPresent(false, Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestampI, incTimestampStrByOne(timestampI)))); assertTrue(schema2Option.isPresent(), i::toString); assertEquals(schema2.toString(), schema2Option.get().toString()); @@ -548,8 +548,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon testTable = HoodieTestTable.of(metaClient); // Create test schema - Schema schema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - Schema schema2 = new Schema.Parser().parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA); + HoodieSchema schema1 = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + HoodieSchema schema2 = HoodieSchema.parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA); // Create a commit with schema1 String commitTime1 = 
"0010"; @@ -578,7 +578,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon HoodieInstant instant1 = metaClient.getCommitsTimeline().filterCompletedInstants().nthInstant(0).get(); // Case 1: First call with empty instant - should fetch from timeline and cache - Option<Schema> schemaOption1 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()); + Option<HoodieSchema> schemaOption1 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema); assertTrue(schemaOption1.isPresent()); assertEquals(schema2, schemaOption1.get()); @@ -586,7 +586,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 2: Second call with empty instant - should use cache - Option<Schema> schemaOption2 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()); + Option<HoodieSchema> schemaOption2 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema); assertTrue(schemaOption2.isPresent()); assertEquals(schema2, schemaOption2.get()); @@ -594,7 +594,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 3: Call with the latest valid instant - there should be a cache hit - Option<Schema> schemaOption3 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)); + Option<HoodieSchema> schemaOption3 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)).map(HoodieSchema::fromAvroSchema); assertTrue(schemaOption3.isPresent()); assertEquals(schema2, schemaOption3.get()); @@ -602,7 +602,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 4: Second 
call with some other instant - should use cache - Option<Schema> schemaOption4 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)); + Option<HoodieSchema> schemaOption4 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)).map(HoodieSchema::fromAvroSchema); assertTrue(schemaOption4.isPresent()); assertEquals(schema1, schemaOption4.get()); @@ -613,7 +613,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon String nonExistentTime = "9999"; HoodieInstant nonExistentInstant = metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, nonExistentTime, nonExistentTime); - Option<Schema> schemaOption5 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)); + Option<HoodieSchema> schemaOption5 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)).map(HoodieSchema::fromAvroSchema); assertEquals(schema2, schemaOption5.get()); // Verify one more call to timeline for non-existent instant diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java index db6a6d5abf0e..4873a608cb38 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewManager; @@ -137,9 +138,9 @@ public class TestSimpleSchemaConflictResolutionStrategy { @Test void testNoConflictFirstCommit() throws Exception { setupInstants(null, null, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, Option.empty(), nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test @@ -152,9 +153,9 @@ public class TestSimpleSchemaConflictResolutionStrategy { @Test void testNullTypeWriterSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA1, NULL_SCHEMA, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test @@ -167,41 +168,41 @@ public class TestSimpleSchemaConflictResolutionStrategy { @Test void testConflictSecondCommitSameSchema() throws Exception { setupInstants(null, SCHEMA1, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, Option.empty(), nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test void testNoConflictSameSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA1, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new 
Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test void testNoConflictBackwardsCompatible1() throws Exception { setupInstants(SCHEMA1, SCHEMA2, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test void testNoConflictBackwardsCompatible2() throws Exception { setupInstants(SCHEMA1, SCHEMA1, SCHEMA2, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test void testNoConflictConcurrentEvolutionSameSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA2, SCHEMA2, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java index 656203d8a905..a2b233925d73 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java @@ -18,15 +18,15 @@ package org.apache.hudi.index; +import org.apache.hudi.common.schema.HoodieSchema; +import 
org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.exception.HoodieMetadataIndexException; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -79,20 +79,17 @@ public class TestHoodieIndexUtils { public void testIsEligibleForSecondaryIndexWithSupportedDataTypes() { // Given: A schema with supported data types for secondary index (String/CHAR, Int, Long, Float, Double) // Note: CHAR is represented as STRING in Avro schema - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .requiredString("charField") // CHAR is represented as STRING in Avro - .optionalInt("intField") - .requiredLong("longField") - .name("doubleField").type().doubleType().noDefault() - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("charField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("longField", HoodieSchema.create(HoodieSchemaType.LONG)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) 
{ // Test case 1: Secondary index with record index already present // Given: Record index partition already exists @@ -143,21 +140,18 @@ public class TestHoodieIndexUtils { } } + @Test public void testValidateDataTypeForSecondaryOrExpressionIndex() { // Create a dummy schema with both complex and primitive types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .optionalInt("intField") - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .name("structField").type().record("NestedRecord") - .fields() - .requiredString("nestedString") - .endRecord() - .noDefault() - .endRecord(); - + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))), + HoodieSchemaField.of("structField", HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nestedString", HoodieSchema.create(HoodieSchemaType.STRING)) + ))) + )); // Test for primitive fields assertTrue(validateDataTypeForSecondaryOrExpressionIndex(Arrays.asList("stringField", "intField"), schema)); @@ -175,24 +169,21 @@ public class TestHoodieIndexUtils { @Test public void testValidateDataTypeForSecondaryIndex() { // Create a schema with various data types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .requiredString("charField") // CHAR is represented as STRING in Avro - .optionalInt("intField") - .requiredLong("longField") - .name("timestampField").type().longType().longDefault(0L) // timestamp 
as long - .name("booleanField").type().booleanType().noDefault() - .name("floatField").type().floatType().noDefault() - .name("doubleField").type().doubleType().noDefault() - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .name("structField").type().record("NestedRecord") - .fields() - .requiredString("nestedString") - .endRecord() - .noDefault() - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("charField", HoodieSchema.create(HoodieSchemaType.STRING)), // CHAR is represented as STRING + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("longField", HoodieSchema.create(HoodieSchemaType.LONG)), + HoodieSchemaField.of("timestampField", HoodieSchema.create(HoodieSchemaType.LONG), null, 0L), // timestamp as long + HoodieSchemaField.of("booleanField", HoodieSchema.create(HoodieSchemaType.BOOLEAN)), + HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))), + HoodieSchemaField.of("structField", HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nestedString", HoodieSchema.create(HoodieSchemaType.STRING)) + ))) + )); // Test supported types for secondary index assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("stringField"), schema)); @@ -227,32 +218,31 @@ public class TestHoodieIndexUtils { @Test public void testValidateDataTypeForSecondaryIndexWithLogicalTypes() { // Supported logical 
types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); - + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros(); + HoodieSchema date = HoodieSchema.createDate(); + HoodieSchema timeMillis = HoodieSchema.createTimeMillis(); + HoodieSchema timeMicros = HoodieSchema.createTimeMicros(); + // Unsupported logical types - Schema decimal = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES)); - Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); - Schema localTimestampMillis = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema localTimestampMicros = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - - Schema schemaWithLogicalTypes = SchemaBuilder.record("TestRecord") - .fields() + HoodieSchema decimal = HoodieSchema.createDecimal(10, 2); + HoodieSchema uuid = HoodieSchema.createUUID(); + HoodieSchema localTimestampMillis = HoodieSchema.createLocalTimestampMillis(); + HoodieSchema localTimestampMicros = HoodieSchema.createLocalTimestampMicros(); + + HoodieSchema schemaWithLogicalTypes = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( // Supported logical types - .name("timestampMillisField").type(timestampMillis).noDefault() - .name("timestampMicrosField").type(timestampMicros).noDefault() - .name("dateField").type(date).noDefault() - .name("timeMillisField").type(timeMillis).noDefault() - 
.name("timeMicrosField").type(timeMicros).noDefault() + HoodieSchemaField.of("timestampMillisField", timestampMillis), + HoodieSchemaField.of("timestampMicrosField", timestampMicros), + HoodieSchemaField.of("dateField", date), + HoodieSchemaField.of("timeMillisField", timeMillis), + HoodieSchemaField.of("timeMicrosField", timeMicros), // Unsupported logical types - .name("decimalField").type(decimal).noDefault() - .name("uuidField").type(uuid).noDefault() - .name("localTimestampMillisField").type(localTimestampMillis).noDefault() - .name("localTimestampMicrosField").type(localTimestampMicros).noDefault() - .endRecord(); + HoodieSchemaField.of("decimalField", decimal), + HoodieSchemaField.of("uuidField", uuid), + HoodieSchemaField.of("localTimestampMillisField", localTimestampMillis), + HoodieSchemaField.of("localTimestampMicrosField", localTimestampMicros) + )); // Test supported timestamp and date/time logical types assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("timestampMillisField"), schemaWithLogicalTypes)); @@ -282,21 +272,18 @@ public class TestHoodieIndexUtils { public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() { // Given: A schema with unsupported data types for secondary index (Boolean, Decimal) // Note: Float and Double are now supported - Schema decimalType = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES)); - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .name("floatField").type().floatType().noDefault() - .name("doubleField").type().doubleType().noDefault() - .name("booleanField").type().booleanType().noDefault() - .name("decimalField").type(decimalType).noDefault() - .endRecord(); + HoodieSchema decimalType = HoodieSchema.createDecimal(10, 2); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + 
HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)), + HoodieSchemaField.of("booleanField", HoodieSchema.create(HoodieSchemaType.BOOLEAN)), + HoodieSchemaField.of("decimalField", decimalType) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set<String> partitions = new HashSet<>(); @@ -351,8 +338,7 @@ public class TestHoodieIndexUtils { () -> HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex( mockMetaClient, PARTITION_NAME_SECONDARY_INDEX, options, columns, "test_index")); assertTrue(ex4.getMessage().contains("unsupported data type")); - assertTrue(ex4.getMessage().contains("BYTES with logical type")); - assertTrue(ex4.getMessage().contains("Decimal")); + assertTrue(ex4.getMessage().contains("DECIMAL")); assertTrue(ex4.getMessage().contains("Secondary indexes only support")); // Test case 5: Mix of supported fields (now including double) @@ -378,20 +364,16 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForSecondaryIndexWithLogicalTypes() { // Given: A schema with timestamp and date logical types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .name("timestampField").type(timestampMillis).noDefault() - .name("dateField").type(date).noDefault() - .endRecord(); + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema date = HoodieSchema.createDate(); + HoodieSchema schema = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("timestampField", timestampMillis), + HoodieSchemaField.of("dateField", date) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set<String> partitions = new HashSet<>(); @@ -420,16 +402,13 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForSecondaryIndexWithoutRecordIndex() { // Given: A schema with supported data types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Collections.singletonList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Test case 1: No record index partition and not enabled in options // Given: No record index partition exists and not enabled in options @@ -471,19 +450,16 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForExpressionIndex() { // Given: A schema with various data types including complex types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .name("floatField").type().floatType().noDefault() - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .endRecord(); + HoodieSchema schema = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map<String, Map<String, String>> columns = new HashMap<>(); Map<String, String> options = new HashMap<>(); @@ -544,18 +520,17 @@ public class TestHoodieIndexUtils { */ @Test public void testIsEligibleForExpressionIndexWithNullableFields() { + // An int with default 0 must have the int type defined first. 
+ // If null is defined first, which HoodieSchema#createNullable does, an error will be thrown + HoodieSchema nullableIntWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), HoodieSchema.create(HoodieSchemaType.NULL)); // Given: A schema with nullable fields (union types) - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .optionalString("nullableStringField") - .name("nullableIntField").type().nullable().intType().intDefault(0) - .endRecord(); - + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nullableStringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, null, 0) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map<String, Map<String, String>> columns = new HashMap<>(); columns.put("nullableStringField", Collections.emptyMap()); @@ -578,19 +553,18 @@ public class TestHoodieIndexUtils { */ @Test public void testIsEligibleForSecondaryIndexWithNullableFields() { + HoodieSchema nullableIntWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), HoodieSchema.create(HoodieSchemaType.NULL)); + HoodieSchema nullableLongWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.LONG), HoodieSchema.create(HoodieSchemaType.NULL)); // Given: A schema with nullable fields that are supported for secondary index - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .optionalString("nullableStringField") - .name("nullableIntField").type().nullable().intType().intDefault(0) - .name("nullableLongField").type().nullable().longType().longDefault(0L) - .endRecord(); + HoodieSchema schema = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nullableStringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, null, 0), + HoodieSchemaField.of("nullableLongField", nullableLongWithDefault, null, 0L) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set<String> partitions = new HashSet<>(); @@ -620,26 +594,23 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForSecondaryIndexWithAllLogicalTypes() { // Given: A schema with all supported timestamp logical types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); - - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .name("timestampMillisField").type(timestampMillis).noDefault() - .name("timestampMicrosField").type(timestampMicros).noDefault() - .name("dateField").type(date).noDefault() - .name("timeMillisField").type(timeMillis).noDefault() - .name("timeMicrosField").type(timeMicros).noDefault() - .endRecord(); + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros(); + HoodieSchema date = HoodieSchema.createDate(); + HoodieSchema timeMillis = HoodieSchema.createTimeMillis(); + 
HoodieSchema timeMicros = HoodieSchema.createTimeMicros(); + + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("timestampMillisField", timestampMillis), + HoodieSchemaField.of("timestampMicrosField", timestampMicros), + HoodieSchemaField.of("dateField", date), + HoodieSchemaField.of("timeMillisField", timeMillis), + HoodieSchemaField.of("timeMicrosField", timeMicros) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set<String> partitions = new HashSet<>(); @@ -671,16 +642,13 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForSecondaryIndexWithColumnNotInSchema() { // Given: A schema without the requested column - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("existingField") - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("existingField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set<String> partitions = new HashSet<>(); @@ -711,19 +679,16 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForSecondaryIndexWithStringLogicalTypes() { // Given: A schema with UUID logical type on string field - Schema uuidSchema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); - - Schema schema = 
SchemaBuilder.record("TestRecord") - .fields() - .name("uuidField").type(uuidSchema).noDefault() - .requiredString("regularStringField") - .endRecord(); + HoodieSchema uuidSchema = HoodieSchema.createUUID(); + + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("uuidField", uuidSchema), + HoodieSchemaField.of("regularStringField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set<String> partitions = new HashSet<>(); @@ -767,16 +732,13 @@ public class TestHoodieIndexUtils { @Test public void testIsEligibleForExpressionIndexWithColumnNotInSchema() { // Given: A schema without the requested column - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("existingField") - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("existingField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction<TableSchemaResolver> mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map<String, Map<String, String>> columns = new HashMap<>(); columns.put("nonExistentField", Collections.emptyMap()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java index 67d189ed1b1d..b0993f5d0839 100644 --- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java @@ -18,10 +18,13 @@ package org.apache.hudi.testutils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -35,7 +38,6 @@ import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.ArrayWritable; @@ -67,8 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class GenericRecordValidationTestUtils { public static void assertGenericRecords(GenericRecord record1, GenericRecord record2, - Schema schema, List<String> excludeFields) { - for (Schema.Field f: schema.getFields()) { + HoodieSchema schema, List<String> excludeFields) { + for (HoodieSchemaField f: schema.getFields()) { String fieldName = f.name(); if (excludeFields.contains(fieldName)) { continue; @@ -81,7 +83,7 @@ public class GenericRecordValidationTestUtils { HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) value2)); } else if (value1 instanceof Text && value2 instanceof BytesWritable) { assertArrayEquals(((Text) value1).getBytes(), ((BytesWritable) 
value2).getBytes()); - } else if (f.schema().getType() == Schema.Type.ENUM + } else if (f.schema().getType() == HoodieSchemaType.ENUM && value1 instanceof BytesWritable && value2 instanceof Text) { // TODO(HUDI-8660): Revisit ENUM handling in Spark parquet reader and writer assertArrayEquals(((BytesWritable) value1).getBytes(), ((Text) value2).getBytes()); @@ -126,8 +128,8 @@ public class GenericRecordValidationTestUtils { // Verify row count. assertEquals(prevRecordsMap.size(), newRecordsMap.size()); - Schema readerSchema = HoodieAvroUtils.addMetadataFields( - new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields( + HoodieSchema.parse(config.getSchema()), config.allowOperationMetadataField()); // Verify every field. prevRecordsMap.forEach((key, value) -> { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index cd90c8d2613a..01c830ff198b 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -18,7 +18,9 @@ package org.apache.hudi.testutils; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HadoopFSTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -28,8 +30,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.storage.StorageConfiguration; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; -import 
org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -65,43 +65,43 @@ public class HoodieMergeOnReadTestUtils { public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) { - Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + HoodieSchema schema = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, schema, HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFields); } - public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, + public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema rawSchema, String rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) { return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, rawSchema, rawHiveColumnTypes, projectCols, projectedColumns, true); } - public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, + public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema rawSchema, String rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns, boolean populateMetaFields) { HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(conf, basePath); FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf); - Schema schema; + HoodieSchema schema; String hiveColumnTypes; if (populateMetaFields) { - schema = HoodieAvroUtils.addMetadataFields(rawSchema); - hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes); + schema = HoodieSchemaUtils.addMetadataFields(rawSchema); + hiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(rawHiveColumnTypes); } else { schema = rawSchema; hiveColumnTypes = rawHiveColumnTypes; } setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFields); - final List<Field> fields; + final List<HoodieSchemaField> fields; if (projectCols) { fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name())) .collect(Collectors.toList()); } else { fields = schema.getFields(); } - final Schema projectedSchema = Schema.createRecord(fields.stream() - .map(HoodieAvroUtils::createNewSchemaField) + final HoodieSchema projectedSchema = HoodieSchema.createRecord("testRecord", null, null, fields.stream() + .map(HoodieSchemaUtils::createNewSchemaField) .collect(Collectors.toList())); List<GenericRecord> records = new ArrayList<>(); @@ -114,7 +114,7 @@ public class HoodieMergeOnReadTestUtils { Object key = recordReader.createKey(); ArrayWritable writable = (ArrayWritable) recordReader.createValue(); while (recordReader.next(key, writable)) { - GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema); + GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema.toAvroSchema()); // writable returns an array with [field1, field2, _hoodie_commit_time, // _hoodie_commit_seqno] Writable[] values = writable.get(); @@ -122,7 +122,7 @@ public class HoodieMergeOnReadTestUtils { .filter(f -> !projectCols || projectedColumns.contains(f.name())) .map(f -> 
Pair.of(projectedSchema.getFields().stream() .filter(p -> f.name().equals(p.name())).findFirst().get(), f)) - .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()])); + .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey().getAvroField(), values[fieldsPair.getValue().pos()])); records.add(newRecord.build()); } recordReader.close(); @@ -133,12 +133,12 @@ public class HoodieMergeOnReadTestUtils { return records; } - private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols, + private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, HoodieSchema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols, boolean populateMetaFieldsConfigValue) { - List<Field> fields = schema.getFields(); + List<HoodieSchemaField> fields = schema.getFields(); final List<String> projectedColNames; if (!projectCols) { - projectedColNames = fields.stream().map(Field::name).collect(Collectors.toList()); + projectedColNames = fields.stream().map(HoodieSchemaField::name).collect(Collectors.toList()); } else { projectedColNames = projectedCols; } @@ -151,7 +151,7 @@ public class HoodieMergeOnReadTestUtils { .map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); String hiveColumnNames = fields.stream() .filter(field -> !field.name().equalsIgnoreCase("datestr")) - .map(Field::name).collect(Collectors.joining(",")); + .map(HoodieSchemaField::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConf(); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index c4ea3709c115..b5303f2f1bf1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -398,10 +398,6 @@ public class HoodieAvroUtils { return AvroSchemaUtils.createNewSchemaFromFieldsWithReference(schema, filteredFields); } - public static String addMetadataColumnTypes(String hiveColumnTypes) { - return "string,string,string,string,string," + hiveColumnTypes; - } - public static Schema makeFieldNonNull(Schema schema, String fieldName, Object fieldDefaultValue) { ValidationUtils.checkArgument(fieldDefaultValue != null); List<Schema.Field> filteredFields = schema.getFields() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 2da62bb2924b..c01f04f63ff5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -779,6 +779,10 @@ public class HoodieSchema implements Serializable { .anyMatch(schema -> schema.getType() == Schema.Type.NULL); } + public boolean isSchemaNull() { + return type == null || type == HoodieSchemaType.NULL; + } + /** * If this is a union schema, returns the non-null type. Otherwise, returns this schema. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java new file mode 100644 index 000000000000..2b0b653bb24a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Defines equality comparison rules for HoodieSchema schemas for schema evolution purposes. + * + * <p>This class provides schema comparison logic that focuses only on attributes that affect + * data readers/writers, ignoring metadata like documentation, namespace, and aliases which + * don't impact schema evolution compatibility.</p> + * + * <h2>Common Rules Across All Types</h2> + * Included in equality check: + * <ul> + * <li>Name/identifier</li> + * <li>Type including primitive type, complex type (see below), and logical type</li> + * </ul> + * Excluded from equality check: + * <ul> + * <li>Namespace</li> + * <li>Documentation</li> + * <li>Aliases</li> + * <li>Custom properties</li> + * </ul> + * + * <h2>Type-Specific Rules</h2> + * + * <h3>Record</h3> + * Included: + * <ul> + * <li>Field names</li> + * <li>Field types</li> + * <li>Field order attribute</li> + * <li>Default values</li> + * </ul> + * Excluded: + * <ul> + * <li>Field documentation</li> + * <li>Field aliases</li> + * </ul> + * + * <h3>Enum</h3> + * Included: + * <ul> + * <li>Name</li> + * <li>Symbol order</li> + * <li>Symbol value</li> + * </ul> + * Excluded: + * <ul> + * <li>Custom properties</li> + * </ul> + * + * <h3>Array</h3> + * Included: + * <ul> + * <li>Items schema</li> + * 
</ul> + * Excluded: + * <ul> + * <li>Documentation</li> + * <li>Custom properties</li> + * </ul> + * + * <h3>Map</h3> + * Included: + * <ul> + * <li>Values schema</li> + * </ul> + * Excluded: + * <ul> + * <li>Documentation</li> + * <li>Custom properties</li> + * </ul> + * + * <h3>Fixed</h3> + * Included: + * <ul> + * <li>Size</li> + * <li>Name</li> + * </ul> + * Excluded: + * <ul> + * <li>Namespace</li> + * <li>Aliases</li> + * </ul> + * + * <h3>Union</h3> + * Included: + * <ul> + * <li>Member types</li> + * </ul> + * Excluded: + * <ul> + * <li>Member order</li> + * </ul> + * + * <h3>Logical Types</h3> + * Included: + * <ul> + * <li>Logical type name (via schema subclass)</li> + * <li>Underlying primitive type</li> + * <li>Decimal precision/scale (if applicable)</li> + * <li>Timestamp/Time precision (if applicable)</li> + * </ul> + * Excluded: + * <ul> + * <li>Documentation</li> + * <li>Custom properties</li> + * </ul> + */ +public class HoodieSchemaComparatorForSchemaEvolution { + + protected HoodieSchemaComparatorForSchemaEvolution() { + } + + private static final HoodieSchemaComparatorForSchemaEvolution VALIDATOR = new HoodieSchemaComparatorForSchemaEvolution(); + + public static boolean schemaEquals(HoodieSchema s1, HoodieSchema s2) { + return VALIDATOR.schemaEqualsInternal(s1, s2); + } + + protected boolean schemaEqualsInternal(HoodieSchema s1, HoodieSchema s2) { + if (s1 == s2) { + return true; + } + if (s1 == null || s2 == null) { + return false; + } + if (s1.getType() != s2.getType()) { + return false; + } + + switch (s1.getType()) { + case RECORD: + return recordSchemaEquals(s1, s2); + case ENUM: + return enumSchemaEquals(s1, s2); + case ARRAY: + return arraySchemaEquals(s1, s2); + case MAP: + return mapSchemaEquals(s1, s2); + case FIXED: + return fixedSchemaEquals(s1, s2); + case UNION: + return unionSchemaEquals(s1, s2); + case STRING: + case BYTES: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case NULL: + case DECIMAL: + case 
TIME: + case TIMESTAMP: + case DATE: + case UUID: + return primitiveSchemaEquals(s1, s2); + default: + throw new IllegalArgumentException("Unknown schema type: " + s1.getType()); + } + } + + protected boolean validateRecord(HoodieSchema s1, HoodieSchema s2) { + if (s1.isError() != s2.isError()) { + return false; + } + + return logicalTypeSchemaEquals(s1, s2); + } + + private boolean recordSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + if (!validateRecord(s1, s2)) { + return false; + } + + List<HoodieSchemaField> fields1 = s1.getFields(); + List<HoodieSchemaField> fields2 = s2.getFields(); + + if (fields1.size() != fields2.size()) { + return false; + } + + for (int i = 0; i < fields1.size(); i++) { + if (!fieldEquals(fields1.get(i), fields2.get(i))) { + return false; + } + } + return true; + } + + protected boolean validateField(HoodieSchemaField f1, HoodieSchemaField f2) { + if (!f1.name().equals(f2.name())) { + return false; + } + + if (f1.order() != f2.order()) { + return false; + } + + // Check if both have default values + if (f1.hasDefaultValue() != f2.hasDefaultValue()) { + return false; + } + + // If both have default values, they must be equal + if (f1.hasDefaultValue() && !f1.defaultVal().get().equals(f2.defaultVal().get())) { + return false; + } + + return true; + } + + private boolean fieldEquals(HoodieSchemaField f1, HoodieSchemaField f2) { + if (!validateField(f1, f2)) { + return false; + } + + return schemaEqualsInternal(f1.schema(), f2.schema()); + } + + protected boolean enumSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + // Check name equality first + if (!s1.getName().equals(s2.getName())) { + return false; + } + + List<String> symbols1 = s1.getEnumSymbols(); + List<String> symbols2 = s2.getEnumSymbols(); + + // Quick size check before creating sets + if (symbols1.size() != symbols2.size()) { + return false; + } + + return symbols1.equals(symbols2); + } + + protected boolean unionSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + 
List<HoodieSchema> types1 = s1.getTypes(); + List<HoodieSchema> types2 = s2.getTypes(); + + if (types1.size() != types2.size()) { + return false; + } + + // Create sets of effectively equal types + Set<SchemaWrapper> set1 = types1.stream().map(SchemaWrapper::new).collect(Collectors.toSet()); + Set<SchemaWrapper> set2 = types2.stream().map(SchemaWrapper::new).collect(Collectors.toSet()); + + // Compare sets instead of ordered lists + return set1.equals(set2); + } + + private boolean arraySchemaEquals(HoodieSchema s1, HoodieSchema s2) { + return schemaEqualsInternal(s1.getElementType(), s2.getElementType()); + } + + private boolean mapSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + return schemaEqualsInternal(s1.getValueType(), s2.getValueType()); + } + + protected boolean validateFixed(HoodieSchema s1, HoodieSchema s2) { + return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize(); + } + + private boolean fixedSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + if (!validateFixed(s1, s2)) { + return false; + } + return logicalTypeSchemaEquals(s1, s2); + } + + private static boolean primitiveSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + // For primitive types, just check logical type + return logicalTypeSchemaEquals(s1, s2); + } + + private static boolean logicalTypeSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + // Check if both schemas are of the same logical type class + boolean s1IsDecimal = s1.getType() == HoodieSchemaType.DECIMAL; + boolean s2IsDecimal = s2.getType() == HoodieSchemaType.DECIMAL; + boolean s1IsTimestamp = s1.getType() == HoodieSchemaType.TIMESTAMP; + boolean s2IsTimestamp = s2.getType() == HoodieSchemaType.TIMESTAMP; + boolean s1IsTime = s1.getType() == HoodieSchemaType.TIME; + boolean s2IsTime = s2.getType() == HoodieSchemaType.TIME; + + // If one is a logical type and the other isn't, they're not equal + if (s1IsDecimal != s2IsDecimal || s1IsTimestamp != s2IsTimestamp || s1IsTime != s2IsTime) { + return false; 
+ } + + // If both are decimals, compare precision, scale, and underlying type (FIXED vs BYTES) + if (s1IsDecimal) { + HoodieSchema.Decimal d1 = (HoodieSchema.Decimal) s1; + HoodieSchema.Decimal d2 = (HoodieSchema.Decimal) s2; + // Check if both use same underlying representation (FIXED vs BYTES) + if (d1.isFixed() != d2.isFixed()) { + return false; + } + return d1.getPrecision() == d2.getPrecision() && d1.getScale() == d2.getScale(); + } + + // If both are timestamps, compare precision and UTC adjustment + if (s1IsTimestamp) { + HoodieSchema.Timestamp t1 = (HoodieSchema.Timestamp) s1; + HoodieSchema.Timestamp t2 = (HoodieSchema.Timestamp) s2; + return t1.getPrecision() == t2.getPrecision() && t1.isUtcAdjusted() == t2.isUtcAdjusted(); + } + + // If both are time types, compare precision + // Note: time-millis is INT, time-micros is LONG, so they have different underlying types + // which is reflected in their precision values + if (s1IsTime) { + HoodieSchema.Time t1 = (HoodieSchema.Time) s1; + HoodieSchema.Time t2 = (HoodieSchema.Time) s2; + return t1.getPrecision() == t2.getPrecision(); + } + + // For non-logical types, they're equal + return true; + } + + /** + * Wrapper class to use HoodieSchema in HashSet with our custom equality + */ + static class SchemaWrapper { + private final HoodieSchema schema; + + public SchemaWrapper(HoodieSchema schema) { + this.schema = schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaWrapper that = (SchemaWrapper) o; + return schemaEquals(schema, that.schema); + } + + @Override + public int hashCode() { + // This is a simplified hash code that considers only the type + // It's not perfect but good enough for our use case + return schema.getType().hashCode(); + } + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java index 4a2b458044d9..a994bb5761bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java @@ -598,4 +598,8 @@ public final class HoodieSchemaUtils { public static HoodieSchema projectSchema(HoodieSchema fileSchema, List<String> fields) { return HoodieSchema.fromAvroSchema(HoodieAvroUtils.projectSchema(fileSchema.toAvroSchema(), fields)); } + + public static String addMetadataColumnTypes(String hiveColumnTypes) { + return "string,string,string,string,string," + hiveColumnTypes; + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java new file mode 100644 index 000000000000..c044ae9bce87 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.hudi.common.schema;

import org.apache.hudi.io.util.FileIOUtils;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Unit tests for {@code HoodieSchemaComparatorForSchemaEvolution}.
 *
 * <p>Covers structural equality across primitive, record, enum, fixed, array, map and
 * union schemas; logical-type attributes (decimal precision/scale, timestamp/time
 * precision); default-value comparison (scalar, nested record, array); null handling;
 * and the {@code SchemaWrapper} adapter used for hash-based collections.
 */
class TestHoodieSchemaComparatorForSchemaEvolution {
  @Test
  void testAttrsIrrelevantToEquality() throws IOException {
    // Validates that schemas with different non-essential attributes (like doc strings or aliases)
    // are still considered equivalent for schema evolution purposes
    String schemaA = FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-A.txt"));
    String schemaB = FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-B.txt"));

    HoodieSchema schema1 = HoodieSchema.parse(schemaA);
    HoodieSchema schema2 = HoodieSchema.parse(schemaB);
    // Plain equals() sees the attribute differences; the evolution comparator must not.
    assertNotEquals(schema1, schema2);
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2));
    assertEquals(new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema1),
        new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema2));
  }

  @Test
  void testComparingPrimitiveTypes() {
    // Tests comparison of all primitive types against each other
    // Validates that each primitive type is equal only to other schemas sharing the same
    // primitive type.
    HoodieSchemaType[] primitiveTypes = {
        HoodieSchemaType.NULL, HoodieSchemaType.BOOLEAN, HoodieSchemaType.INT,
        HoodieSchemaType.LONG, HoodieSchemaType.FLOAT, HoodieSchemaType.DOUBLE,
        HoodieSchemaType.BYTES, HoodieSchemaType.STRING
    };

    // Full cross-product: equal only on the diagonal.
    for (HoodieSchemaType primitiveType : primitiveTypes) {
      for (HoodieSchemaType type : primitiveTypes) {
        if (primitiveType == type) {
          assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
              HoodieSchema.create(primitiveType),
              HoodieSchema.create(type)
          ));
        } else {
          assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
              HoodieSchema.create(primitiveType),
              HoodieSchema.create(type)
          ), String.format("Types %s and %s should not be equal",
              primitiveType, type));
        }
      }
    }
  }

  @Test
  void testEqualToSelf() {
    // Validates that a schema is equal to itself
    // Basic sanity check for schema comparison
    String schema = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
        + "{\"name\":\"field1\",\"type\":\"string\"}]}";
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema),
        HoodieSchema.parse(schema)
    ));
  }

  @Test
  void testIsErrorFieldInRecordSchema() {
    // Tests that a record schema is not equal to an error schema
    // even if they have the same structure
    HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, false,
        Arrays.asList(
            HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null)
        ));

    HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, true, // error record
        Arrays.asList(
            HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null)
        ));

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2));
  }

  @Test
  void testRecordFieldTypes() {
    // Validates that records with fields of different types are not considered equal
    // even if the field names are the same
    String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
        + "{\"name\":\"field1\",\"type\":\"string\"}]}";
    String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
        + "{\"name\":\"field1\",\"type\":\"int\"}]}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testRecordFieldOrderAttribute() {
    // Tests that records with different field order attributes are not equal
    // This is important for schema evolution as order affects serialization
    String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
        + "{\"name\":\"field1\",\"type\":\"string\",\"order\":\"ascending\"}]}";
    String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
        + "{\"name\":\"field1\",\"type\":\"string\",\"order\":\"descending\"}]}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testArraySchema() {
    // Validates that array schemas with different item types are not equal
    // even if the array structure is the same
    String schema1 = "{\"type\":\"array\",\"items\":\"string\"}";
    String schema2 = "{\"type\":\"array\",\"items\":\"int\"}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testMapSchema() {
    // Tests that map schemas with different value types are not equal
    // even if the map structure is the same
    String schema1 = "{\"type\":\"map\",\"values\":\"string\"}";
    String schema2 = "{\"type\":\"map\",\"values\":\"int\"}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testFixedSchemaSizeAttr() {
    // Validates that fixed-type schemas with different sizes are not equal
    // Size is a critical attribute for fixed-length fields
    String schema1 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":16}";
    String schema2 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":32}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testUnionMemberTypes() {
    // Tests that unions with different member types are not equal
    // even if they have the same number of members
    String schema1 = "[\"null\",\"string\"]";
    String schema2 = "[\"null\",\"int\"]";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testUnionMemberOrdering() {
    // Validates that the order of union members doesn't affect equality
    String schema1 = "[\"null\",\"string\"]";
    String schema2 = "[\"string\",\"null\"]";
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testLogicalTypeDecimalAttr() {
    // Tests that decimal logical types with different precision and scale are not equal
    String schema1 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
        + "\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
    String schema2 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
        + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":2}";
    String schema3 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
        + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":3}";
    // Precision differs between schema1 and schema2.
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));

    // Scale differs between schema2 and schema3.
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema2),
        HoodieSchema.parse(schema3)
    ));
  }

  @Test
  void testLogicalType() {
    // Validates that different logical types on the same underlying type are not equal
    String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}";
    String schema2 = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testLogicalTypesWithDifferentPrimitiveTypes() {
    // Tests that logical types with different underlying types are not equal
    // even if they represent the same logical concept (decimal)
    String decimalFixed = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
    String decimalBytes = "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(decimalFixed),
        HoodieSchema.parse(decimalBytes)
    ));
  }

  @Test
  void testComparingSchemaFieldNames() {
    // Validates that schemas with different names are not equal
    // even if their structure is identical - tests for records, enums, and fixed types
    String record1 = "{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    String record2 = "{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f2\",\"type\":\"string\"}]}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(record1),
        HoodieSchema.parse(record2)
    ));

    // Enum
    String enum1 = "{\"type\":\"enum\",\"name\":\"E1\",\"symbols\":[\"A\"]}";
    String enum2 = "{\"type\":\"enum\",\"name\":\"E2\",\"symbols\":[\"A\"]}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(enum1),
        HoodieSchema.parse(enum2)
    ));

    // Fixed
    String fixed1 = "{\"type\":\"fixed\",\"name\":\"F1\",\"size\":16}";
    String fixed2 = "{\"type\":\"fixed\",\"name\":\"F2\",\"size\":16}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(fixed1),
        HoodieSchema.parse(fixed2)
    ));
  }

  @Test
  void testEnumSchemaName() {
    // Tests that enum schemas with different names are not equal
    // even if they have the same symbols
    HoodieSchema schema1 = HoodieSchema.createEnum("enum1", null, null, Arrays.asList("A", "B", "C"));
    HoodieSchema schema2 = HoodieSchema.createEnum("enum2", null, null, Arrays.asList("A", "B", "C"));
    HoodieSchema schema3 = HoodieSchema.createEnum("enum1", null, null, Arrays.asList("A", "B", "C"));

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2));
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema3));
  }

  @Test
  void testEnumSchema() {
    // Validates that enum schemas with different symbol sets are not equal
    // even if one is a subset of the other
    HoodieSchema schema1 = HoodieSchema.createEnum("enum", null, null, Arrays.asList("A", "C"));
    HoodieSchema schema2 = HoodieSchema.createEnum("enum", null, null, Arrays.asList("A", "B", "C"));

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2));
  }

  @Test
  void testEnumSymbolsOrder() {
    // Tests that enum schemas with different symbol orders are not equal
    // Order matters for enum serialization
    String schema1 = "{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"A\",\"B\"]}";
    String schema2 = "{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"B\",\"A\"]}";
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));
  }

  @Test
  void testDefaultValueEquality() {
    // Tests comparison of schemas with different default values
    HoodieSchemaField field1 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default1");
    HoodieSchemaField field2 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default2");
    HoodieSchemaField field3 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default1");
    HoodieSchemaField fieldNoDefault = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, null);

    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1));
    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2));
    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3));
    HoodieSchema recordNoDefault = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(fieldNoDefault));

    // Different default values should not be equal
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2));

    // Same default values should be equal
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3));

    // No default value vs default value should not be equal
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, recordNoDefault));

    // No default values should be equal to each other
    // NOTE(review): this compares recordNoDefault with itself, so it only proves
    // self-equality; consider building a second distinct no-default record to
    // actually exercise the "both defaults absent" comparison path.
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(recordNoDefault, recordNoDefault));
  }

  @Test
  void testComplexDefaultValueEquality() {
    // Tests equality comparison of schemas with complex default values (nested records)
    // Validates that default value comparison works correctly for nested structures
    HoodieSchema innerSchema = HoodieSchema.createRecord("inner", null, null, false,
        Collections.singletonList(
            HoodieSchemaField.of("value", HoodieSchema.create(HoodieSchemaType.STRING), null, null)
        ));

    // Create default values as JSON-compatible Maps
    Map<String, Object> defaultValue1 = new HashMap<>();
    defaultValue1.put("value", "test");

    Map<String, Object> defaultValue2 = new HashMap<>();
    defaultValue2.put("value", "test");

    Map<String, Object> defaultValue3 = new HashMap<>();
    defaultValue3.put("value", "different");

    HoodieSchemaField field1 = HoodieSchemaField.of("field", innerSchema, null, defaultValue1);
    HoodieSchemaField field2 = HoodieSchemaField.of("field", innerSchema, null, defaultValue2);
    HoodieSchemaField field3 = HoodieSchemaField.of("field", innerSchema, null, defaultValue3);

    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1));
    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2));
    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3));

    // Same complex default values should be equal
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2));

    // Different complex default values should not be equal
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3));
  }

  @Test
  void testArrayDefaultValueEquality() {
    // Tests equality comparison of schemas with array default values
    // Validates that default value comparison works correctly for array types
    List<String> defaultArray1 = Arrays.asList("a", "b", "c");
    List<String> defaultArray2 = Arrays.asList("a", "b", "c");
    List<String> defaultArray3 = Arrays.asList("x", "y", "z");

    HoodieSchema arraySchema = HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING));

    HoodieSchemaField field1 = HoodieSchemaField.of("field", arraySchema, null, defaultArray1);
    HoodieSchemaField field2 = HoodieSchemaField.of("field", arraySchema, null, defaultArray2);
    HoodieSchemaField field3 = HoodieSchemaField.of("field", arraySchema, null, defaultArray3);

    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1));
    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2));
    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3));

    // Same array default values should be equal
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2));

    // Different array default values should not be equal
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3));
  }

  @Test
  void testCompareWithNull() {
    // Tests schema comparison behavior when one or both schemas are null
    // Validates proper handling of null cases in the comparator
    HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING);
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema, null));
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, schema));
    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, null));
  }

  @Test
  void testRecordFieldCountMismatch() {
    // Tests that records with different number of fields are not equal
    // even if all common fields match
    HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, false,
        Collections.singletonList(
            HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null)
        ));

    HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, false,
        Arrays.asList(
            HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
            HoodieSchemaField.of("field2", HoodieSchema.create(HoodieSchemaType.STRING), null, null)
        ));

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2));
  }

  @Test
  void testUnionSizeMismatch() {
    // Tests that unions with different number of types are not equal
    // even if all common types match
    HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList(
        HoodieSchema.create(HoodieSchemaType.NULL),
        HoodieSchema.create(HoodieSchemaType.STRING)
    ));

    HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList(
        HoodieSchema.create(HoodieSchemaType.NULL),
        HoodieSchema.create(HoodieSchemaType.STRING),
        HoodieSchema.create(HoodieSchemaType.INT)
    ));

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, union2));
  }

  @Test
  void testUnionOrder() {
    // Tests that the order of types in a union doesn't affect equality
    // Important for schema evolution as union member order shouldn't matter
    HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList(
        HoodieSchema.create(HoodieSchemaType.NULL),
        HoodieSchema.create(HoodieSchemaType.STRING)
    ));

    HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList(
        HoodieSchema.create(HoodieSchemaType.STRING),
        HoodieSchema.create(HoodieSchemaType.NULL)
    ));

    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, union2));
  }

  @Test
  void testLogicalTypeOneNull() {
    // Tests comparison of schemas where one has a logical type and the other doesn't
    // Validates that logical type presence affects equality
    String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}";
    String schema2 = "{\"type\":\"int\"}";

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema1),
        HoodieSchema.parse(schema2)
    ));

    // Swapping the two schema positions should have no effect.
    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(schema2),
        HoodieSchema.parse(schema1)
    ));
  }

  @Test
  void testSchemaWrapperNullAndTypeMismatch() {
    // Tests SchemaWrapper's null handling and type comparison behavior
    // Validates proper handling of edge cases in the wrapper class
    HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING);
    HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper wrapper = new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema);

    assertNotNull(wrapper);
    // A wrapper is never equal to an object of a different class.
    assertNotEquals(wrapper, new Object());
  }

  @Test
  void testTimestampLogicalTypeEquality() {
    // Tests that timestamp logical types with different precisions are not equal
    String timestampMillis = "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}";
    String timestampMicros = "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}";

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(timestampMillis),
        HoodieSchema.parse(timestampMicros)
    ));
  }

  @Test
  void testTimeLogicalTypeEquality() {
    // Tests that time logical types with different precisions are not equal
    // (time-millis is backed by INT, time-micros by LONG)
    String timeMillis = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}";
    String timeMicros = "{\"type\":\"long\",\"logicalType\":\"time-micros\"}";

    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
        HoodieSchema.parse(timeMillis),
        HoodieSchema.parse(timeMicros)
    ));
  }
}
package org.apache.hudi.hadoop.hive; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; @@ -336,7 +335,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { String hiveColumnNames = fields.stream().map(HoodieSchemaField::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",year,month,day"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(tripsHiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string,string,string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index a8148715dc87..2f502033f208 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -468,7 +468,7 @@ public class InputFormatTestUtil { String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr")) .map(Schema.Field::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); 
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); @@ -495,7 +495,7 @@ public class InputFormatTestUtil { String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr")) .map(Schema.Field::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java index f421d3a2741a..73568ebb48f2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -46,7 +47,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; import 
org.apache.hudi.testutils.MetadataMergeWriteStatus; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; @@ -178,7 +178,7 @@ public class TestDataValidationCheckForLogCompactionActions extends HoodieClient // Verify row count. assertEquals(mainRecordsMap.size(), experimentRecordsMap.size()); - Schema readerSchema = new Schema.Parser().parse(mainTable.config.getSchema()); + HoodieSchema readerSchema = HoodieSchema.parse(mainTable.config.getSchema()); List<String> excludeFields = CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, FILENAME_METADATA_FIELD, OPERATION_METADATA_FIELD, RECORD_KEY_METADATA_FIELD); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index c1d79a9a2ea5..f529ecd50adb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -165,7 +166,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { rtInputFormat.setConf(rtJobConf); } - public Schema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths, + public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> 
partitionPaths, String srcPath) throws Exception { boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); Dataset<Row> df = @@ -185,7 +186,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(metaClient.getStorage(), new StoragePath(filePath)); //TODO boundary to revisit in later pr to use HoodieSchema directly - return parquetReader.getSchema().toAvroSchema(); + return parquetReader.getSchema(); } @Test @@ -254,7 +255,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { } List<String> partitions = partitioned ? Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03") : Collections.EMPTY_LIST; long timestamp = Instant.now().toEpochMilli(); - Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); + HoodieSchema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); HoodieWriteConfig config = getConfigBuilder(schema.toString()) .withSchema(schema.toString()) .withKeyGenerator(keyGeneratorClass) @@ -372,13 +373,13 @@ public class TestBootstrap extends HoodieSparkClientTestBase { testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE); } - private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles, + private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String maxInstant, boolean checkNumRawFiles, int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception { checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants, expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime); } - private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles, + private void 
checkBootstrapResults(int totalRecords, HoodieSchema schema, String instant, boolean checkNumRawFiles, int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, List<String> instantsWithValidRecords, boolean validateRecordsForCommitTime) throws Exception { metaClient.reloadActiveTimeline(); @@ -539,19 +540,19 @@ public class TestBootstrap extends HoodieSparkClientTestBase { throw new HoodieIOException(e.getMessage(), e); } MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema(); - Schema schema = new AvroSchemaConverter().convert(parquetSchema); + HoodieSchema schema = HoodieSchema.fromAvroSchema(new AvroSchemaConverter().convert(parquetSchema)); return generateInputBatch(jsc, partitionPaths, schema); } } private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc, - List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) { + List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieSchema writerSchema) { List<Pair<String, Path>> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream() .map(x -> Pair.of(p.getKey(), HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList()); return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> { try { Configuration conf = jsc.hadoopConfiguration(); - AvroReadSupport.setAvroReadSchema(conf, writerSchema); + AvroReadSupport.setAvroReadSchema(conf, writerSchema.toAvroSchema()); Iterator<GenericRecord> recIterator = new ParquetReaderIterator( AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build()); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> {
