This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4b72160248c42873b8a9a34b8c8ac6e7d9d7c810 Author: voon <[email protected]> AuthorDate: Fri Dec 26 17:55:15 2025 +0800 Remove Avro.Schema from TableSchemaResolver --- .../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 6 +- .../hudi/cli/commands/HoodieLogFileCommand.java | 13 +- .../org/apache/hudi/cli/commands/TableCommand.java | 4 +- .../action/commit/BaseCommitActionExecutor.java | 8 +- .../hudi/table/action/compact/HoodieCompactor.java | 4 +- .../hudi/client/TestJavaHoodieBackedMetadata.java | 9 +- .../client/common/SparkReaderContextFactory.java | 2 +- .../hudi/client/utils/SparkValidatorUtils.java | 2 +- .../apache/hudi/index/HoodieSparkIndexClient.java | 2 +- .../functional/TestHoodieBackedTableMetadata.java | 5 +- .../hudi/common/schema/HoodieSchemaUtils.java | 47 +++++ .../hudi/common/table/TableSchemaResolver.java | 193 ++++------------- .../common/table/read/PartialUpdateHandler.java | 4 +- .../index/secondary/SecondaryIndexManager.java | 8 +- .../hudi/common/schema/TestHoodieSchemaUtils.java | 229 +++++++++++++++++++++ .../table/read/TestHoodieFileGroupReaderBase.java | 16 +- .../apache/hudi/table/catalog/HoodieCatalog.java | 10 +- .../java/org/apache/hudi/util/CompactionUtil.java | 2 +- .../test/java/org/apache/hudi/utils/TestData.java | 12 +- .../hudi/common/table/TestTableSchemaResolver.java | 37 ---- .../apache/hudi/hadoop/SchemaEvolutionContext.java | 2 +- .../hadoop/hive/HoodieCombineHiveInputFormat.java | 2 +- .../HoodieMergeOnReadTableInputFormat.java | 18 +- .../scala/org/apache/hudi/BucketIndexSupport.scala | 6 +- .../scala/org/apache/hudi/HoodieCLIUtils.scala | 2 +- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +- .../procedures/RunClusteringProcedure.scala | 2 +- .../ShowHoodieLogFileMetadataProcedure.scala | 2 +- .../ShowHoodieLogFileRecordsProcedure.scala | 4 +- .../hudi/client/TestHoodieClientMultiWriter.java | 2 +- .../hudi/functional/TestHoodieBackedMetadata.java | 9 +- .../TestHoodieSparkSqlWriterWithTestFormat.scala | 2 +- 
.../hudi/functional/TestBasicSchemaEvolution.scala | 4 +- .../apache/hudi/functional/TestCOWDataSource.scala | 2 +- .../hudi/functional/TestRecordLevelIndex.scala | 2 +- .../hudi/functional/TestTimeTravelQuery.scala | 6 +- .../apache/spark/sql/hudi/ddl/TestAlterTable.scala | 2 +- .../hudi/feature/index/TestSecondaryIndex.scala | 6 +- .../org/apache/hudi/hive/HoodieHiveSyncClient.java | 4 +- .../org/apache/hudi/utilities/HoodieCompactor.java | 4 +- .../utilities/HoodieMetadataTableValidator.java | 3 +- .../org/apache/hudi/utilities/UtilHelpers.java | 3 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 113 ++++++---- ...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 32 ++- 44 files changed, 513 insertions(+), 337 deletions(-) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 4050d47ff96b..ccd09675fb51 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -467,7 +467,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { private String getTableDoc() { try { - return tableSchemaResolver.getTableAvroSchema(true).getDoc(); + return tableSchemaResolver.getTableSchema(true).getDoc().orElseGet(() -> ""); } catch (Exception e) { throw new HoodieGlueSyncException("Failed to get schema's doc from storage : ", e); } @@ -476,10 +476,10 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { @Override public List<FieldSchema> getStorageFieldSchemas() { try { - return tableSchemaResolver.getTableAvroSchema(true) + return tableSchemaResolver.getTableSchema(true) .getFields() .stream() - .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), f.doc())) + .map(f -> new FieldSchema(f.name(), f.schema().getType().toAvroType().getName(), f.doc())) .collect(Collectors.toList()); } catch (Exception 
e) { throw new HoodieGlueSyncException("Failed to get field schemas from storage : ", e); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index ec06709ab982..6138e1fb3839 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -46,14 +46,13 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.io.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.io.util.FileIOUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; @@ -116,7 +115,7 @@ public class HoodieLogFileCommand { } else { fileName = path.getName(); } - HoodieSchema writerSchema = HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, path)); + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path); try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) { // read the avro blocks @@ -221,10 +220,10 @@ public class HoodieLogFileCommand { HoodieSchema readerSchema = null; // get schema from last log file for (int i = logFilePaths.size() - 1; i >= 0; i--) { - Schema schema = TableSchemaResolver.readSchemaFromLogFile( + HoodieSchema schema = 
TableSchemaResolver.readSchemaFromLogFile( storage, new StoragePath(logFilePaths.get(i))); if (schema != null) { - readerSchema = HoodieSchema.fromAvroSchema(schema); + readerSchema = schema; break; } } @@ -262,10 +261,10 @@ public class HoodieLogFileCommand { } } else { for (String logFile : logFilePaths) { - Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile( + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile( client.getStorage(), new StoragePath(logFile)); try (HoodieLogFormat.Reader reader = - HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) { + HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) { // read the avro blocks while (reader.hasNext()) { HoodieLogBlock n = reader.next(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java index bdc16cf45196..0e3f7a4029dc 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java @@ -25,6 +25,7 @@ import org.apache.hudi.cli.TableHeader; import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -41,7 +42,6 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; import 
org.springframework.shell.standard.ShellOption; @@ -204,7 +204,7 @@ public class TableCommand { help = "File path to write schema") final String outputFilePath) throws Exception { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(client); - Schema schema = tableSchemaResolver.getTableAvroSchema(); + HoodieSchema schema = tableSchemaResolver.getTableSchema(); if (outputFilePath != null) { log.info("Latest table schema : " + schema.toString(true)); writeToFile(outputFilePath, schema.toString(true)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index ae117c002da1..2073e38d93e6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -61,7 +62,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.io.IOException; import java.time.Duration; @@ -291,9 +291,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R> ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, 
Option.empty(), table.getActiveTimeline()); table.getMetaClient().reloadActiveTimeline(); - Option<Schema> schema; + Option<HoodieSchema> schema; try { - schema = new TableSchemaResolver(table.getMetaClient()).getTableAvroSchemaIfPresent(false); + schema = new TableSchemaResolver(table.getMetaClient()).getTableSchemaIfPresent(false); } catch (Exception ex) { throw new HoodieSchemaException(ex); } @@ -301,7 +301,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R> (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>) ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config)) - .performClustering(clusteringPlan, schema.get(), instantTime); + .performClustering(clusteringPlan, schema.get().toAvroSchema(), instantTime); HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses(); HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata); statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 04aaef78ec03..85dd1d45dc12 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.WriteOperationType; +import 
org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.InstantRange; @@ -46,7 +47,6 @@ import org.apache.hudi.io.HoodieMergeHandleFactory; import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import java.io.IOException; @@ -110,7 +110,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable { // the same with the table schema. try { if (StringUtils.isNullOrEmpty(config.getInternalSchema())) { - Schema readerSchema = schemaResolver.getTableAvroSchema(false); + HoodieSchema readerSchema = schemaResolver.getTableSchema(false); config.setSchema(readerSchema.toString()); } } catch (Exception e) { diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 44bb6d2f89ad..3525995fb540 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -109,7 +109,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus; import org.apache.hudi.testutils.TestHoodieMetadataBase; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.api.AfterEach; @@ -892,7 +891,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException { for (HoodieLogFile logFile : logFiles) { List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath()); - Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath()); if (writerSchema == null) { // not a data block @@ -900,7 +899,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { } try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage, - new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) { + new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { @@ -2875,7 +2874,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException { for (HoodieLogFile logFile : logFiles) { List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath()); - Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath()); if (writerSchema == null) { // not a data block @@ -2883,7 +2882,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { } try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage, - new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) { + new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java index 
d2f5eb81458f..914fcf8ea932 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java @@ -143,7 +143,7 @@ public class SparkReaderContextFactory implements ReaderContextFactory<InternalR Configuration configs, SparkAdapter sparkAdapter) { try { - StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema()); + StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableSchema().toAvroSchema()); return sparkAdapter.createOrcFileReader(false, sqlConf, options, configs, dataSchema); } catch (Exception e) { throw new HoodieException("Failed to broadcast ORC file reader", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java index f3de90d587be..fa9ddc64b723 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java @@ -138,7 +138,7 @@ public class SparkValidatorUtils { return sqlContext.createDataFrame( sqlContext.emptyDataFrame().rdd(), AvroConversionUtils.convertAvroSchemaToStructType( - new TableSchemaResolver(table.getMetaClient()).getTableAvroSchema())); + new TableSchemaResolver(table.getMetaClient()).getTableSchema().toAvroSchema())); } catch (Exception e) { LOG.warn("Cannot get table schema from before state.", e); LOG.warn("Using the schema from after state (current transaction) to create the empty Spark dataframe: {}", newStructTypeSchema); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java index 392e8adb95ca..7cc8e7545ae0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java @@ -204,7 +204,7 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient { Option<String> indexTypeOpt, Map<String, String> configs) { try { TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - String schemaStr = schemaUtil.getTableAvroSchema(false).toString(); + String schemaStr = schemaUtil.getTableSchema(false).toString(); TypedProperties props = getProps(metaClient, indexDefinitionOpt, indexTypeOpt, schemaStr); if (!engineContextOpt.isPresent()) { engineContextOpt = Option.of(new HoodieSparkEngineContext(new JavaSparkContext(sparkSessionOpt.get().sparkContext()))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 62b7a2495557..353d94904ceb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -62,7 +62,6 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.params.ParameterizedTest; @@ -520,7 +519,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles) throws IOException { for 
(HoodieLogFile logFile : logFiles) { List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath()); - Schema writerSchema = + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath()); if (writerSchema == null) { // not a data block @@ -528,7 +527,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { } try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage, - new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) { + new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java index 569d17251759..3da6320adf6d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java @@ -28,6 +28,7 @@ import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import java.math.BigDecimal; import java.math.BigInteger; @@ -794,4 +795,50 @@ public final class HoodieSchemaUtils { public static boolean isMetadataField(String fieldName) { return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName); } + + /** + * Converts a HoodieSchemaField's default value to its Java representation. + * This is equivalent to {@link org.apache.hudi.avro.HoodieAvroUtils#toJavaDefaultValue(org.apache.avro.Schema.Field)} + * but operates on HoodieSchemaField. + * + * <p>For primitive types (STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ENUM, BYTES), + * the default value is returned as-is. 
For complex types (ARRAY, MAP, RECORD), + * Avro's GenericData utility is used to properly construct the default value.</p> + * + * @param field the HoodieSchemaField containing the default value + * @return the Java representation of the default value, or null if no default value exists + * @throws IllegalArgumentException if the field's type is not supported + * @since 1.2.0 + */ + public static Object toJavaDefaultValue(HoodieSchemaField field) { + ValidationUtils.checkArgument(field != null, "Field cannot be null"); + + Option<Object> defaultValOpt = field.defaultVal(); + if (!defaultValOpt.isPresent() || defaultValOpt.get() == HoodieJsonProperties.NULL_VALUE) { + return null; + } + + Object defaultVal = defaultValOpt.get(); + HoodieSchemaType type = field.getNonNullSchema().getType(); + + switch (type) { + case STRING: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case ENUM: + case BYTES: + return defaultVal; + case ARRAY: + case MAP: + case RECORD: + // Use Avro's standard GenericData utility for complex types + // Delegate to the underlying Avro field + return GenericData.get().getDefaultValue(field.getAvroField()); + default: + throw new IllegalArgumentException("Unsupported HoodieSchema type: " + type); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 2c5e5b92e7c0..6322063f471e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.table; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; @@ -32,7 +31,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import 
org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; @@ -51,7 +49,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.util.Lazy; -import org.apache.avro.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,177 +104,109 @@ public class TableSchemaResolver { this.hasOperationField = Lazy.lazily(this::hasOperationField); } - public Schema getTableAvroSchemaFromDataFile() throws Exception { - return getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError()); - } - /** * Gets full schema (user + metadata) for a hoodie table from data file as HoodieSchema. - * Delegates to getTableAvroSchemaFromDataFile and wraps the result in a HoodieSchema. * * @return HoodieSchema for this table from data file * @throws Exception */ public HoodieSchema getTableSchemaFromDataFile() throws Exception { - Schema avroSchema = getTableAvroSchemaFromDataFile(); - return HoodieSchema.fromAvroSchema(avroSchema); + return getTableSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError()); } - private Option<Schema> getTableAvroSchemaFromDataFileInternal() { + private Option<HoodieSchema> getTableSchemaFromDataFileInternal() { return getTableParquetSchemaFromDataFile(); } /** - * Gets full schema (user + metadata) for a hoodie table as HoodieSchema. - * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema. + * Gets full schema (user + metadata) for a hoodie table. 
* - * @return HoodieSchema for this table + * @return Avro schema for this table * @throws Exception */ public HoodieSchema getTableSchema() throws Exception { - Schema avroSchema = getTableAvroSchema(metaClient.getTableConfig().populateMetaFields()); - return HoodieSchema.fromAvroSchema(avroSchema); + return getTableSchema(metaClient.getTableConfig().populateMetaFields()); } /** - * Gets full schema (user + metadata) for a hoodie table as HoodieSchema. - * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema. + * Gets schema for a hoodie table, can choose if include metadata fields should be included. * * @param includeMetadataFields choice if include metadata fields - * @return HoodieSchema for this table + * @return Hoodie schema for this table * @throws Exception */ public HoodieSchema getTableSchema(boolean includeMetadataFields) throws Exception { - Schema avroSchema = getTableAvroSchema(includeMetadataFields); - return HoodieSchema.fromAvroSchema(avroSchema); + return getTableSchemaInternal(includeMetadataFields, Option.empty()).orElseThrow(schemaNotFoundError()); } /** - * Fetches tables schema in Avro format as of the given instant as HoodieSchema. 
+ * Fetches tables schema as of the given instant * * @param timestamp as of which table's schema will be fetched */ public HoodieSchema getTableSchema(String timestamp) throws Exception { - Schema avroSchema = getTableAvroSchema(timestamp); - return HoodieSchema.fromAvroSchema(avroSchema); - } - - /** - * Fetches HoodieSchema as of the given instant - * - * @param instant as of which table's schema will be fetched - */ - public HoodieSchema getTableSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { - Schema schema = getTableAvroSchema(instant, includeMetadataFields); - return HoodieSchema.fromAvroSchema(schema); - } - - public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields) { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).map(HoodieSchema::fromAvroSchema); - } - - /** - * Gets full schema (user + metadata) for a hoodie table in Avro format. - * - * @return Avro schema for this table - * @throws Exception - */ - public Schema getTableAvroSchema() throws Exception { - return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields()); - } - - /** - * Gets schema for a hoodie table in Avro format, can choice if include metadata fields. 
- * - * @param includeMetadataFields choice if include metadata fields - * @return Avro schema for this table - * @throws Exception - */ - public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).orElseThrow(schemaNotFoundError()); - } - - /** - * Fetches tables schema in Avro format as of the given instant - * - * @param timestamp as of which table's schema will be fetched - */ - public Schema getTableAvroSchema(String timestamp) throws Exception { Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline() .filterCompletedInstants() .findInstantsBeforeOrEquals(timestamp) .lastInstant(); - return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant) + return getTableSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant) .orElseThrow(schemaNotFoundError()); } /** - * Fetches tables schema in Avro format as of the given instant + * Fetches tables schema as of the given instant * * @param instant as of which table's schema will be fetched */ - public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError()); - } - - /** - * Gets users data schema for a hoodie table in Avro format. 
- * - * @return Avro user data schema - * @throws Exception - * - * @deprecated use {@link #getTableAvroSchema(boolean)} instead - */ - @Deprecated - public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { - return getTableAvroSchemaInternal(false, Option.empty()).orElseThrow(schemaNotFoundError()); + public HoodieSchema getTableSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { + return getTableSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError()); } - public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields) { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); + public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields) { + return getTableSchemaInternal(includeMetadataFields, Option.empty()); } - private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) { - Option<Schema> schema = + private Option<HoodieSchema> getTableSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) { + Option<HoodieSchema> schema = (instantOpt.isPresent() ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) .or(() -> metaClient.getTableConfig().getTableCreateSchema() + .map(HoodieSchema::fromAvroSchema) .map(tableSchema -> includeMetadataFields - ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) + ? HoodieSchemaUtils.addMetadataFields(tableSchema, hasOperationField.get()) : tableSchema) ) .or(() -> { - Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal(); + Option<HoodieSchema> schemaFromDataFile = getTableSchemaFromDataFileInternal(); return includeMetadataFields ? 
schemaFromDataFile - : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields); + : schemaFromDataFile.map(HoodieSchemaUtils::removeMetadataFields); }); // TODO partition columns have to be appended in all read-paths if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) { - HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema.get()); + HoodieSchema hoodieSchema = schema.get(); return metaClient.getTableConfig().getPartitionFields() .map(partitionFields -> appendPartitionColumns(hoodieSchema, Option.ofNullable(partitionFields))) - .map(HoodieSchema::toAvroSchema) .or(() -> schema); } return schema; } - private Option<Schema> getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { + private Option<HoodieSchema> getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema(); if (instantAndCommitMetadata.isPresent()) { HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); - Schema schema = new Schema.Parser().parse(schemaStr); + HoodieSchema schema = HoodieSchema.parse(schemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); } else { - schema = HoodieAvroUtils.removeMetadataFields(schema); + schema = HoodieSchemaUtils.removeMetadataFields(schema); } return Option.of(schema); } else { @@ -285,7 +214,7 @@ public class TableSchemaResolver { } } - private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { + private Option<HoodieSchema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { try { HoodieCommitMetadata metadata = 
getCachedCommitMetadata(instant); String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); @@ -294,11 +223,11 @@ public class TableSchemaResolver { return Option.empty(); } - Schema schema = new Schema.Parser().parse(existingSchemaStr); + HoodieSchema schema = HoodieSchema.parse(existingSchemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); + schema = HoodieSchemaUtils.addMetadataFields(schema, hasOperationField.get()); } else { - schema = HoodieAvroUtils.removeMetadataFields(schema); + schema = HoodieSchemaUtils.removeMetadataFields(schema); } return Option.of(schema); } catch (Exception e) { @@ -309,7 +238,7 @@ public class TableSchemaResolver { /** * Fetches the schema for a table from any the table's data files */ - private Option<Schema> getTableParquetSchemaFromDataFile() { + private Option<HoodieSchema> getTableParquetSchemaFromDataFile() { Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithInsertOrUpdate(); switch (metaClient.getTableType()) { case COPY_ON_WRITE: @@ -335,59 +264,22 @@ public class TableSchemaResolver { } } - /** - * Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least - * a single commit) - * - * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback - * to use table's schema used at creation - */ - public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception { - if (metaClient.isTimelineNonEmpty()) { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); - } - - return Option.empty(); - } - /** * Returns table's latest {@link HoodieSchema} iff table is non-empty (ie there's at least * a single commit) * - * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback + * This method differs from {@link #getTableSchema(boolean)} in that it 
won't fallback * to use table's schema used at creation */ - public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean includeMetadataFields) { + public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception { if (metaClient.isTimelineNonEmpty()) { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).map(HoodieSchema::fromAvroSchema); + return getTableSchemaInternal(includeMetadataFields, Option.empty()); } return Option.empty(); } - /** - * Read schema from a data file from the last compaction commit done. - * - * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead - */ - public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - - HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception( - "Could not read schema from last compaction, no compaction commits found on path " + metaClient)); - - // Read from the compacted file wrote - HoodieCommitMetadata compactionMetadata = - activeTimeline.readCommitMetadata(lastCompactionCommit); - String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() - .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " - + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); - StoragePath path = new StoragePath(filePath); - return HoodieIOFactory.getIOFactory(metaClient.getStorage()) - .getFileFormatUtils(path).readHoodieSchema(metaClient.getStorage(), path).toAvroSchema(); - } - - private Schema readSchemaFromLogFile(StoragePath path) throws IOException { + private HoodieSchema readSchemaFromLogFile(StoragePath path) throws IOException { return readSchemaFromLogFile(metaClient.getRawStorage(), path); } @@ -396,7 +288,7 @@ public class 
TableSchemaResolver { * * @return */ - public static Schema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException { + public static HoodieSchema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException { // We only need to read the schema from the log block header, // so we read the block lazily to avoid reading block content // containing the records @@ -408,7 +300,7 @@ public class TableSchemaResolver { lastBlock = (HoodieDataBlock) block; } } - return lastBlock != null ? lastBlock.getSchema().toAvroSchema() : null; + return lastBlock != null ? lastBlock.getSchema() : null; } } @@ -476,10 +368,10 @@ public class TableSchemaResolver { */ public boolean hasOperationField() { try { - Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); - return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; + HoodieSchema tableSchema = getTableSchemaFromDataFile(); + return tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent(); } catch (Exception e) { - LOG.info("Failed to read operation field from avro schema ({})", e.getMessage()); + LOG.info("Failed to read operation field from hoodie schema ({})", e.getMessage()); return false; } } @@ -560,16 +452,15 @@ public class TableSchemaResolver { }); } - private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) { + private HoodieSchema fetchSchemaFromFiles(Stream<StoragePath> filePaths) { return filePaths.map(filePath -> { try { if (FSUtils.isLogFile(filePath)) { // this is a log file return readSchemaFromLogFile(filePath); } else { - HoodieSchema hoodieSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage()) + return HoodieIOFactory.getIOFactory(metaClient.getStorage()) .getFileFormatUtils(filePath).readHoodieSchema(metaClient.getStorage(), filePath); - return hoodieSchema != null ? 
hoodieSchema.toAvroSchema() : null; } } catch (IOException e) { throw new HoodieIOException("Failed to read schema from file: " + filePath, e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java index 9af83d5054f9..58f4043f16b3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java @@ -19,12 +19,12 @@ package org.apache.hudi.common.table.read; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.RecordContext; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.PartialUpdateMode; import java.io.Serializable; @@ -117,7 +117,7 @@ public class PartialUpdateHandler<T> implements Serializable { String fieldName = field.name(); // The default value only from the top-level data type is validated. That means, // for nested columns, we do not check the leaf level data type defaults. 
- Object defaultValue = HoodieAvroUtils.toJavaDefaultValue(field.getAvroField()); + Object defaultValue = HoodieSchemaUtils.toJavaDefaultValue(field); Object newValue = recordContext.getValue(highOrderRecord.getRecord(), highOrderSchema, fieldName); if (defaultValue == newValue) { fieldVals[idx++] = recordContext.getValue(lowOrderRecord.getRecord(), lowOrderSchema, fieldName); diff --git a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java index 3ad8bcf29dca..ca4c8766fdc4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java @@ -19,6 +19,7 @@ package org.apache.hudi.index.secondary; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -29,7 +30,6 @@ import org.apache.hudi.exception.HoodieSecondaryIndexException; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.Collections; import java.util.LinkedHashMap; @@ -82,16 +82,16 @@ public class SecondaryIndexManager { Map<String, String> options) { Option<List<HoodieSecondaryIndex>> secondaryIndexes = SecondaryIndexUtils.getSecondaryIndexes(metaClient); Set<String> colNames = columns.keySet(); - Schema avroSchema; + HoodieSchema schema; try { - avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(false); + schema = new TableSchemaResolver(metaClient).getTableSchema(false); } catch (Exception e) { throw new HoodieSecondaryIndexException( "Failed to get table avro schema: " + metaClient.getTableConfig().getTableName()); } for (String col : colNames) { - if (avroSchema.getField(col) == null) { + if 
(schema.getField(col) == null) { throw new HoodieSecondaryIndexException("Field not exists: " + col); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java index b5f649851fad..2580311ff700 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -36,15 +37,20 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; import java.time.LocalDate; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -1759,4 +1765,227 @@ public class TestHoodieSchemaUtils { HoodieSchema result = HoodieSchemaUtils.resolveUnionSchema(nullableString, "string"); assertEquals(HoodieSchemaType.STRING, result.getType()); } + + @Test + public void testToJavaDefaultValueNull() { + // Field with no default value + HoodieSchemaField field = 
HoodieSchemaField.of("testField", HoodieSchema.create(HoodieSchemaType.STRING)); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertNull(result); + } + + @Test + public void testToJavaDefaultValueNullValue() { + // Field with explicit NULL default value + HoodieSchemaField field = HoodieSchemaField.of("testField", + HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)), + null, + HoodieSchema.NULL_VALUE); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertNull(result); + } + + @Test + public void testToJavaDefaultValueString() { + String defaultVal = "defaultString"; + HoodieSchemaField field = HoodieSchemaField.of("stringField", + HoodieSchema.create(HoodieSchemaType.STRING), + null, + defaultVal); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(defaultVal, result); + } + + @Test + public void testToJavaDefaultValueInt() { + int defaultVal = 42; + HoodieSchemaField field = HoodieSchemaField.of("intField", + HoodieSchema.create(HoodieSchemaType.INT), + null, + defaultVal); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(defaultVal, result); + } + + @Test + public void testToJavaDefaultValueLong() { + long defaultVal = 12345L; + HoodieSchemaField field = HoodieSchemaField.of("longField", + HoodieSchema.create(HoodieSchemaType.LONG), + null, + defaultVal); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(defaultVal, result); + } + + @Test + public void testToJavaDefaultValueFloat() { + float defaultVal = 3.14f; + HoodieSchemaField field = HoodieSchemaField.of("floatField", + HoodieSchema.create(HoodieSchemaType.FLOAT), + null, + defaultVal); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(defaultVal, result); + } + + @Test + public void testToJavaDefaultValueDouble() { + double defaultVal = 2.718; + HoodieSchemaField field = HoodieSchemaField.of("doubleField", + 
HoodieSchema.create(HoodieSchemaType.DOUBLE), + null, + defaultVal); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(defaultVal, result); + } + + @Test + public void testToJavaDefaultValueBoolean() { + HoodieSchemaField field = HoodieSchemaField.of("boolField", + HoodieSchema.create(HoodieSchemaType.BOOLEAN), + null, + true); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals(true, result); + } + + @Test + public void testToJavaDefaultValueBytes() { + byte[] defaultBytes = new byte[]{1, 2, 3, 4}; + HoodieSchemaField field = HoodieSchemaField.of("bytesField", + HoodieSchema.create(HoodieSchemaType.BYTES), + null, + defaultBytes); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertArrayEquals(defaultBytes, (byte[]) result); + } + + @Test + public void testToJavaDefaultValueEnum() { + HoodieSchema enumSchema = HoodieSchema.createEnum("Status", null, null, Arrays.asList("ACTIVE", "INACTIVE", "PENDING")); + HoodieSchemaField field = HoodieSchemaField.of("statusField", + enumSchema, + null, + "ACTIVE"); + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals("ACTIVE", result); + } + + @Test + public void testToJavaDefaultValueArray() { + // Create array schema with int elements + HoodieSchema arraySchema = HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)); + + // Default value as a list + List<Integer> defaultList = Arrays.asList(1, 2, 3); + HoodieSchemaField field = HoodieSchemaField.of("arrayField", + arraySchema, + null, + defaultList); + + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertNotNull(result); + assertInstanceOf(Collection.class, result); + assertArrayEquals(defaultList.toArray(), ((Collection<?>) result).toArray()); + } + + @Test + public void testToJavaDefaultValueMap() { + // Create map schema with string values + HoodieSchema mapSchema = HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.STRING)); + + // 
Default value as a map + Map<String, String> defaultMap = new HashMap<>(); + defaultMap.put("key1", "value1"); + defaultMap.put("key2", "value2"); + + HoodieSchemaField field = HoodieSchemaField.of("mapField", + mapSchema, + null, + defaultMap); + + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertNotNull(result); + // GenericData should return a map + assertInstanceOf(Map.class, result); + } + + @Test + public void testToJavaDefaultValueRecord() { + // Create nested record schema + HoodieSchema nestedRecordSchema = HoodieSchema.createRecord( + "NestedRecord", + null, + null, + Arrays.asList( + HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, "default1"), + HoodieSchemaField.of("field2", HoodieSchema.create(HoodieSchemaType.INT), null, 10) + ) + ); + + // Create a default record value + Map<String, Object> defaultRecord = new HashMap<>(); + defaultRecord.put("field1", "default1"); + defaultRecord.put("field2", 10); + + HoodieSchemaField field = HoodieSchemaField.of("recordField", + nestedRecordSchema, + null, + defaultRecord); + + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertNotNull(result); + // GenericData should return a GenericRecord + assertInstanceOf(GenericRecord.class, result); + } + + @Test + public void testToJavaDefaultValueNullableField() { + // Create nullable string field with default value + // With a non-null defaultValue, the union type must be ["string", null] + HoodieSchema nullableStringSchema = HoodieSchema.createUnion( + HoodieSchema.create(HoodieSchemaType.STRING), HoodieSchema.create(HoodieSchemaType.NULL)); + HoodieSchemaField field = HoodieSchemaField.of("nullableField", + nullableStringSchema, + null, + "defaultValue"); + + Object result = HoodieSchemaUtils.toJavaDefaultValue(field); + assertEquals("defaultValue", result); + } + + @SuppressWarnings("DataFlowIssue") + @Test + public void testToJavaDefaultValueNullFieldArgument() { + // Should throw 
IllegalArgumentException for null field + assertThrows(IllegalArgumentException.class, () -> HoodieSchemaUtils.toJavaDefaultValue(null)); + } + + @Test + public void testToJavaDefaultValueConsistencyWithAvro() { + // Test that HoodieSchemaUtils.toJavaDefaultValue produces equivalent results to HoodieAvroUtils.toJavaDefaultValue + + // Test with primitive types + HoodieSchemaField stringField = HoodieSchemaField.of("stringField", + HoodieSchema.create(HoodieSchemaType.STRING), + null, + "test"); + + Object hoodieResult = HoodieSchemaUtils.toJavaDefaultValue(stringField); + Object avroResult = HoodieAvroUtils.toJavaDefaultValue(stringField.getAvroField()); + + assertEquals(avroResult, hoodieResult); + + // Test with int + HoodieSchemaField intField = HoodieSchemaField.of("intField", + HoodieSchema.create(HoodieSchemaType.INT), + null, + 42); + + Object hoodieIntResult = HoodieSchemaUtils.toJavaDefaultValue(intField); + Object avroIntResult = HoodieAvroUtils.toJavaDefaultValue(intField.getAvroField()); + + assertEquals(avroIntResult, hoodieIntResult); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java index b1ff1656157d..d7bedf25b0cc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java @@ -712,21 +712,21 @@ public abstract class TestHoodieFileGroupReaderBase<T> { List<HoodieRecord> expectedHoodieUnmergedRecords, String[] orderingFields) throws Exception { HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath); - Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, avroSchema); - 
expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, avroSchema); - List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = convertHoodieRecords(expectedHoodieRecords, avroSchema, orderingFields); - List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema, orderingFields); + HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema(); + expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, schema); + expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, schema); + List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = convertHoodieRecords(expectedHoodieRecords, schema, orderingFields); + List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = convertHoodieRecords(expectedHoodieUnmergedRecords, schema, orderingFields); validateOutputFromFileGroupReaderWithExistingRecords( storageConf, tablePath, containsBaseFile, expectedLogFileNum, recordMergeMode, expectedRecords, expectedUnmergedRecords); } - private static List<HoodieRecord> getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord> expectedHoodieRecords, HoodieTableMetaClient metaClient, Schema avroSchema) { + private static List<HoodieRecord> getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord> expectedHoodieRecords, HoodieTableMetaClient metaClient, HoodieSchema schema) { return expectedHoodieRecords.stream().map(rec -> { List<String> orderingFields = metaClient.getTableConfig().getOrderingFields(); HoodieAvroIndexedRecord avroRecord = ((HoodieAvroIndexedRecord) rec); - Comparable orderingValue = OrderingValues.create(orderingFields, field -> (Comparable) avroRecord.getColumnValueAsJava(avroSchema, field, new TypedProperties())); + Comparable orderingValue = OrderingValues.create(orderingFields, field 
-> (Comparable) avroRecord.getColumnValueAsJava(schema.toAvroSchema(), field, new TypedProperties())); return new HoodieAvroIndexedRecord(rec.getKey(), avroRecord.getData(), orderingValue); }).collect(Collectors.toList()); } @@ -936,7 +936,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> { .collect(Collectors.toList()); } - private List<HoodieTestDataGenerator.RecordIdentifier> convertHoodieRecords(List<HoodieRecord> records, Schema schema, String[] orderingFields) { + private List<HoodieTestDataGenerator.RecordIdentifier> convertHoodieRecords(List<HoodieRecord> records, HoodieSchema schema, String[] orderingFields) { return records.stream().map(record -> HoodieTestDataGenerator.RecordIdentifier.fromTripTestPayload((HoodieAvroIndexedRecord) record, orderingFields)).collect(Collectors.toList()); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index 10bdaa7bdf79..15b7fcb5e8bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -20,6 +20,7 @@ package org.apache.hudi.table.catalog; import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.CollectionUtils; @@ -35,7 +36,6 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.CatalogUtils; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -259,12 +259,12 @@ public 
class HoodieCatalog extends AbstractCatalog { final String path = inferTablePath(catalogPathStr, tablePath); Map<String, String> options = TableOptionProperties.loadFromProperties(path, hadoopConf); - final Schema latestSchema = getLatestTableSchema(path); + final HoodieSchema latestSchema = getLatestTableSchema(path); if (latestSchema != null) { List<String> pkColumns = TableOptionProperties.getPkColumns(options); // if the table is initialized from spark, the write schema is nullable for pk columns. DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable( - AvroSchemaConverter.convertToDataType(latestSchema), pkColumns); + AvroSchemaConverter.convertToDataType(latestSchema.toAvroSchema()), pkColumns); org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder() .fromRowDataType(tableDataType); final String pkConstraintName = TableOptionProperties.getPkConstraintName(options); @@ -592,11 +592,11 @@ public class HoodieCatalog extends AbstractCatalog { throw new UnsupportedOperationException("alterPartitionColumnStatistics is not implemented."); } - private @Nullable Schema getLatestTableSchema(String path) { + private @Nullable HoodieSchema getLatestTableSchema(String path) { if (path != null && StreamerUtil.tableExists(path, hadoopConf)) { try { HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf); - return new TableSchemaResolver(metaClient).getTableAvroSchema(false); // change log mode is not supported now + return new TableSchemaResolver(metaClient).getTableSchema(false); // change log mode is not supported now } catch (Throwable throwable) { log.warn("Failed to resolve the latest table schema.", throwable); // ignored diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index e796ef174d8f..b2e5072cdbbb 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -129,7 +129,7 @@ public class CompactionUtil { */ public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - HoodieSchema tableAvroSchema = HoodieSchema.fromAvroSchema(tableSchemaResolver.getTableAvroSchemaFromDataFile()); + HoodieSchema tableAvroSchema = tableSchemaResolver.getTableSchemaFromDataFile(); if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) { conf.set(FlinkOptions.CHANGELOG_ENABLED, true); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 9020073f2a37..2677de0e4792 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -37,10 +37,10 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper; -import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper; import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.InsertFunctionWrapper; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.TestFunctionWrapper; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.format.FormatUtils; @@ -48,7 +48,6 @@ import org.apache.hudi.table.format.InternalSchemaManager; import 
org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.RowDataAvroQueryContexts; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -977,8 +976,7 @@ public class TestData { HoodieTableMetaClient metaClient = createMetaClient(basePath); HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient); - Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema); + HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema(); String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants() .lastInstant().map(HoodieInstant::requestedTime).orElse(null); @@ -993,7 +991,7 @@ public class TestData { List<String> readBuffer = new ArrayList<>(); List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList()); for (FileSlice fileSlice : fileSlices) { - try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) { + try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, schema, metaClient, config)) { while (rowIterator.hasNext()) { RowData rowData = rowIterator.next(); readBuffer.add(filterOutVariables(schema, rowData)); @@ -1048,8 +1046,8 @@ public class TestData { return String.join(",", fields); } - private static String filterOutVariables(Schema schema, RowData record) { - RowDataAvroQueryContexts.RowDataQueryContext queryContext = RowDataAvroQueryContexts.fromAvroSchema(schema); + private static String filterOutVariables(HoodieSchema schema, RowData record) { + RowDataAvroQueryContexts.RowDataQueryContext queryContext = RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()); List<String> 
fields = new ArrayList<>(); fields.add(getFieldValue(queryContext, record, "_hoodie_record_key")); fields.add(getFieldValue(queryContext, record, "_hoodie_partition_path")); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index a80df526e32b..c150898c8ea6 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -64,7 +64,6 @@ import static org.apache.hudi.common.testutils.HoodieCommonTestHarness.getDataBl import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -113,42 +112,6 @@ class TestTableSchemaResolver { } } - @Test - void testGetTableSchema() throws Exception { - // Setup: Create mock metaClient and configure behavior - HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS); - HoodieSchema expectedSchema = getSimpleSchema(); - - // Mock table setup - when(metaClient.getTableConfig().populateMetaFields()).thenReturn(true); - when(metaClient.getTableConfig().getTableCreateSchema()) - .thenReturn(Option.of(expectedSchema.toAvroSchema())); - - when(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema()) - .thenReturn(Option.empty()); - - // Create resolver and call both methods - TableSchemaResolver resolver = new TableSchemaResolver(metaClient); - - // Test 1: getTableSchema() - should use table config's populateMetaFields (true) - Schema avroSchema = 
resolver.getTableAvroSchema(); - HoodieSchema hoodieSchema = resolver.getTableSchema(); - assertNotNull(hoodieSchema); - assertEquals(avroSchema, hoodieSchema.getAvroSchema()); - - // Test 2: getTableSchema(true) - explicitly include metadata fields - Schema avroSchemaWithMetadata = resolver.getTableAvroSchema(true); - HoodieSchema hoodieSchemaWithMetadata = resolver.getTableSchema(true); - assertNotNull(hoodieSchemaWithMetadata); - assertEquals(avroSchemaWithMetadata, hoodieSchemaWithMetadata.getAvroSchema()); - - // Test 3: getTableSchema(false) - explicitly exclude metadata fields - Schema avroSchemaWithoutMetadata = resolver.getTableAvroSchema(false); - HoodieSchema hoodieSchemaWithoutMetadata = resolver.getTableSchema(false); - assertNotNull(hoodieSchemaWithoutMetadata); - assertEquals(avroSchemaWithoutMetadata, hoodieSchemaWithoutMetadata.getAvroSchema()); - } - @Test void testReadSchemaFromLogFile() throws IOException, URISyntaxException, InterruptedException { String testDir = initTestDir("read_schema_from_log_file"); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java index a30f690b0f5e..dadbef39a6d2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java @@ -133,7 +133,7 @@ public class SchemaEvolutionContext { json -> Option.ofNullable(new Schema.Parser().parse(json))); if (avroSchemaOpt == null) { // the code path should only be invoked in tests. 
- return new TableSchemaResolver(this.metaClient).getTableAvroSchema(); + return new TableSchemaResolver(this.metaClient).getTableSchema().toAvroSchema(); } return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache")); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index 714a22f8485d..6ca67da6a756 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -406,7 +406,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend for (String path : uniqTablePaths) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build(); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - String avroSchema = schemaUtil.getTableAvroSchema().toString(); + String avroSchema = schemaUtil.getTableSchema().toString(); Option<InternalSchema> internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); if (internalSchema.isPresent()) { LOG.info("Set internal and avro schema cache with path: {}", path); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java index 4d4d8a4da0e8..71728b40e707 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import 
org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -42,10 +43,10 @@ import org.apache.hudi.hadoop.RealtimeFileStatus; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.storage.StoragePathInfo; -import org.apache.avro.Schema; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -419,14 +420,19 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp } TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); try { - Schema schema = tableSchemaResolver.getTableAvroSchema(); - boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp()); + HoodieSchema schema = tableSchemaResolver.getTableSchema(); + String partitionFieldProp = tableConfig.getPartitionFieldProp(); + boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(partitionFieldProp); return Option.of( new HoodieVirtualKeyInfo( tableConfig.getRecordKeyFieldProp(), - isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()), - schema.getField(tableConfig.getRecordKeyFieldProp()).pos(), - isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos()))); + isNonPartitionedKeyGen ? 
Option.empty() : Option.of(partitionFieldProp), + schema.getField(tableConfig.getRecordKeyFieldProp()) + .orElseThrow(() -> new HoodieSchemaException("Field: " + tableConfig.getRecordKeyFieldProp() + " not found")) + .pos(), + isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(partitionFieldProp) + .orElseThrow(() -> new HoodieSchemaException("Field: " + partitionFieldProp + " not found")) + .pos()))); } catch (Exception exception) { throw new HoodieException("Fetching table schema failed with exception ", exception); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala index dcc002adfb6f..71c0ac4d379a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala @@ -52,7 +52,7 @@ class BucketIndexSupport(spark: SparkSession, HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) } - private lazy val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(false) + private lazy val schema = new TableSchemaResolver(metaClient).getTableSchema(false) override def getIndexName: String = BucketIndexSupport.INDEX_NAME @@ -133,7 +133,7 @@ class BucketIndexSupport(spark: SparkSession, if (hashValuePairs.size != indexBucketHashFields.size) { matchedBuckets.setUntil(numBuckets) } else { - val record = new GenericData.Record(avroSchema) + val record = new GenericData.Record(schema.toAvroSchema) hashValuePairs.foreach(p => record.put(p.getKey, p.getValue)) val hoodieKey = keyGenerator.getKey(record) matchedBuckets.set(BucketIdentifier.getBucketId(hoodieKey.getRecordKey, indexBucketHashFieldsOpt.get, numBuckets)) @@ -153,7 +153,7 @@ class BucketIndexSupport(spark: SparkSession, private def getBucketsBySingleHashFields(expr: Expression, bucketColumnName: String,
numBuckets: Int): BitSet = { def getBucketNumber(attr: Attribute, v: Any): Int = { - val record = new GenericData.Record(avroSchema) + val record = new GenericData.Record(schema.toAvroSchema) record.put(attr.name, v) val hoodieKey = keyGenerator.getKey(record) BucketIdentifier.getBucketId(hoodieKey.getRecordKey, indexBucketHashFieldsOpt.get, numBuckets) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala index e2c39c3938bb..1b40b0fe5701 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala @@ -50,7 +50,7 @@ object HoodieCLIUtils extends Logging { val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) .setConf(HadoopFSUtils.getStorageConf(sparkSession.sessionState.newHadoopConf())).build() val schemaUtil = new TableSchemaResolver(metaClient) - val schemaStr = schemaUtil.getTableAvroSchema(false).toString + val schemaStr = schemaUtil.getTableSchema(false).toString // If tableName is provided, we need to add catalog props val catalogProps = tableName match { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index a9fb6a287639..240f3f9b294b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -446,7 +446,7 @@ class HoodieSparkSqlWriterInternal { } // Issue the delete. 
- val schemaStr = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString + val schemaStr = new TableSchemaResolver(tableMetaClient).getTableSchema(false).toString val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schemaStr, path, tblName, parameters.asJava)) @@ -683,8 +683,7 @@ class HoodieSparkSqlWriterInternal { private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient, schemaFromCatalog: Option[HoodieSchema]): Option[HoodieSchema] = { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) - .map(HoodieSchema.fromAvroSchema) + toScalaOption(tableSchemaResolver.getTableSchemaFromLatestCommit(false)) .orElse(schemaFromCatalog) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index 4a79350b35c3..74b04503a563 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -233,7 +233,7 @@ class RunClusteringProcedure extends BaseProcedure } val tableSchemaResolver = new TableSchemaResolver(metaClient) - val fields = tableSchemaResolver.getTableAvroSchema(false) + val fields = tableSchemaResolver.getTableSchema(false) .getFields.asScala.map(_.name().toLowerCase) orderColumns.split(",").foreach(col => { if (!fields.contains(col.toLowerCase)) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala index 1ffdd0d80cad..5257f3c20bda 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala @@ -74,7 +74,7 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui logFilePath => { val statuses = storage.listDirectEntries(new StoragePath(logFilePath)) val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath)) - val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), HoodieSchema.fromAvroSchema(schema)) + val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), schema) // read the avro blocks while (reader.hasNext) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala index 58c4fa1acb43..4b9e0e83c7d5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala @@ -70,7 +70,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file") val allRecords: java.util.List[IndexedRecord] = new java.util.ArrayList[IndexedRecord] if (merge) { - val schema = 
HoodieSchema.fromAvroSchema(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last)))) + val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last))) val scanner = HoodieMergedLogRecordScanner.newBuilder .withStorage(storage) .withBasePath(basePath) @@ -94,7 +94,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach { logFilePath => { val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))) - val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), HoodieSchema.fromAvroSchema(schema)) + val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), schema) while (reader.hasNext) { val block = reader.next() block match { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index a4e051d6f23e..5aab9557aa2b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -319,7 +319,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { // Validate table schema in the end. TableSchemaResolver r = new TableSchemaResolver(metaClient); // Assert no table schema is defined. 
- assertThrows(HoodieSchemaNotFoundException.class, () -> r.getTableAvroSchema(false)); + assertThrows(HoodieSchemaNotFoundException.class, () -> r.getTableSchema(false)); } // Start txn 002 altering table schema diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java index 517ac117e51b..486436c7ab58 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java @@ -122,7 +122,6 @@ import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.MetadataMergeWriteStatus; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.spark.api.java.JavaRDD; @@ -1403,7 +1402,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { HoodieStorage storage = table.getStorage(); for (HoodieLogFile logFile : logFiles) { List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath()); - Schema writerSchema = + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath()); if (writerSchema == null) { // not a data block @@ -1411,7 +1410,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage, - new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) { + new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { @@ -3954,7 +3953,7 @@ public class 
TestHoodieBackedMetadata extends TestHoodieMetadataBase { private static void verifyMetadataColumnStatsRecords(HoodieStorage storage, List<HoodieLogFile> logFiles) throws IOException { for (HoodieLogFile logFile : logFiles) { List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath()); - Schema writerSchema = + HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath()); if (writerSchema == null) { // not a data block @@ -3962,7 +3961,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage, - new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) { + new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala index f81209820e5c..a843d66c2750 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala @@ -685,7 +685,7 @@ class TestHoodieSparkSqlWriterWithTestFormat extends HoodieSparkWriterTestBase { private def fetchActualSchema(): HoodieSchema = { val tableMetaClient = createMetaClient(spark, tempBasePath) - new TableSchemaResolver(tableMetaClient).getTableSchema() + new TableSchemaResolver(tableMetaClient).getTableSchema(false) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala index 26fa1c722344..a1884622a056 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala @@ -17,7 +17,7 @@ package org.apache.hudi.functional -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport, SparkAdapterSupport} +import org.apache.hudi.{DataSourceWriteOptions, HoodieSchemaConversionUtils, ScalaAssertionSupport, SparkAdapterSupport} import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode} import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} @@ -125,7 +125,7 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser tableMetaClient.reloadActiveTimeline() val resolver = new TableSchemaResolver(tableMetaClient) - val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false)) + val latestTableSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema(false)) val df = spark.read.format("org.apache.hudi") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index a49591f429b9..c770f2b0d14f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -696,7 +696,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup val tableMetaClient = createMetaClient(spark, basePath) 
assertFalse(tableMetaClient.getArchivedTimeline.empty()) - val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false) + val actualSchema = new TableSchemaResolver(tableMetaClient).getTableSchema(false) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(CommonOptionUtils.commonOpts(HoodieWriteConfig.TBL_NAME.key)) spark.sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index 475a5bf55d23..c3888baf7113 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -370,7 +370,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix assertEquals(10, spark.read.format("hudi").load(basePath).count()) metaClient.reloadActiveTimeline() val tableSchemaResolver = new TableSchemaResolver(metaClient) - val latestTableSchemaFromCommitMetadata = tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false) + val latestTableSchemaFromCommitMetadata = tableSchemaResolver.getTableSchemaFromLatestCommit(false) if (failAndDoRollback) { val updatesToFail = dataGen.generateUniqueUpdates("003", 3) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala index e4eb32814875..02fbb1bb6da9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala @@ -279,7 +279,7 @@ class 
TestTimeTravelQuery extends HoodieSparkClientTestBase with ScalaAssertionS .select("id", "name", "value", "version") .take(1)(0) assertEquals(Row(1, "a1", 10, 1000), result1) - val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit) - assertNull(schema1.getField("year")) - assertNull(schema1.getField("month")) + val schema1 = tableSchemaResolver.getTableSchema(firstCommit) + assertNull(schema1.getField("year").orElse(null)) + assertNull(schema1.getField("month").orElse(null)) @@ -290,7 +290,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase with ScalaAssertionS .select("id", "name", "value", "version", "year") .take(1)(0) assertEquals(Row(1, "a1", 12, 1001, "2022"), result2) - val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit) - assertNotNull(schema2.getField("year")) - assertNull(schema2.getField("month")) + val schema2 = tableSchemaResolver.getTableSchema(secondCommit) + assertNotNull(schema2.getField("year").orElse(null)) + assertNull(schema2.getField("month").orElse(null)) @@ -301,7 +301,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase with ScalaAssertionS .select("id", "name", "value", "version", "year", "month") .take(1)(0) assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3) - val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit) - assertNotNull(schema3.getField("year")) - assertNotNull(schema3.getField("month")) + val schema3 = tableSchemaResolver.getTableSchema(thirdCommit) + assertNotNull(schema3.getField("year").orElse(null)) + assertNotNull(schema3.getField("month").orElse(null)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala index 87d5ae5bb666..9a3af922bc2c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala @@ -301,7 +301,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase { def validateTableSchema(tablePath: String): Unit = { val metaClient = createMetaClient(spark, tablePath) - val schema = new TableSchemaResolver(metaClient).getTableAvroSchema(false) + val schema = new
TableSchemaResolver(metaClient).getTableSchema(false) assertFalse(schema.getFields.asScala.exists(f => HoodieRecord.HOODIE_META_COLUMNS.contains(f.name())), "Metadata fields should be excluded from the table schema") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala index 537a01e9ec80..5d592e213712 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala @@ -735,10 +735,10 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase { .setConf(HoodieTestUtils.getDefaultStorageConf) .build() val schemaResolver = new TableSchemaResolver(metaClient) - val tableSchema = schemaResolver.getTableAvroSchema(false) + val tableSchema = schemaResolver.getTableSchema(false) val field = tableSchema.getField(fieldName) - assertNotNull(field, s"$fieldName field should exist in table schema") - val fieldType = field.schema() + assertTrue(field.isPresent, s"$fieldName field should exist in table schema") + val fieldType = field.get().schema() assertTrue( fieldType.toString.contains(expectedType), s"$fieldName field should be of type $expectedType, but got: ${fieldType.toString}" diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index 6393ac95bcb5..db1955ab8397 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -446,10 +446,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { @Override public List<FieldSchema> 
getStorageFieldSchemas() { try { - return tableSchemaResolver.getTableAvroSchema(false) + return tableSchemaResolver.getTableSchema(false) .getFields() .stream() - .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), f.doc())) + .map(f -> new FieldSchema(f.name(), f.schema().getType().toAvroType().getName(), f.doc())) .collect(Collectors.toList()); } catch (Exception e) { throw new HoodieHiveSyncException("Failed to get field schemas from storage : ", e); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index bb4624c231d7..f9088da720dd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -35,7 +36,6 @@ import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionS import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; -import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -286,7 +286,7 @@ public class HoodieCompactor { private String getSchemaFromLatestInstant() throws Exception { TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - Schema schema = schemaUtil.getTableAvroSchema(false); + HoodieSchema schema = schemaUtil.getTableSchema(false); return schema.toString(); } diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index b17116374bff..05b89b75535a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -1655,8 +1655,7 @@ public class HoodieMetadataTableValidator implements Serializable { for (String logFilePathStr : logFilePathSet) { HoodieLogFormat.Reader reader = null; try { - HoodieSchema readerSchema = - HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr))); + HoodieSchema readerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr)); if (readerSchema == null) { LOG.warn("Cannot read schema from log file {}. Skip the check as it's likely being written by an inflight instant.", logFilePathStr); continue; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index e330b821cc51..a6a9985569c6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -77,7 +77,6 @@ import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; -import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -647,7 +646,7 @@ public class UtilHelpers { public static String getSchemaFromLatestInstant(HoodieTableMetaClient metaClient) throws Exception { TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient); - Schema schema = 
schemaResolver.getTableAvroSchema(false); + HoodieSchema schema = schemaResolver.getTableSchema(false); return schema.toString(); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 85746e3ea7f5..c2e6ab061c8e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -50,6 +50,9 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchema.TimePrecision; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -126,7 +129,6 @@ import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -662,7 +664,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false); assertNotNull(tableSchema); Schema expectedSchema; @@ -700,8 +702,12 @@ public class 
TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false); - assertEquals("timestamp-millis", tableSchema.getField("current_ts").schema().getLogicalType().getName()); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false); + Option<HoodieSchemaField> currentTsFieldOpt = tableSchema.getField("current_ts"); + assertTrue(currentTsFieldOpt.isPresent()); + HoodieSchema.Timestamp currentTsSchema = (HoodieSchema.Timestamp) currentTsFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType()); + assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision()); assertEquals(1000, sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts > '1980-01-01'").count()); cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), @@ -720,8 +726,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - tableSchema = tableSchemaResolver.getTableAvroSchema(false); - assertEquals("timestamp-millis", tableSchema.getField("current_ts").schema().getLogicalType().getName()); + tableSchema = tableSchemaResolver.getTableSchema(false); + currentTsFieldOpt = tableSchema.getField("current_ts"); + assertTrue(currentTsFieldOpt.isPresent()); + currentTsSchema = (HoodieSchema.Timestamp) currentTsFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType()); + assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision()); sqlContext.clearCache(); 
assertEquals(1450, sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts > '1980-01-01'").count()); assertEquals(1450, sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts < '2080-01-01'").count()); @@ -761,7 +771,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false); Map<String, String> hudiOpts = new HashMap<>(); hudiOpts.put("hoodie.datasource.write.recordkey.field", "id"); logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.current().versionCode()); @@ -781,7 +791,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - tableSchema = tableSchemaResolver.getTableAvroSchema(false); + tableSchema = tableSchemaResolver.getTableSchema(false); logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.current().versionCode()); } finally { defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); @@ -826,7 +836,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500", tableBasePath, 1); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false); Map<String, String> hudiOpts = new HashMap<>(); 
hudiOpts.put("hoodie.datasource.write.recordkey.field", "id"); logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.EIGHT.versionCode()); @@ -842,7 +852,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", tableBasePath, 2); tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - tableSchema = tableSchemaResolver.getTableAvroSchema(false); + tableSchema = tableSchemaResolver.getTableSchema(false); logicalAssertions(tableSchema, tableBasePath, hudiOpts, HoodieTableVersion.EIGHT.versionCode()); } finally { defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); @@ -884,7 +894,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); - Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false); Map<String, String> hudiOpts = new HashMap<>(); hudiOpts.put("hoodie.datasource.write.recordkey.field", "id"); logicalAssertions(tableSchema, tableBasePath, hudiOpts, version.versionCode()); @@ -910,34 +920,63 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { }); } - private void logicalAssertions(Schema tableSchema, String tableBasePath, Map<String, String> hudiOpts, int tableVersion) { + private void logicalAssertions(HoodieSchema tableSchema, String tableBasePath, Map<String, String> hudiOpts, int tableVersion) { if (tableVersion > 8) { - assertEquals("timestamp-millis", tableSchema.getField("ts_millis").schema().getLogicalType().getName()); + Option<HoodieSchemaField> tsMillisFieldOpt = tableSchema.getField("ts_millis"); + assertTrue(tsMillisFieldOpt.isPresent()); + HoodieSchema.Timestamp tsMillisFieldSchema = (HoodieSchema.Timestamp) 
tsMillisFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, tsMillisFieldSchema.getType()); + assertEquals(TimePrecision.MILLIS, tsMillisFieldSchema.getPrecision()); } - assertEquals("timestamp-micros", tableSchema.getField("ts_micros").schema().getLogicalType().getName()); + Option<HoodieSchemaField> tsMicrosFieldOpt = tableSchema.getField("ts_micros"); + assertTrue(tsMicrosFieldOpt.isPresent()); + HoodieSchema.Timestamp tsMicrosFieldSchema = (HoodieSchema.Timestamp) tsMicrosFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, tsMicrosFieldSchema.getType()); + assertEquals(TimePrecision.MICROS, tsMicrosFieldSchema.getPrecision()); if (tableVersion > 8 && !HoodieSparkUtils.isSpark3_3()) { - assertEquals("local-timestamp-millis", tableSchema.getField("local_ts_millis").schema().getLogicalType().getName()); - assertEquals("local-timestamp-micros", tableSchema.getField("local_ts_micros").schema().getLogicalType().getName()); + Option<HoodieSchemaField> localTsMillisFieldOpt = tableSchema.getField("local_ts_millis"); + assertTrue(localTsMillisFieldOpt.isPresent()); + HoodieSchema.Timestamp localTsMillisFieldSchema = (HoodieSchema.Timestamp) localTsMillisFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, localTsMillisFieldSchema.getType()); + assertEquals(TimePrecision.MILLIS, localTsMillisFieldSchema.getPrecision()); + + Option<HoodieSchemaField> localTsMicrosFieldOpt = tableSchema.getField("local_ts_micros"); + assertTrue(localTsMicrosFieldOpt.isPresent()); + HoodieSchema.Timestamp localTsMicrosFieldSchema = (HoodieSchema.Timestamp) localTsMicrosFieldOpt.get().schema(); + assertEquals(HoodieSchemaType.TIMESTAMP, localTsMicrosFieldSchema.getType()); + assertEquals(TimePrecision.MICROS, localTsMicrosFieldSchema.getPrecision()); } - - assertEquals("date", tableSchema.getField("event_date").schema().getLogicalType().getName()); + Option<HoodieSchemaField> eventTimeFieldOpt = tableSchema.getField("event_time"); + 
assertTrue(eventTimeFieldOpt.isPresent()); + assertEquals(HoodieSchemaType.DATE, eventTimeFieldOpt.get().schema().getType()); if (tableVersion > 8) { - assertEquals("bytes", tableSchema.getField("dec_plain_large").schema().getType().getName()); - assertEquals("decimal", tableSchema.getField("dec_plain_large").schema().getLogicalType().getName()); - assertEquals(20, ((LogicalTypes.Decimal) tableSchema.getField("dec_plain_large").schema().getLogicalType()).getPrecision()); - assertEquals(10, ((LogicalTypes.Decimal) tableSchema.getField("dec_plain_large").schema().getLogicalType()).getScale()); + Option<HoodieSchemaField> decPlainLargeFieldOpt = tableSchema.getField("dec_plain_large"); + assertTrue(decPlainLargeFieldOpt.isPresent()); + HoodieSchema.Decimal decPlainLargeSchema = (HoodieSchema.Decimal) decPlainLargeFieldOpt.get().schema(); + // decimal backed by bytes (are not fixed length byte arrays) + assertFalse(decPlainLargeSchema.isFixed()); + assertEquals(HoodieSchemaType.DECIMAL, decPlainLargeSchema.getType()); + assertEquals(20, decPlainLargeSchema.getPrecision()); + assertEquals(10, decPlainLargeSchema.getScale()); } - assertEquals("fixed", tableSchema.getField("dec_fixed_small").schema().getType().getName()); - assertEquals(3, tableSchema.getField("dec_fixed_small").schema().getFixedSize()); - assertEquals("decimal", tableSchema.getField("dec_fixed_small").schema().getLogicalType().getName()); - assertEquals(5, ((LogicalTypes.Decimal) tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getPrecision()); - assertEquals(2, ((LogicalTypes.Decimal) tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getScale()); - assertEquals("fixed", tableSchema.getField("dec_fixed_large").schema().getType().getName()); - assertEquals(8, tableSchema.getField("dec_fixed_large").schema().getFixedSize()); - assertEquals("decimal", tableSchema.getField("dec_fixed_large").schema().getLogicalType().getName()); - assertEquals(18, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getPrecision()); - assertEquals(9, ((LogicalTypes.Decimal) tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getScale()); + Option<HoodieSchemaField> decFixedSmallOpt = tableSchema.getField("dec_fixed_small"); + assertTrue(decFixedSmallOpt.isPresent()); + HoodieSchema.Decimal decFixedSmallSchema = (HoodieSchema.Decimal) decFixedSmallOpt.get().schema(); + assertTrue(decFixedSmallSchema.isFixed()); + assertEquals(3, decFixedSmallSchema.getFixedSize()); + assertEquals(HoodieSchemaType.DECIMAL, decFixedSmallSchema.getType()); + assertEquals(5, decFixedSmallSchema.getPrecision()); + assertEquals(2, decFixedSmallSchema.getScale()); + + Option<HoodieSchemaField> decFixedLargeOpt = tableSchema.getField("dec_fixed_large"); + assertTrue(decFixedLargeOpt.isPresent()); + HoodieSchema.Decimal decFixedLargeSchema = (HoodieSchema.Decimal) decFixedLargeOpt.get().schema(); + assertTrue(decFixedLargeSchema.isFixed()); + assertEquals(8, decFixedLargeSchema.getFixedSize()); + assertEquals(HoodieSchemaType.DECIMAL, decFixedLargeSchema.getType()); + assertEquals(18, decFixedLargeSchema.getPrecision()); + assertEquals(9, decFixedLargeSchema.getScale()); sqlContext.clearCache(); Dataset<Row> df = sqlContext.read() @@ -2300,7 +2339,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // validate table schema fetches valid schema from last but one commit. 
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString()); + assertNotEquals(tableSchemaResolver.getTableSchema(), Schema.create(Schema.Type.NULL).toString()); // schema from latest commit and last but one commit should match compareLatestTwoSchemas(metaClient); prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, @@ -2856,7 +2895,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // validate table schema fetches valid schema from last but one commit. TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString()); + assertNotEquals(tableSchemaResolver.getTableSchema(), Schema.create(Schema.Type.NULL).toString()); // schema from latest commit and last but one commit should match compareLatestTwoSchemas(metaClient); prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, @@ -3208,7 +3247,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // validate schema is set in commit even if target schema returns null on empty batch TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); HoodieInstant secondCommit = metaClient.reloadActiveTimeline().lastInstant().get(); - Schema lastCommitSchema = tableSchemaResolver.getTableAvroSchema(secondCommit, true); + HoodieSchema lastCommitSchema = tableSchemaResolver.getTableSchema(secondCommit, true); assertNotEquals(firstCommit, secondCommit); assertNotEquals(lastCommitSchema, Schema.create(Schema.Type.NULL)); deltaStreamer2.shutdownGracefully(); @@ -3755,10 +3794,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver( HoodieTestUtils.createMetaClient(storage, tableBasePath)); // get schema from data file written in the latest commit - Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile(); + HoodieSchema tableSchema = tableSchemaResolver.getTableSchemaFromDataFile(); assertNotNull(tableSchema); - List<String> tableFields = tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + List<String> tableFields = tableSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList()); // now assert that the partition column is not in the target schema assertFalse(tableFields.contains("partition_path")); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java index b11e21d4bbe3..096afadc3f2f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java @@ -22,6 +22,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.TestHoodieSparkUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -337,8 +338,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta metaClient.reloadActiveTimeline(); Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient); - 
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes() - .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING)); + Option<HoodieSchemaField> riderFieldOpt = latestTableSchemaOpt.get().getField("rider"); + assertTrue(riderFieldOpt.isPresent()); + assertTrue(riderFieldOpt.get().schema().getTypes() + .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType())); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); } @@ -412,8 +415,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta metaClient.reloadActiveTimeline(); Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient); - assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes() - .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING)); + Option<HoodieSchemaField> riderFieldOpt = latestTableSchemaOpt.get().getField("rider"); + assertTrue(riderFieldOpt.isPresent()); + assertTrue(riderFieldOpt.get().schema().getTypes() + .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType())); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); } catch (MissingSchemaFieldException e) { assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema); @@ -492,8 +497,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta metaClient.reloadActiveTimeline(); Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient); - assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes() - .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING)); + Option<HoodieSchemaField> riderFieldOpt = latestTableSchemaOpt.get().getField("rider"); + assertTrue(riderFieldOpt.isPresent()); + assertTrue(riderFieldOpt.get().schema().getTypes() + .stream().anyMatch(t 
-> HoodieSchemaType.STRING == t.getType())); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); } catch (Exception e) { assertTrue(containsErrorMessage(e, "has no default value and is non-nullable", @@ -572,9 +579,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta metaClient.reloadActiveTimeline(); Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient); - assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().getTypes() - .stream().anyMatch(t -> t.getType() == HoodieSchemaType.DOUBLE), - latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().toString()); + Option<HoodieSchemaField> distanceInMetersFieldOpt = latestTableSchemaOpt.get().getField("distance_in_meters"); + assertTrue(distanceInMetersFieldOpt.isPresent()); + assertTrue(distanceInMetersFieldOpt.get().schema().getTypes() + .stream().anyMatch(t -> HoodieSchemaType.DOUBLE == t.getType())); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); } catch (Exception e) { assertTrue(targetSchemaSameAsTableSchema); @@ -660,8 +668,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta metaClient.reloadActiveTimeline(); Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jsc, storage, dsConfig.targetBasePath, metaClient); - assertTrue(latestTableSchemaOpt.get().getField("current_ts").get().schema().getTypes() - .stream().anyMatch(t -> t.getType() == HoodieSchemaType.LONG)); + Option<HoodieSchemaField> currentTsFieldOpt = latestTableSchemaOpt.get().getField("current_ts"); + assertTrue(currentTsFieldOpt.isPresent()); + assertTrue(currentTsFieldOpt.get().schema().getTypes() + .stream().anyMatch(t -> HoodieSchemaType.LONG == t.getType())); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 
0); }
