This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 29b0a479cac6 feat(schema): Phase 18 - HoodieAvroUtils removal (Part 3)
(#17659)
29b0a479cac6 is described below
commit 29b0a479cac67fb07b756babf26f77dc0bc7f78c
Author: voonhous <[email protected]>
AuthorDate: Thu Jan 15 03:47:15 2026 +0800
feat(schema): Phase 18 - HoodieAvroUtils removal (Part 3) (#17659)
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 6 +-
.../hudi/cli/commands/HoodieLogFileCommand.java | 13 +-
.../org/apache/hudi/cli/commands/TableCommand.java | 4 +-
...ConcurrentSchemaEvolutionTableSchemaGetter.java | 3 +-
.../hudi/table/action/compact/HoodieCompactor.java | 4 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 9 +-
.../apache/hudi/index/HoodieSparkIndexClient.java | 2 +-
.../functional/TestHoodieBackedTableMetadata.java | 2 +-
.../hudi/common/schema/HoodieSchemaUtils.java | 47 +++++
.../hudi/common/table/HoodieTableConfig.java | 6 +-
.../hudi/common/table/TableSchemaResolver.java | 190 ++++-------------
.../common/table/read/PartialUpdateHandler.java | 4 +-
.../index/secondary/SecondaryIndexManager.java | 8 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 2 +-
.../hudi/common/schema/TestHoodieSchemaUtils.java | 229 +++++++++++++++++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 6 +-
.../java/org/apache/hudi/util/CompactionUtil.java | 2 +-
.../hudi/table/catalog/TestHoodieCatalog.java | 3 +-
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 3 +-
.../hudi/common/table/TestTableSchemaResolver.java | 46 +----
.../apache/hudi/hadoop/SchemaEvolutionContext.java | 2 +-
.../hadoop/hive/HoodieCombineHiveInputFormat.java | 2 +-
.../HoodieMergeOnReadTableInputFormat.java | 18 +-
.../scala/org/apache/hudi/BucketIndexSupport.scala | 6 +-
.../scala/org/apache/hudi/HoodieCLIUtils.scala | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +-
.../procedures/RunClusteringProcedure.scala | 2 +-
.../ShowHoodieLogFileMetadataProcedure.scala | 2 +-
.../ShowHoodieLogFileRecordsProcedure.scala | 4 +-
.../hudi/client/TestHoodieClientMultiWriter.java | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 9 +-
.../hudi/functional/TestRecordLevelIndex.scala | 2 +-
.../hudi/functional/TestTimeTravelQuery.scala | 18 +-
.../apache/spark/sql/hudi/ddl/TestAlterTable.scala | 2 +-
.../hudi/feature/index/TestSecondaryIndex.scala | 6 +-
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 4 +-
.../org/apache/hudi/utilities/HoodieCompactor.java | 4 +-
.../utilities/HoodieMetadataTableValidator.java | 3 +-
.../org/apache/hudi/utilities/UtilHelpers.java | 3 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 120 +++++++----
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 32 ++-
41 files changed, 510 insertions(+), 327 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 4050d47ff96b..ccd09675fb51 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -467,7 +467,7 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
private String getTableDoc() {
try {
- return tableSchemaResolver.getTableAvroSchema(true).getDoc();
+ return tableSchemaResolver.getTableSchema(true).getDoc().orElseGet(() ->
"");
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get schema's doc from
storage : ", e);
}
@@ -476,10 +476,10 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
@Override
public List<FieldSchema> getStorageFieldSchemas() {
try {
- return tableSchemaResolver.getTableAvroSchema(true)
+ return tableSchemaResolver.getTableSchema(true)
.getFields()
.stream()
- .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(),
f.doc()))
+ .map(f -> new FieldSchema(f.name(),
f.schema().getType().toAvroType().getName(), f.doc()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get field schemas from
storage : ", e);
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index ec06709ab982..6138e1fb3839 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -46,14 +46,13 @@ import
org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
@@ -116,7 +115,7 @@ public class HoodieLogFileCommand {
} else {
fileName = path.getName();
}
- HoodieSchema writerSchema =
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage,
path));
+ HoodieSchema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, path);
try (Reader reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(path), writerSchema)) {
// read the avro blocks
@@ -221,10 +220,10 @@ public class HoodieLogFileCommand {
HoodieSchema readerSchema = null;
// get schema from last log file
for (int i = logFilePaths.size() - 1; i >= 0; i--) {
- Schema schema = TableSchemaResolver.readSchemaFromLogFile(
+ HoodieSchema schema = TableSchemaResolver.readSchemaFromLogFile(
storage, new StoragePath(logFilePaths.get(i)));
if (schema != null) {
- readerSchema = HoodieSchema.fromAvroSchema(schema);
+ readerSchema = schema;
break;
}
}
@@ -262,10 +261,10 @@ public class HoodieLogFileCommand {
}
} else {
for (String logFile : logFilePaths) {
- Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
+ HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
client.getStorage(), new StoragePath(logFile));
try (HoodieLogFormat.Reader reader =
- HoodieLogFormat.newReader(storage, new HoodieLogFile(new
StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) {
+ HoodieLogFormat.newReader(storage, new HoodieLogFile(new
StoragePath(logFile)), writerSchema)) {
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index bdc16cf45196..0e3f7a4029dc 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -25,6 +25,7 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -41,7 +42,6 @@ import org.apache.hudi.storage.StoragePath;
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
@@ -204,7 +204,7 @@ public class TableCommand {
help = "File path to write schema") final String outputFilePath)
throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(client);
- Schema schema = tableSchemaResolver.getTableAvroSchema();
+ HoodieSchema schema = tableSchemaResolver.getTableSchema();
if (outputFilePath != null) {
log.info("Latest table schema : " + schema.toString(true));
writeToFile(outputFilePath, schema.toString(true));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
index a51dbcc396cf..0bb7db3fa583 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -96,8 +96,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
}
private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
- return metaClient.getTableConfig().getTableCreateSchema()
- .map(HoodieSchema::fromAvroSchema);
+ return metaClient.getTableConfig().getTableCreateSchema();
}
private void setCachedLatestCommitWithValidSchema(Option<HoodieInstant>
instantOption) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 04aaef78ec03..85dd1d45dc12 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.InstantRange;
@@ -46,7 +47,6 @@ import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.table.HoodieTable;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
@@ -110,7 +110,7 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
// the same with the table schema.
try {
if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
- Schema readerSchema = schemaResolver.getTableAvroSchema(false);
+ HoodieSchema readerSchema = schemaResolver.getTableSchema(false);
config.setSchema(readerSchema.toString());
}
} catch (Exception e) {
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 44bb6d2f89ad..3525995fb540 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -109,7 +109,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.hudi.testutils.TestHoodieMetadataBase;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.AfterEach;
@@ -892,7 +891,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile>
logFiles, boolean enableMetaFields) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
- Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
+ HoodieSchema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
if (writerSchema == null) {
// not a data block
@@ -900,7 +899,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader =
HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()),
HoodieSchema.fromAvroSchema(writerSchema))) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
@@ -2875,7 +2874,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles)
throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
- Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
+ HoodieSchema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
if (writerSchema == null) {
// not a data block
@@ -2883,7 +2882,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader =
HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()),
HoodieSchema.fromAvroSchema(writerSchema))) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
index 392e8adb95ca..7cc8e7545ae0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
@@ -204,7 +204,7 @@ public class HoodieSparkIndexClient extends
BaseHoodieIndexClient {
Option<String> indexTypeOpt,
Map<String, String> configs) {
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- String schemaStr = schemaUtil.getTableAvroSchema(false).toString();
+ String schemaStr = schemaUtil.getTableSchema(false).toString();
TypedProperties props = getProps(metaClient, indexDefinitionOpt,
indexTypeOpt, schemaStr);
if (!engineContextOpt.isPresent()) {
engineContextOpt = Option.of(new HoodieSparkEngineContext(new
JavaSparkContext(sparkSessionOpt.get().sparkContext())));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 974e717a211f..353d94904ceb 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -520,7 +520,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
HoodieSchema writerSchema =
-
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath()));
+ TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
if (writerSchema == null) {
// not a data block
continue;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 82fac31a07bd..f188d6c7b3bd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -794,4 +795,50 @@ public final class HoodieSchemaUtils {
public static boolean isMetadataField(String fieldName) {
return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
}
+
+ /**
+ * Converts a HoodieSchemaField's default value to its Java representation.
+ * This is equivalent to {@link
org.apache.hudi.avro.HoodieAvroUtils#toJavaDefaultValue(org.apache.avro.Schema.Field)}
+ * but operates on HoodieSchemaField.
+ *
+ * <p>For primitive types (STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ENUM,
BYTES),
+ * the default value is returned as-is. For complex types (ARRAY, MAP,
RECORD),
+ * Avro's GenericData utility is used to properly construct the default
value.</p>
+ *
+ * @param field the HoodieSchemaField containing the default value
+ * @return the Java representation of the default value, or null if no
default value exists
+ * @throws IllegalArgumentException if the field's type is not supported
+ * @since 1.2.0
+ */
+ public static Object toJavaDefaultValue(HoodieSchemaField field) {
+ ValidationUtils.checkArgument(field != null, "Field cannot be null");
+
+ Option<Object> defaultValOpt = field.defaultVal();
+ if (!defaultValOpt.isPresent() || defaultValOpt.get() ==
HoodieJsonProperties.NULL_VALUE) {
+ return null;
+ }
+
+ Object defaultVal = defaultValOpt.get();
+ HoodieSchemaType type = field.getNonNullSchema().getType();
+
+ switch (type) {
+ case STRING:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case ENUM:
+ case BYTES:
+ return defaultVal;
+ case ARRAY:
+ case MAP:
+ case RECORD:
+ // Use Avro's standard GenericData utility for complex types
+ // Delegate to the underlying Avro field
+ return GenericData.get().getDefaultValue(field.getAvroField());
+ default:
+ throw new IllegalArgumentException("Unsupported HoodieSchema type: " +
type);
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index c5bcfbaff6f6..cfcf203ebc40 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -63,7 +64,6 @@ import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1113,9 +1113,9 @@ public class HoodieTableConfig extends HoodieConfig {
return Option.ofNullable(getString(BOOTSTRAP_BASE_PATH));
}
- public Option<Schema> getTableCreateSchema() {
+ public Option<HoodieSchema> getTableCreateSchema() {
if (contains(CREATE_SCHEMA)) {
- return Option.of(new Schema.Parser().parse(getString(CREATE_SCHEMA)));
+ return Option.of(HoodieSchema.parse(getString(CREATE_SCHEMA)));
} else {
return Option.empty();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 768db4b533e5..29a20e735c13 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.table;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -32,7 +31,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -51,7 +49,6 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,138 +104,70 @@ public class TableSchemaResolver {
this.hasOperationField = Lazy.lazily(this::hasOperationField);
}
- public Schema getTableAvroSchemaFromDataFile() throws Exception {
- return
getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
- }
-
/**
* Gets full schema (user + metadata) for a hoodie table from data file as
HoodieSchema.
- * Delegates to getTableAvroSchemaFromDataFile and wraps the result in a
HoodieSchema.
*
* @return HoodieSchema for this table from data file
* @throws Exception
*/
public HoodieSchema getTableSchemaFromDataFile() throws Exception {
- Schema avroSchema = getTableAvroSchemaFromDataFile();
- return HoodieSchema.fromAvroSchema(avroSchema);
+ return
getTableSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
}
- private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
+ private Option<HoodieSchema> getTableSchemaFromDataFileInternal() {
return getTableParquetSchemaFromDataFile();
}
/**
- * Gets full schema (user + metadata) for a hoodie table as HoodieSchema.
- * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema.
+ * Gets full schema (user + metadata) for a hoodie table.
*
* @return HoodieSchema for this table
* @throws Exception
*/
public HoodieSchema getTableSchema() throws Exception {
- Schema avroSchema =
getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
- return HoodieSchema.fromAvroSchema(avroSchema);
+ return getTableSchema(metaClient.getTableConfig().populateMetaFields());
}
/**
- * Gets full schema (user + metadata) for a hoodie table as HoodieSchema.
- * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema.
+ * Gets schema for a hoodie table, can choose if include metadata fields
should be included.
*
* @param includeMetadataFields choice if include metadata fields
- * @return HoodieSchema for this table
+ * @return schema for this table
* @throws Exception
*/
public HoodieSchema getTableSchema(boolean includeMetadataFields) throws
Exception {
- Schema avroSchema = getTableAvroSchema(includeMetadataFields);
- return HoodieSchema.fromAvroSchema(avroSchema);
+ return getTableSchemaInternal(includeMetadataFields,
Option.empty()).orElseThrow(schemaNotFoundError());
}
/**
- * Fetches tables schema in Avro format as of the given instant as
HoodieSchema.
+ * Fetches tables schema as of the given instant
*
* @param timestamp as of which table's schema will be fetched
*/
public HoodieSchema getTableSchema(String timestamp) throws Exception {
- Schema avroSchema = getTableAvroSchema(timestamp);
- return HoodieSchema.fromAvroSchema(avroSchema);
- }
-
- /**
- * Fetches HoodieSchema as of the given instant
- *
- * @param instant as of which table's schema will be fetched
- */
- public HoodieSchema getTableSchema(HoodieInstant instant, boolean
includeMetadataFields) throws Exception {
- Schema schema = getTableAvroSchema(instant, includeMetadataFields);
- return HoodieSchema.fromAvroSchema(schema);
- }
-
- public Option<HoodieSchema> getTableSchemaIfPresent(boolean
includeMetadataFields) {
- return getTableAvroSchemaInternal(includeMetadataFields,
Option.empty()).map(HoodieSchema::fromAvroSchema);
- }
-
- /**
- * Gets full schema (user + metadata) for a hoodie table in Avro format.
- *
- * @return Avro schema for this table
- * @throws Exception
- */
- public Schema getTableAvroSchema() throws Exception {
- return
getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
- }
-
- /**
- * Gets schema for a hoodie table in Avro format, can choice if include
metadata fields.
- *
- * @param includeMetadataFields choice if include metadata fields
- * @return Avro schema for this table
- * @throws Exception
- */
- public Schema getTableAvroSchema(boolean includeMetadataFields) throws
Exception {
- return getTableAvroSchemaInternal(includeMetadataFields,
Option.empty()).orElseThrow(schemaNotFoundError());
- }
-
- /**
- * Fetches tables schema in Avro format as of the given instant
- *
- * @param timestamp as of which table's schema will be fetched
- */
- public Schema getTableAvroSchema(String timestamp) throws Exception {
Option<HoodieInstant> instant =
metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp)
.lastInstant();
- return
getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(),
instant)
+ return
getTableSchemaInternal(metaClient.getTableConfig().populateMetaFields(),
instant)
.orElseThrow(schemaNotFoundError());
}
/**
- * Fetches tables schema in Avro format as of the given instant
+ * Fetches tables schema as of the given instant
*
* @param instant as of which table's schema will be fetched
*/
- public Schema getTableAvroSchema(HoodieInstant instant, boolean
includeMetadataFields) throws Exception {
- return getTableAvroSchemaInternal(includeMetadataFields,
Option.of(instant)).orElseThrow(schemaNotFoundError());
- }
-
- /**
- * Gets users data schema for a hoodie table in Avro format.
- *
- * @return Avro user data schema
- * @throws Exception
- *
- * @deprecated use {@link #getTableAvroSchema(boolean)} instead
- */
- @Deprecated
- public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
- return getTableAvroSchemaInternal(false,
Option.empty()).orElseThrow(schemaNotFoundError());
+ public HoodieSchema getTableSchema(HoodieInstant instant, boolean
includeMetadataFields) throws Exception {
+ return getTableSchemaInternal(includeMetadataFields,
Option.of(instant)).orElseThrow(schemaNotFoundError());
}
- public Option<Schema> getTableAvroSchemaIfPresent(boolean
includeMetadataFields) {
- return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
+ public Option<HoodieSchema> getTableSchemaIfPresent(boolean
includeMetadataFields) {
+ return getTableSchemaInternal(includeMetadataFields, Option.empty());
}
- private Option<Schema> getTableAvroSchemaInternal(boolean
includeMetadataFields, Option<HoodieInstant> instantOpt) {
- Option<Schema> schema =
+ private Option<HoodieSchema> getTableSchemaInternal(boolean
includeMetadataFields, Option<HoodieInstant> instantOpt) {
+ Option<HoodieSchema> schema =
(instantOpt.isPresent()
? getTableSchemaFromCommitMetadata(instantOpt.get(),
includeMetadataFields)
: getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
@@ -246,38 +175,37 @@ public class TableSchemaResolver {
metaClient.getTableConfig().getTableCreateSchema()
.map(tableSchema ->
includeMetadataFields
- ? HoodieAvroUtils.addMetadataFields(tableSchema,
hasOperationField.get())
+ ? HoodieSchemaUtils.addMetadataFields(tableSchema,
hasOperationField.get())
: tableSchema)
)
.or(() -> {
- Option<Schema> schemaFromDataFile =
getTableAvroSchemaFromDataFileInternal();
+ Option<HoodieSchema> schemaFromDataFile =
getTableSchemaFromDataFileInternal();
return includeMetadataFields
? schemaFromDataFile
- :
schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
+ :
schemaFromDataFile.map(HoodieSchemaUtils::removeMetadataFields);
});
// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns() &&
schema.isPresent()) {
- HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema.get());
+ HoodieSchema hoodieSchema = schema.get();
return metaClient.getTableConfig().getPartitionFields()
.map(partitionFields -> appendPartitionColumns(hoodieSchema,
Option.ofNullable(partitionFields)))
- .map(HoodieSchema::toAvroSchema)
.or(() -> schema);
}
return schema;
}
- private Option<Schema> getTableSchemaFromLatestCommitMetadata(boolean
includeMetadataFields) {
+ private Option<HoodieSchema> getTableSchemaFromLatestCommitMetadata(boolean
includeMetadataFields) {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata
= getLatestCommitMetadataWithValidSchema();
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata =
instantAndCommitMetadata.get().getRight();
String schemaStr =
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
- Schema schema = new Schema.Parser().parse(schemaStr);
+ HoodieSchema schema = HoodieSchema.parse(schemaStr);
if (includeMetadataFields) {
- schema = HoodieAvroUtils.addMetadataFields(schema,
hasOperationField.get());
+ schema = HoodieSchemaUtils.addMetadataFields(schema,
hasOperationField.get());
} else {
- schema = HoodieAvroUtils.removeMetadataFields(schema);
+ schema = HoodieSchemaUtils.removeMetadataFields(schema);
}
return Option.of(schema);
} else {
@@ -285,7 +213,7 @@ public class TableSchemaResolver {
}
}
- private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant
instant, boolean includeMetadataFields) {
+ private Option<HoodieSchema> getTableSchemaFromCommitMetadata(HoodieInstant
instant, boolean includeMetadataFields) {
try {
HoodieCommitMetadata metadata = getCachedCommitMetadata(instant);
String existingSchemaStr =
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
@@ -294,11 +222,11 @@ public class TableSchemaResolver {
return Option.empty();
}
- Schema schema = new Schema.Parser().parse(existingSchemaStr);
+ HoodieSchema schema = HoodieSchema.parse(existingSchemaStr);
if (includeMetadataFields) {
- schema = HoodieAvroUtils.addMetadataFields(schema,
hasOperationField.get());
+ schema = HoodieSchemaUtils.addMetadataFields(schema,
hasOperationField.get());
} else {
- schema = HoodieAvroUtils.removeMetadataFields(schema);
+ schema = HoodieSchemaUtils.removeMetadataFields(schema);
}
return Option.of(schema);
} catch (Exception e) {
@@ -309,7 +237,7 @@ public class TableSchemaResolver {
/**
* Fetches the schema for a table from any the table's data files
*/
- private Option<Schema> getTableParquetSchemaFromDataFile() {
+ private Option<HoodieSchema> getTableParquetSchemaFromDataFile() {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata
= getLatestCommitMetadataWithInsertOrUpdate();
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
@@ -335,59 +263,22 @@ public class TableSchemaResolver {
}
}
- /**
- * Returns table's latest Avro {@link Schema} iff table is non-empty (ie
there's at least
- * a single commit)
- *
- * This method differs from {@link #getTableAvroSchema(boolean)} in that it
won't fallback
- * to use table's schema used at creation
- */
- public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean
includeMetadataFields) throws Exception {
- if (metaClient.isTimelineNonEmpty()) {
- return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
- }
-
- return Option.empty();
- }
-
/**
* Returns table's latest {@link HoodieSchema} iff table is non-empty (ie
there's at least
* a single commit)
*
- * This method differs from {@link #getTableAvroSchema(boolean)} in that it
won't fallback
+ This method differs from {@link #getTableSchema(boolean)} in that it
won't fall back
* to use table's schema used at creation
*/
- public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean
includeMetadataFields) {
+ public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean
includeMetadataFields) throws Exception {
if (metaClient.isTimelineNonEmpty()) {
- return getTableAvroSchemaInternal(includeMetadataFields,
Option.empty()).map(HoodieSchema::fromAvroSchema);
+ return getTableSchemaInternal(includeMetadataFields, Option.empty());
}
return Option.empty();
}
- /**
- * Read schema from a data file from the last compaction commit done.
- *
- * @deprecated please use {@link #getTableAvroSchema(HoodieInstant,
boolean)} instead
- */
- public Schema readSchemaFromLastCompaction(Option<HoodieInstant>
lastCompactionCommitOpt) throws Exception {
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
- HoodieInstant lastCompactionCommit =
lastCompactionCommitOpt.orElseThrow(() -> new Exception(
- "Could not read schema from last compaction, no compaction commits
found on path " + metaClient));
-
- // Read from the compacted file wrote
- HoodieCommitMetadata compactionMetadata =
- activeTimeline.readCommitMetadata(lastCompactionCommit);
- String filePath =
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
- .orElseThrow(() -> new IllegalArgumentException("Could not find any
data file written for compaction "
- + lastCompactionCommit + ", could not get schema for table " +
metaClient.getBasePath()));
- StoragePath path = new StoragePath(filePath);
- return HoodieIOFactory.getIOFactory(metaClient.getStorage())
- .getFileFormatUtils(path).readSchema(metaClient.getStorage(),
path).toAvroSchema();
- }
-
- private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
+ private HoodieSchema readSchemaFromLogFile(StoragePath path) throws
IOException {
return readSchemaFromLogFile(metaClient.getRawStorage(), path);
}
@@ -396,7 +287,7 @@ public class TableSchemaResolver {
*
* @return
*/
- public static Schema readSchemaFromLogFile(HoodieStorage storage,
StoragePath path) throws IOException {
+ public static HoodieSchema readSchemaFromLogFile(HoodieStorage storage,
StoragePath path) throws IOException {
// We only need to read the schema from the log block header,
// so we read the block lazily to avoid reading block content
// containing the records
@@ -408,7 +299,7 @@ public class TableSchemaResolver {
lastBlock = (HoodieDataBlock) block;
}
}
- return lastBlock != null ? lastBlock.getSchema().toAvroSchema() : null;
+ return lastBlock != null ? lastBlock.getSchema() : null;
}
}
@@ -476,10 +367,10 @@ public class TableSchemaResolver {
*/
public boolean hasOperationField() {
try {
- Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
- return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)
!= null;
+ HoodieSchema tableSchema = getTableSchemaFromDataFile();
+ return
tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent();
} catch (Exception e) {
- LOG.info("Failed to read operation field from avro schema ({})",
e.getMessage());
+ LOG.info("Failed to read operation field from schema ({})",
e.getMessage());
return false;
}
}
@@ -560,16 +451,15 @@ public class TableSchemaResolver {
});
}
- private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
+ private HoodieSchema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
return filePaths.map(filePath -> {
try {
if (FSUtils.isLogFile(filePath)) {
// this is a log file
return readSchemaFromLogFile(filePath);
} else {
- HoodieSchema hoodieSchema =
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+ return HoodieIOFactory.getIOFactory(metaClient.getStorage())
.getFileFormatUtils(filePath).readSchema(metaClient.getStorage(), filePath);
- return hoodieSchema != null ? hoodieSchema.toAvroSchema() : null;
}
} catch (IOException e) {
throw new HoodieIOException("Failed to read schema from file: " +
filePath, e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
index 9af83d5054f9..58f4043f16b3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
@@ -19,12 +19,12 @@
package org.apache.hudi.common.table.read;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.PartialUpdateMode;
import java.io.Serializable;
@@ -117,7 +117,7 @@ public class PartialUpdateHandler<T> implements
Serializable {
String fieldName = field.name();
// The default value only from the top-level data type is validated.
That means,
// for nested columns, we do not check the leaf level data type defaults.
- Object defaultValue =
HoodieAvroUtils.toJavaDefaultValue(field.getAvroField());
+ Object defaultValue = HoodieSchemaUtils.toJavaDefaultValue(field);
Object newValue = recordContext.getValue(highOrderRecord.getRecord(),
highOrderSchema, fieldName);
if (defaultValue == newValue) {
fieldVals[idx++] = recordContext.getValue(lowOrderRecord.getRecord(),
lowOrderSchema, fieldName);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
index 3ad8bcf29dca..2cc3a44b2dbe 100644
---
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
+++
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
@@ -19,6 +19,7 @@
package org.apache.hudi.index.secondary;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -29,7 +30,6 @@ import
org.apache.hudi.exception.HoodieSecondaryIndexException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -82,16 +82,16 @@ public class SecondaryIndexManager {
Map<String, String> options) {
Option<List<HoodieSecondaryIndex>> secondaryIndexes =
SecondaryIndexUtils.getSecondaryIndexes(metaClient);
Set<String> colNames = columns.keySet();
- Schema avroSchema;
+ HoodieSchema schema;
try {
- avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema(false);
+ schema = new TableSchemaResolver(metaClient).getTableSchema(false);
} catch (Exception e) {
throw new HoodieSecondaryIndexException(
"Failed to get table avro schema: " +
metaClient.getTableConfig().getTableName());
}
for (String col : colNames) {
- if (avroSchema.getField(col) == null) {
+ if (schema.getField(col).isEmpty()) {
throw new HoodieSecondaryIndexException("Field not exists: " + col);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b2839706ae78..30683f901219 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1607,7 +1607,7 @@ public class HoodieTableMetadataUtil {
// NOTE: Writer schema added to commit metadata will not contain Hudi's
metadata fields
Option<HoodieSchema> tableSchema = writerSchema.isEmpty()
- ? tableConfig.getTableCreateSchema().map(HoodieSchema::fromAvroSchema)
// the write schema does not set up correctly
+ ? tableConfig.getTableCreateSchema() // the write schema does not set
up correctly
: writerSchema.map(schema -> tableConfig.populateMetaFields() ?
addMetadataFields(schema) : schema)
.map(HoodieSchema::fromAvroSchema);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index 9cafba1fb80d..f79405cfbf30 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -36,15 +37,20 @@ import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -1760,6 +1766,229 @@ public class TestHoodieSchemaUtils {
assertEquals(HoodieSchemaType.STRING, result.getType());
}
+ @Test
+ public void testToJavaDefaultValueNull() {
+ // Field with no default value
+ HoodieSchemaField field = HoodieSchemaField.of("testField",
HoodieSchema.create(HoodieSchemaType.STRING));
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertNull(result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueNullValue() {
+ // Field with explicit NULL default value
+ HoodieSchemaField field = HoodieSchemaField.of("testField",
+
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)),
+ null,
+ HoodieSchema.NULL_VALUE);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertNull(result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueString() {
+ String defaultVal = "defaultString";
+ HoodieSchemaField field = HoodieSchemaField.of("stringField",
+ HoodieSchema.create(HoodieSchemaType.STRING),
+ null,
+ defaultVal);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(defaultVal, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueInt() {
+ int defaultVal = 42;
+ HoodieSchemaField field = HoodieSchemaField.of("intField",
+ HoodieSchema.create(HoodieSchemaType.INT),
+ null,
+ defaultVal);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(defaultVal, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueLong() {
+ long defaultVal = 12345L;
+ HoodieSchemaField field = HoodieSchemaField.of("longField",
+ HoodieSchema.create(HoodieSchemaType.LONG),
+ null,
+ defaultVal);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(defaultVal, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueFloat() {
+ float defaultVal = 3.14f;
+ HoodieSchemaField field = HoodieSchemaField.of("floatField",
+ HoodieSchema.create(HoodieSchemaType.FLOAT),
+ null,
+ defaultVal);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(defaultVal, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueDouble() {
+ double defaultVal = 2.718;
+ HoodieSchemaField field = HoodieSchemaField.of("doubleField",
+ HoodieSchema.create(HoodieSchemaType.DOUBLE),
+ null,
+ defaultVal);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(defaultVal, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueBoolean() {
+ HoodieSchemaField field = HoodieSchemaField.of("boolField",
+ HoodieSchema.create(HoodieSchemaType.BOOLEAN),
+ null,
+ true);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals(true, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueBytes() {
+ byte[] defaultBytes = new byte[]{1, 2, 3, 4};
+ HoodieSchemaField field = HoodieSchemaField.of("bytesField",
+ HoodieSchema.create(HoodieSchemaType.BYTES),
+ null,
+ defaultBytes);
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertArrayEquals(defaultBytes, (byte[]) result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueEnum() {
+ HoodieSchema enumSchema = HoodieSchema.createEnum("Status", null, null,
Arrays.asList("ACTIVE", "INACTIVE", "PENDING"));
+ HoodieSchemaField field = HoodieSchemaField.of("statusField",
+ enumSchema,
+ null,
+ "ACTIVE");
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals("ACTIVE", result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueArray() {
+ // Create array schema with int elements
+ HoodieSchema arraySchema =
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT));
+
+ // Default value as a list
+ List<Integer> defaultList = Arrays.asList(1, 2, 3);
+ HoodieSchemaField field = HoodieSchemaField.of("arrayField",
+ arraySchema,
+ null,
+ defaultList);
+
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertNotNull(result);
+ assertInstanceOf(Collection.class, result);
+ assertArrayEquals(defaultList.toArray(), ((Collection<?>)
result).toArray());
+ }
+
+ @Test
+ public void testToJavaDefaultValueMap() {
+ // Create map schema with string values
+ HoodieSchema mapSchema =
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.STRING));
+
+ // Default value as a map
+ Map<String, String> defaultMap = new HashMap<>();
+ defaultMap.put("key1", "value1");
+ defaultMap.put("key2", "value2");
+
+ HoodieSchemaField field = HoodieSchemaField.of("mapField",
+ mapSchema,
+ null,
+ defaultMap);
+
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertNotNull(result);
+ // GenericData should return a map
+ assertInstanceOf(Map.class, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueRecord() {
+ // Create nested record schema
+ HoodieSchema nestedRecordSchema = HoodieSchema.createRecord(
+ "NestedRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("field1",
HoodieSchema.create(HoodieSchemaType.STRING), null, "default1"),
+ HoodieSchemaField.of("field2",
HoodieSchema.create(HoodieSchemaType.INT), null, 10)
+ )
+ );
+
+ // Create a default record value
+ Map<String, Object> defaultRecord = new HashMap<>();
+ defaultRecord.put("field1", "default1");
+ defaultRecord.put("field2", 10);
+
+ HoodieSchemaField field = HoodieSchemaField.of("recordField",
+ nestedRecordSchema,
+ null,
+ defaultRecord);
+
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertNotNull(result);
+ // GenericData should return a GenericRecord
+ assertInstanceOf(GenericRecord.class, result);
+ }
+
+ @Test
+ public void testToJavaDefaultValueNullableField() {
+ // Create nullable string field with default value
+ // With a non-null defaultValue, the union type must be ["string", null]
+ HoodieSchema nullableStringSchema = HoodieSchema.createUnion(
+ HoodieSchema.create(HoodieSchemaType.STRING),
HoodieSchema.create(HoodieSchemaType.NULL));
+ HoodieSchemaField field = HoodieSchemaField.of("nullableField",
+ nullableStringSchema,
+ null,
+ "defaultValue");
+
+ Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+ assertEquals("defaultValue", result);
+ }
+
+ @SuppressWarnings("DataFlowIssue")
+ @Test
+ public void testToJavaDefaultValueNullFieldArgument() {
+ // Should throw IllegalArgumentException for null field
+ assertThrows(IllegalArgumentException.class, () ->
HoodieSchemaUtils.toJavaDefaultValue(null));
+ }
+
+ @Test
+ public void testToJavaDefaultValueConsistencyWithAvro() {
+ // Test that HoodieSchemaUtils.toJavaDefaultValue produces equivalent
results to HoodieAvroUtils.toJavaDefaultValue
+
+ // Test with primitive types
+ HoodieSchemaField stringField = HoodieSchemaField.of("stringField",
+ HoodieSchema.create(HoodieSchemaType.STRING),
+ null,
+ "test");
+
+ Object hoodieResult = HoodieSchemaUtils.toJavaDefaultValue(stringField);
+ Object avroResult =
HoodieAvroUtils.toJavaDefaultValue(stringField.getAvroField());
+
+ assertEquals(avroResult, hoodieResult);
+
+ // Test with int
+ HoodieSchemaField intField = HoodieSchemaField.of("intField",
+ HoodieSchema.create(HoodieSchemaType.INT),
+ null,
+ 42);
+
+ Object hoodieIntResult = HoodieSchemaUtils.toJavaDefaultValue(intField);
+ Object avroIntResult =
HoodieAvroUtils.toJavaDefaultValue(intField.getAvroField());
+
+ assertEquals(avroIntResult, hoodieIntResult);
+ }
+
@Test
void testLogicalTypesRetainedAfterPruneWithNestedRecords() {
final String logicalTypeKey = "logicalType";
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4c79254fdd22..2b850aaa95c7 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -715,8 +715,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
expectedHoodieRecords =
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient,
schema);
expectedHoodieUnmergedRecords =
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords,
metaClient, schema);
- List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords =
convertHoodieRecords(expectedHoodieRecords, schema.toAvroSchema(),
orderingFields);
- List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords =
convertHoodieRecords(expectedHoodieUnmergedRecords, schema.toAvroSchema(),
orderingFields);
+ List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords =
convertHoodieRecords(expectedHoodieRecords, schema, orderingFields);
+ List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords =
convertHoodieRecords(expectedHoodieUnmergedRecords, schema, orderingFields);
validateOutputFromFileGroupReaderWithExistingRecords(
storageConf, tablePath, containsBaseFile, expectedLogFileNum,
recordMergeMode,
expectedRecords, expectedUnmergedRecords);
@@ -936,7 +936,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
.collect(Collectors.toList());
}
- private List<HoodieTestDataGenerator.RecordIdentifier>
convertHoodieRecords(List<HoodieRecord> records, Schema schema, String[]
orderingFields) {
+ private List<HoodieTestDataGenerator.RecordIdentifier>
convertHoodieRecords(List<HoodieRecord> records, HoodieSchema schema, String[]
orderingFields) {
return records.stream().map(record ->
HoodieTestDataGenerator.RecordIdentifier.fromTripTestPayload((HoodieAvroIndexedRecord)
record, orderingFields)).collect(Collectors.toList());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index e796ef174d8f..b2e5072cdbbb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -129,7 +129,7 @@ public class CompactionUtil {
*/
public static void inferChangelogMode(Configuration conf,
HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
- HoodieSchema tableAvroSchema =
HoodieSchema.fromAvroSchema(tableSchemaResolver.getTableAvroSchemaFromDataFile());
+ HoodieSchema tableAvroSchema =
tableSchemaResolver.getTableSchemaFromDataFile();
if
(tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) {
conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index d8f41fd984ef..ab8ce6f58317 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -22,6 +22,7 @@ import
org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -261,7 +262,7 @@ public class TestHoodieCatalog extends
BaseTestHoodieCatalog {
HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(
catalog.getTable(tablePath).getOptions().get(FlinkOptions.PATH.key()),
HadoopConfigurations.getHadoopConf(new Configuration())).get();
- Option<org.apache.avro.Schema> tableCreateSchema =
tableConfig.getTableCreateSchema();
+ Option<HoodieSchema> tableCreateSchema =
tableConfig.getTableCreateSchema();
assertTrue(tableCreateSchema.isPresent(), "Table should have been
created");
assertThat(tableCreateSchema.get().getFullName(),
is("hoodie.tb1.tb1_record"));
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 795730e9b098..f3096b5d0df8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -220,7 +221,7 @@ public class TestHoodieHiveCatalog extends
BaseTestHoodieCatalog {
// validate the full name of table create schema
HoodieTableConfig tableConfig =
StreamerUtil.getTableConfig(table1.getOptions().get(FlinkOptions.PATH.key()),
hoodieCatalog.getHiveConf()).get();
- Option<org.apache.avro.Schema> tableCreateSchema =
tableConfig.getTableCreateSchema();
+ Option<HoodieSchema> tableCreateSchema =
tableConfig.getTableCreateSchema();
assertTrue(tableCreateSchema.isPresent(), "Table should have been
created");
assertThat(tableCreateSchema.get().getFullName(),
is("hoodie.test.test_record"));
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 956b116ecd8d..4a26f8715e96 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -40,10 +40,10 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -64,7 +64,6 @@ import static
org.apache.hudi.common.testutils.HoodieCommonTestHarness.getDataBl
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -113,49 +112,13 @@ class TestTableSchemaResolver {
}
}
- @Test
- void testGetTableSchema() throws Exception {
- // Setup: Create mock metaClient and configure behavior
- HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
- HoodieSchema expectedSchema = getSimpleSchema();
-
- // Mock table setup
- when(metaClient.getTableConfig().populateMetaFields()).thenReturn(true);
- when(metaClient.getTableConfig().getTableCreateSchema())
- .thenReturn(Option.of(expectedSchema.toAvroSchema()));
-
- when(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema())
- .thenReturn(Option.empty());
-
- // Create resolver and call both methods
- TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
-
- // Test 1: getTableSchema() - should use table config's populateMetaFields
(true)
- Schema avroSchema = resolver.getTableAvroSchema();
- HoodieSchema hoodieSchema = resolver.getTableSchema();
- assertNotNull(hoodieSchema);
- assertEquals(avroSchema, hoodieSchema.getAvroSchema());
-
- // Test 2: getTableSchema(true) - explicitly include metadata fields
- Schema avroSchemaWithMetadata = resolver.getTableAvroSchema(true);
- HoodieSchema hoodieSchemaWithMetadata = resolver.getTableSchema(true);
- assertNotNull(hoodieSchemaWithMetadata);
- assertEquals(avroSchemaWithMetadata,
hoodieSchemaWithMetadata.getAvroSchema());
-
- // Test 3: getTableSchema(false) - explicitly exclude metadata fields
- Schema avroSchemaWithoutMetadata = resolver.getTableAvroSchema(false);
- HoodieSchema hoodieSchemaWithoutMetadata = resolver.getTableSchema(false);
- assertNotNull(hoodieSchemaWithoutMetadata);
- assertEquals(avroSchemaWithoutMetadata,
hoodieSchemaWithoutMetadata.getAvroSchema());
- }
-
@Test
void testReadSchemaFromLogFile() throws IOException, URISyntaxException,
InterruptedException {
String testDir = initTestDir("read_schema_from_log_file");
StoragePath partitionPath = new StoragePath(testDir, "partition1");
HoodieSchema expectedSchema = getSimpleSchema();
StoragePath logFilePath = writeLogFile(partitionPath,
expectedSchema.toAvroSchema());
- assertEquals(expectedSchema.toAvroSchema(),
TableSchemaResolver.readSchemaFromLogFile(
+ assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(
HoodieStorageUtils.getStorage(new StoragePath(logFilePath.toString()),
HoodieTestUtils.getDefaultStorageConf()),
logFilePath));
}
@@ -194,8 +157,9 @@ class TestTableSchemaResolver {
try (MockedStatic<HoodieIOFactory> ioFactoryMockedStatic =
mockStatic(HoodieIOFactory.class);
MockedStatic<TableSchemaResolver> tableSchemaResolverMockedStatic =
mockStatic(TableSchemaResolver.class)) {
// return null for first parquet file to force iteration to inspect the
next file
- Schema schema = Schema.createRecord("test_schema", null,
"test_namespace", false);
- schema.setFields(Arrays.asList(new Schema.Field("int_field",
Schema.create(Schema.Type.INT)), new Schema.Field("_hoodie_operation",
Schema.create(Schema.Type.STRING))));
+ HoodieSchema schema = HoodieSchema.createRecord("test_schema", null,
"test_namespace", false,
+ Arrays.asList(HoodieSchemaField.of("int_field",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("_hoodie_operation",
HoodieSchema.create(HoodieSchemaType.STRING))));
// mock parquet file schema reading to return null for the first base
file to force iteration
HoodieIOFactory ioFactory = mock(HoodieIOFactory.class);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 631065bba788..dd7cc6d17809 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -133,7 +133,7 @@ public class SchemaEvolutionContext {
json -> Option.ofNullable(new Schema.Parser().parse(json)));
if (avroSchemaOpt == null) {
// the code path should only be invoked in tests.
- return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
+ return new
TableSchemaResolver(this.metaClient).getTableSchema().toAvroSchema();
}
return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The
avro schema cache should always be set up together with the internal schema
cache"));
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index 51477deffee6..a71695be427e 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -409,7 +409,7 @@ public class HoodieCombineHiveInputFormat<K extends
WritableComparable, V extend
for (String path : uniqTablePaths) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(path).setConf(new
HadoopStorageConfiguration(job)).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- String avroSchema = schemaUtil.getTableAvroSchema().toString();
+ String avroSchema = schemaUtil.getTableSchema().toString();
Option<InternalSchema> internalSchema =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
if (internalSchema.isPresent()) {
LOG.info("Set internal and avro schema cache with path: {}", path);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 4d4d8a4da0e8..de352328973e 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -42,10 +43,10 @@ import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StoragePathInfo;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -419,14 +420,19 @@ public class HoodieMergeOnReadTableInputFormat extends
HoodieCopyOnWriteTableInp
}
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
try {
- Schema schema = tableSchemaResolver.getTableAvroSchema();
- boolean isNonPartitionedKeyGen =
StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
+ HoodieSchema schema = tableSchemaResolver.getTableSchema();
+ String partitionFieldProp = tableConfig.getPartitionFieldProp();
+ boolean isNonPartitionedKeyGen =
StringUtils.isNullOrEmpty(partitionFieldProp);
return Option.of(
new HoodieVirtualKeyInfo(
tableConfig.getRecordKeyFieldProp(),
- isNonPartitionedKeyGen ? Option.empty() :
Option.of(tableConfig.getPartitionFieldProp()),
- schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
- isNonPartitionedKeyGen ? Option.empty() :
Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
+ isNonPartitionedKeyGen ? Option.empty() :
Option.of(partitionFieldProp),
+ schema.getField(tableConfig.getRecordKeyFieldProp())
+ .orElseThrow(() -> new HoodieSchemaException("Field: " +
tableConfig.getRecordKeyFieldProp() + " not found"))
+ .pos(),
+ isNonPartitionedKeyGen ? Option.empty() :
Option.of(schema.getField(partitionFieldProp)
+ .orElseThrow(() -> new HoodieSchemaException("Field: " +
partitionFieldProp + " not found"))
+ .pos())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception
", exception);
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index dcc002adfb6f..71c0ac4d379a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -52,7 +52,7 @@ class BucketIndexSupport(spark: SparkSession,
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
}
- private lazy val avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema(false)
+ private lazy val schema = new
TableSchemaResolver(metaClient).getTableSchema(false)
override def getIndexName: String = BucketIndexSupport.INDEX_NAME
@@ -133,7 +133,7 @@ class BucketIndexSupport(spark: SparkSession,
if (hashValuePairs.size != indexBucketHashFields.size) {
matchedBuckets.setUntil(numBuckets)
} else {
- val record = new GenericData.Record(avroSchema)
+ val record = new GenericData.Record(schema.toAvroSchema)
hashValuePairs.foreach(p => record.put(p.getKey, p.getValue))
val hoodieKey = keyGenerator.getKey(record)
matchedBuckets.set(BucketIdentifier.getBucketId(hoodieKey.getRecordKey,
indexBucketHashFieldsOpt.get, numBuckets))
@@ -153,7 +153,7 @@ class BucketIndexSupport(spark: SparkSession,
private def getBucketsBySingleHashFields(expr: Expression, bucketColumnName:
String, numBuckets: Int): BitSet = {
def getBucketNumber(attr: Attribute, v: Any): Int = {
- val record = new GenericData.Record(avroSchema)
+ val record = new GenericData.Record(schema.toAvroSchema)
record.put(attr.name, v)
val hoodieKey = keyGenerator.getKey(record)
BucketIdentifier.getBucketId(hoodieKey.getRecordKey,
indexBucketHashFieldsOpt.get, numBuckets)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index e2c39c3938bb..1b40b0fe5701 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -50,7 +50,7 @@ object HoodieCLIUtils extends Logging {
val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(HadoopFSUtils.getStorageConf(sparkSession.sessionState.newHadoopConf())).build()
val schemaUtil = new TableSchemaResolver(metaClient)
- val schemaStr = schemaUtil.getTableAvroSchema(false).toString
+ val schemaStr = schemaUtil.getTableSchema(false).toString
// If tableName is provided, we need to add catalog props
val catalogProps = tableName match {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f17d641edb11..38b22715f4c4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -446,7 +446,7 @@ class HoodieSparkSqlWriterInternal {
}
// Issue the delete.
- val schemaStr = new
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
+ val schemaStr = new
TableSchemaResolver(tableMetaClient).getTableSchema(false).toString
val client =
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
schemaStr, path, tblName,
parameters.asJava))
@@ -683,8 +683,7 @@ class HoodieSparkSqlWriterInternal {
private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient,
schemaFromCatalog: Option[HoodieSchema]): Option[HoodieSchema] = {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
- .map(HoodieSchema.fromAvroSchema)
+ toScalaOption(tableSchemaResolver.getTableSchemaFromLatestCommit(false))
.orElse(schemaFromCatalog)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 4a79350b35c3..74b04503a563 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -233,7 +233,7 @@ class RunClusteringProcedure extends BaseProcedure
}
val tableSchemaResolver = new TableSchemaResolver(metaClient)
- val fields = tableSchemaResolver.getTableAvroSchema(false)
+ val fields = tableSchemaResolver.getTableSchema(false)
.getFields.asScala.map(_.name().toLowerCase)
orderColumns.split(",").foreach(col => {
if (!fields.contains(col.toLowerCase)) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 1ffdd0d80cad..5257f3c20bda 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -74,7 +74,7 @@ class ShowHoodieLogFileMetadataProcedure extends
BaseProcedure with ProcedureBui
logFilePath => {
val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePath))
- val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(statuses.get(0).getPath), HoodieSchema.fromAvroSchema(schema))
+ val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(statuses.get(0).getPath), schema)
// read the avro blocks
while (reader.hasNext) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 58c4fa1acb43..4b9e0e83c7d5 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -70,7 +70,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log
file")
val allRecords: java.util.List[IndexedRecord] = new
java.util.ArrayList[IndexedRecord]
if (merge) {
- val schema =
HoodieSchema.fromAvroSchema(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
new StoragePath(logFilePaths.last))))
+ val schema =
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePaths.last)))
val scanner = HoodieMergedLogRecordScanner.newBuilder
.withStorage(storage)
.withBasePath(basePath)
@@ -94,7 +94,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
logFilePath => {
val schema =
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePath)))
- val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(logFilePath), HoodieSchema.fromAvroSchema(schema))
+ val reader = HoodieLogFormat.newReader(storage, new
HoodieLogFile(logFilePath), schema)
while (reader.hasNext) {
val block = reader.next()
block match {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 72d0db584ae8..1a0c0c730741 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -320,7 +320,7 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
// Validate table schema in the end.
TableSchemaResolver r = new TableSchemaResolver(metaClient);
// Assert no table schema is defined.
- assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableAvroSchema(false));
+ assertThrows(HoodieSchemaNotFoundException.class, () ->
r.getTableSchema(false));
}
// Start txn 002 altering table schema
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index a35348f6e3aa..82750b5ab4b7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -123,7 +123,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -1402,7 +1401,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieStorage storage = table.getStorage();
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
- Schema writerSchema =
+ HoodieSchema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
if (writerSchema == null) {
// not a data block
@@ -1410,7 +1409,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader =
HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()),
HoodieSchema.fromAvroSchema(writerSchema))) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
@@ -3953,7 +3952,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
private static void verifyMetadataColumnStatsRecords(HoodieStorage storage,
List<HoodieLogFile> logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
- Schema writerSchema =
+ HoodieSchema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
if (writerSchema == null) {
// not a data block
@@ -3961,7 +3960,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
try (HoodieLogFormat.Reader logFileReader =
HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()),
HoodieSchema.fromAvroSchema(writerSchema))) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 475a5bf55d23..c3888baf7113 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -370,7 +370,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
assertEquals(10, spark.read.format("hudi").load(basePath).count())
metaClient.reloadActiveTimeline()
val tableSchemaResolver = new TableSchemaResolver(metaClient)
- val latestTableSchemaFromCommitMetadata =
tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)
+ val latestTableSchemaFromCommitMetadata =
tableSchemaResolver.getTableSchemaFromLatestCommit(false)
if (failAndDoRollback) {
val updatesToFail = dataGen.generateUniqueUpdates("003", 3)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index e4eb32814875..4593e21839a7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -279,9 +279,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase
with ScalaAssertionS
.select("id", "name", "value", "version")
.take(1)(0)
assertEquals(Row(1, "a1", 10, 1000), result1)
- val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit)
- assertNull(schema1.getField("year"))
- assertNull(schema1.getField("month"))
+ val schema1 = tableSchemaResolver.getTableSchema(firstCommit)
+ assertTrue(schema1.getField("year").isEmpty)
+ assertTrue(schema1.getField("month").isEmpty)
// Query as of secondCommitTime
val result2 = spark.read.format("hudi")
@@ -290,9 +290,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase
with ScalaAssertionS
.select("id", "name", "value", "version", "year")
.take(1)(0)
assertEquals(Row(1, "a1", 12, 1001, "2022"), result2)
- val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit)
- assertNotNull(schema2.getField("year"))
- assertNull(schema2.getField("month"))
+ val schema2 = tableSchemaResolver.getTableSchema(secondCommit)
+ assertTrue(schema2.getField("year").isPresent)
+ assertTrue(schema2.getField("month").isEmpty)
// Query as of thirdCommitTime
val result3 = spark.read.format("hudi")
@@ -301,9 +301,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase
with ScalaAssertionS
.select("id", "name", "value", "version", "year", "month")
.take(1)(0)
assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3)
- val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit)
- assertNotNull(schema3.getField("year"))
- assertNotNull(schema3.getField("month"))
+ val schema3 = tableSchemaResolver.getTableSchema(thirdCommit)
+ assertTrue(schema3.getField("year").isPresent)
+ assertTrue(schema3.getField("month").isPresent)
}
@ParameterizedTest
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 87d5ae5bb666..9a3af922bc2c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -301,7 +301,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
def validateTableSchema(tablePath: String): Unit = {
val metaClient = createMetaClient(spark, tablePath)
- val schema = new TableSchemaResolver(metaClient).getTableAvroSchema(false)
+ val schema = new TableSchemaResolver(metaClient).getTableSchema(false)
assertFalse(schema.getFields.asScala.exists(f =>
HoodieRecord.HOODIE_META_COLUMNS.contains(f.name())),
"Metadata fields should be excluded from the table schema")
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 537a01e9ec80..5d592e213712 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -735,10 +735,10 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
val schemaResolver = new TableSchemaResolver(metaClient)
- val tableSchema = schemaResolver.getTableAvroSchema(false)
+ val tableSchema = schemaResolver.getTableSchema(false)
val field = tableSchema.getField(fieldName)
- assertNotNull(field, s"$fieldName field should exist in table schema")
- val fieldType = field.schema()
+ assertTrue(field.isPresent, s"$fieldName field should exist in table
schema")
+ val fieldType = field.get().schema()
assertTrue(
fieldType.toString.contains(expectedType),
s"$fieldName field should be of type $expectedType, but got:
${fieldType.toString}"
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 0701305707b1..f8dc69285aab 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -445,10 +445,10 @@ public class HoodieHiveSyncClient extends
HoodieSyncClient {
@Override
public List<FieldSchema> getStorageFieldSchemas() {
try {
- return tableSchemaResolver.getTableAvroSchema(false)
+ return tableSchemaResolver.getTableSchema(false)
.getFields()
.stream()
- .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(),
f.doc()))
+ .map(f -> new FieldSchema(f.name(),
f.schema().getType().toAvroType().getName(), f.doc()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get field schemas from
storage : ", e);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index ba7c21e0eb59..6dd1211f1d83 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,7 +38,6 @@ import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionS
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -310,7 +310,7 @@ public class HoodieCompactor {
private String getSchemaFromLatestInstant() throws Exception {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- Schema schema = schemaUtil.getTableAvroSchema(false);
+ HoodieSchema schema = schemaUtil.getTableSchema(false);
return schema.toString();
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 02fc43dd37fc..cbda5d4bf07a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1655,8 +1655,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
for (String logFilePathStr : logFilePathSet) {
HoodieLogFormat.Reader reader = null;
try {
- HoodieSchema readerSchema =
-
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage,
new StoragePath(logFilePathStr)));
+ HoodieSchema readerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePathStr));
if (readerSchema == null) {
LOG.warn("Cannot read schema from log file {}. Skip the check as
it's likely being written by an inflight instant.", logFilePathStr);
continue;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 1ee952d9d763..f181c4b44da5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -76,7 +76,6 @@ import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -646,7 +645,7 @@ public class UtilHelpers {
public static String getSchemaFromLatestInstant(HoodieTableMetaClient
metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- Schema schema = schemaResolver.getTableAvroSchema(false);
+ HoodieSchema schema = schemaResolver.getTableSchema(false);
return schema.toString();
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index ab1c73c56c33..95c4b75f1181 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -50,6 +50,9 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchema.TimePrecision;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -126,7 +129,6 @@ import
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -660,11 +662,10 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
assertNotNull(tableSchema);
- Schema expectedSchema;
- expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath +
"/source_evolved.avsc")));
+ HoodieSchema expectedSchema = HoodieSchema.parse(fs.open(new Path(basePath
+ "/source_evolved.avsc")));
assertEquals(expectedSchema, tableSchema);
// clean up and reinit
@@ -698,8 +699,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
- assertEquals("timestamp-millis",
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+ HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
+ Option<HoodieSchemaField> currentTsFieldOpt =
tableSchema.getField("current_ts");
+ assertTrue(currentTsFieldOpt.isPresent());
+ HoodieSchema.Timestamp currentTsSchema = (HoodieSchema.Timestamp)
currentTsFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType());
+ assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision());
assertEquals(1000,
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
> '1980-01-01'").count());
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Collections.singletonList(TestIdentityTransformer.class.getName()),
@@ -718,8 +723,12 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- tableSchema = tableSchemaResolver.getTableAvroSchema(false);
- assertEquals("timestamp-millis",
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+ tableSchema = tableSchemaResolver.getTableSchema(false);
+ currentTsFieldOpt = tableSchema.getField("current_ts");
+ assertTrue(currentTsFieldOpt.isPresent());
+ currentTsSchema = (HoodieSchema.Timestamp)
currentTsFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType());
+ assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision());
sqlContext.clearCache();
assertEquals(1450,
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
> '1980-01-01'").count());
assertEquals(1450,
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
< '2080-01-01'").count());
@@ -759,7 +768,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
Map<String, String> hudiOpts = new HashMap<>();
hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
logicalAssertions(tableSchema, tableBasePath, hudiOpts,
HoodieTableVersion.current().versionCode());
@@ -779,7 +788,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ tableSchema = tableSchemaResolver.getTableSchema(false);
logicalAssertions(tableSchema, tableBasePath, hudiOpts,
HoodieTableVersion.current().versionCode());
} finally {
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
@@ -824,7 +833,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500",
tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
Map<String, String> hudiOpts = new HashMap<>();
hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
logicalAssertions(tableSchema, tableBasePath, hudiOpts,
HoodieTableVersion.EIGHT.versionCode());
@@ -840,7 +849,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000",
tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ tableSchema = tableSchemaResolver.getTableSchema(false);
logicalAssertions(tableSchema, tableBasePath, hudiOpts,
HoodieTableVersion.EIGHT.versionCode());
} finally {
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
@@ -882,7 +891,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
- Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
Map<String, String> hudiOpts = new HashMap<>();
hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
logicalAssertions(tableSchema, tableBasePath, hudiOpts,
version.versionCode());
@@ -908,34 +917,67 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
});
}
- private void logicalAssertions(Schema tableSchema, String tableBasePath,
Map<String, String> hudiOpts, int tableVersion) {
+ private void logicalAssertions(HoodieSchema tableSchema, String
tableBasePath, Map<String, String> hudiOpts, int tableVersion) {
if (tableVersion > 8) {
- assertEquals("timestamp-millis",
tableSchema.getField("ts_millis").schema().getLogicalType().getName());
+ Option<HoodieSchemaField> tsMillisFieldOpt =
tableSchema.getField("ts_millis");
+ assertTrue(tsMillisFieldOpt.isPresent());
+ HoodieSchema.Timestamp tsMillisFieldSchema = (HoodieSchema.Timestamp)
tsMillisFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP, tsMillisFieldSchema.getType());
+ assertEquals(TimePrecision.MILLIS, tsMillisFieldSchema.getPrecision());
+ assertTrue(tsMillisFieldSchema.isUtcAdjusted());
}
- assertEquals("timestamp-micros",
tableSchema.getField("ts_micros").schema().getLogicalType().getName());
+ Option<HoodieSchemaField> tsMicrosFieldOpt =
tableSchema.getField("ts_micros");
+ assertTrue(tsMicrosFieldOpt.isPresent());
+ HoodieSchema.Timestamp tsMicrosFieldSchema = (HoodieSchema.Timestamp)
tsMicrosFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP, tsMicrosFieldSchema.getType());
+ assertEquals(TimePrecision.MICROS, tsMicrosFieldSchema.getPrecision());
+ assertTrue(tsMicrosFieldSchema.isUtcAdjusted());
if (tableVersion > 8 && !HoodieSparkUtils.isSpark3_3()) {
- assertEquals("local-timestamp-millis",
tableSchema.getField("local_ts_millis").schema().getLogicalType().getName());
- assertEquals("local-timestamp-micros",
tableSchema.getField("local_ts_micros").schema().getLogicalType().getName());
+ Option<HoodieSchemaField> localTsMillisFieldOpt =
tableSchema.getField("local_ts_millis");
+ assertTrue(localTsMillisFieldOpt.isPresent());
+ HoodieSchema.Timestamp localTsMillisFieldSchema =
(HoodieSchema.Timestamp) localTsMillisFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP,
localTsMillisFieldSchema.getType());
+ assertEquals(TimePrecision.MILLIS,
localTsMillisFieldSchema.getPrecision());
+ assertFalse(localTsMillisFieldSchema.isUtcAdjusted());
+
+ Option<HoodieSchemaField> localTsMicrosFieldOpt =
tableSchema.getField("local_ts_micros");
+ assertTrue(localTsMicrosFieldOpt.isPresent());
+ HoodieSchema.Timestamp localTsMicrosFieldSchema =
(HoodieSchema.Timestamp) localTsMicrosFieldOpt.get().schema();
+ assertEquals(HoodieSchemaType.TIMESTAMP,
localTsMicrosFieldSchema.getType());
+ assertEquals(TimePrecision.MICROS,
localTsMicrosFieldSchema.getPrecision());
+ assertFalse(localTsMicrosFieldSchema.isUtcAdjusted());
}
-
- assertEquals("date",
tableSchema.getField("event_date").schema().getLogicalType().getName());
+ Option<HoodieSchemaField> eventDateFieldOpt =
tableSchema.getField("event_date");
+ assertTrue(eventDateFieldOpt.isPresent());
+ assertEquals(HoodieSchemaType.DATE,
eventDateFieldOpt.get().schema().getType());
if (tableVersion > 8) {
- assertEquals("bytes",
tableSchema.getField("dec_plain_large").schema().getType().getName());
- assertEquals("decimal",
tableSchema.getField("dec_plain_large").schema().getLogicalType().getName());
- assertEquals(20, ((LogicalTypes.Decimal)
tableSchema.getField("dec_plain_large").schema().getLogicalType()).getPrecision());
- assertEquals(10, ((LogicalTypes.Decimal)
tableSchema.getField("dec_plain_large").schema().getLogicalType()).getScale());
+ Option<HoodieSchemaField> decPlainLargeFieldOpt =
tableSchema.getField("dec_plain_large");
+ assertTrue(decPlainLargeFieldOpt.isPresent());
+ HoodieSchema.Decimal decPlainLargeSchema = (HoodieSchema.Decimal)
decPlainLargeFieldOpt.get().schema();
+ // decimal backed by bytes (are not fixed length byte arrays)
+ assertFalse(decPlainLargeSchema.isFixed());
+ assertEquals(HoodieSchemaType.DECIMAL, decPlainLargeSchema.getType());
+ assertEquals(20, decPlainLargeSchema.getPrecision());
+ assertEquals(10, decPlainLargeSchema.getScale());
}
- assertEquals("fixed",
tableSchema.getField("dec_fixed_small").schema().getType().getName());
- assertEquals(3,
tableSchema.getField("dec_fixed_small").schema().getFixedSize());
- assertEquals("decimal",
tableSchema.getField("dec_fixed_small").schema().getLogicalType().getName());
- assertEquals(5, ((LogicalTypes.Decimal)
tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getPrecision());
- assertEquals(2, ((LogicalTypes.Decimal)
tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getScale());
- assertEquals("fixed",
tableSchema.getField("dec_fixed_large").schema().getType().getName());
- assertEquals(8,
tableSchema.getField("dec_fixed_large").schema().getFixedSize());
- assertEquals("decimal",
tableSchema.getField("dec_fixed_large").schema().getLogicalType().getName());
- assertEquals(18, ((LogicalTypes.Decimal)
tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getPrecision());
- assertEquals(9, ((LogicalTypes.Decimal)
tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getScale());
+ Option<HoodieSchemaField> decFixedSmallOpt =
tableSchema.getField("dec_fixed_small");
+ assertTrue(decFixedSmallOpt.isPresent());
+ HoodieSchema.Decimal decFixedSmallSchema = (HoodieSchema.Decimal)
decFixedSmallOpt.get().schema();
+ assertTrue(decFixedSmallSchema.isFixed());
+ assertEquals(3, decFixedSmallSchema.getFixedSize());
+ assertEquals(HoodieSchemaType.DECIMAL, decFixedSmallSchema.getType());
+ assertEquals(5, decFixedSmallSchema.getPrecision());
+ assertEquals(2, decFixedSmallSchema.getScale());
+
+ Option<HoodieSchemaField> decFixedLargeOpt =
tableSchema.getField("dec_fixed_large");
+ assertTrue(decFixedLargeOpt.isPresent());
+ HoodieSchema.Decimal decFixedLargeSchema = (HoodieSchema.Decimal)
decFixedLargeOpt.get().schema();
+ assertTrue(decFixedLargeSchema.isFixed());
+ assertEquals(8, decFixedLargeSchema.getFixedSize());
+ assertEquals(HoodieSchemaType.DECIMAL, decFixedLargeSchema.getType());
+ assertEquals(18, decFixedLargeSchema.getPrecision());
+ assertEquals(9, decFixedLargeSchema.getScale());
sqlContext.clearCache();
Dataset<Row> df = sqlContext.read()
@@ -2292,7 +2334,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// validate table schema fetches valid schema from last but one commit.
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
- assertNotEquals(tableSchemaResolver.getTableAvroSchema(),
Schema.create(Schema.Type.NULL).toString());
+ assertNotEquals(tableSchemaResolver.getTableSchema(),
Schema.create(Schema.Type.NULL).toString());
// schema from latest commit and last but one commit should match
compareLatestTwoSchemas(metaClient);
prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
@@ -2848,7 +2890,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// validate table schema fetches valid schema from last but one commit.
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
- assertNotEquals(tableSchemaResolver.getTableAvroSchema(),
Schema.create(Schema.Type.NULL).toString());
+ assertNotEquals(tableSchemaResolver.getTableSchema(),
Schema.create(Schema.Type.NULL).toString());
// schema from latest commit and last but one commit should match
compareLatestTwoSchemas(metaClient);
prepareParquetDFSSource(useSchemaProvider, hasTransformer,
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
@@ -3201,7 +3243,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// validate schema is set in commit even if target schema returns null on
empty batch
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
HoodieInstant secondCommit =
metaClient.reloadActiveTimeline().lastInstant().get();
- Schema lastCommitSchema =
tableSchemaResolver.getTableAvroSchema(secondCommit, true);
+ HoodieSchema lastCommitSchema =
tableSchemaResolver.getTableSchema(secondCommit, true);
assertNotEquals(firstCommit, secondCommit);
assertNotEquals(lastCommitSchema, Schema.create(Schema.Type.NULL));
deltaStreamer2.shutdownGracefully();
@@ -3741,10 +3783,10 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
HoodieTestUtils.createMetaClient(storage, tableBasePath));
// get schema from data file written in the latest commit
- Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
+ HoodieSchema tableSchema =
tableSchemaResolver.getTableSchemaFromDataFile();
assertNotNull(tableSchema);
- List<String> tableFields =
tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ List<String> tableFields =
tableSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
// now assert that the partition column is not in the target schema
assertFalse(tableFields.contains("partition_path"));
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 828f0936f2f7..e2fc16dc6a2f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -338,8 +339,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
metaClient.reloadActiveTimeline();
Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
- .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+ Option<HoodieSchemaField> riderFieldOpt =
latestTableSchemaOpt.get().getField("rider");
+ assertTrue(riderFieldOpt.isPresent());
+ assertTrue(riderFieldOpt.get().schema().getTypes()
+ .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
}
@@ -413,8 +416,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
metaClient.reloadActiveTimeline();
Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
- .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+ Option<HoodieSchemaField> riderFieldOpt =
latestTableSchemaOpt.get().getField("rider");
+ assertTrue(riderFieldOpt.isPresent());
+ assertTrue(riderFieldOpt.get().schema().getTypes()
+ .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (MissingSchemaFieldException e) {
assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
@@ -493,8 +498,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
metaClient.reloadActiveTimeline();
Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
- .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+ Option<HoodieSchemaField> riderFieldOpt =
latestTableSchemaOpt.get().getField("rider");
+ assertTrue(riderFieldOpt.isPresent());
+ assertTrue(riderFieldOpt.get().schema().getTypes()
+ .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (Exception e) {
assertTrue(containsErrorMessage(e, "has no default value and is
non-nullable",
@@ -573,9 +580,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
metaClient.reloadActiveTimeline();
Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().getTypes()
- .stream().anyMatch(t -> t.getType() == HoodieSchemaType.DOUBLE),
-
latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().toString());
+ Option<HoodieSchemaField> distanceInMetersFieldOpt =
latestTableSchemaOpt.get().getField("distance_in_meters");
+ assertTrue(distanceInMetersFieldOpt.isPresent());
+ assertTrue(distanceInMetersFieldOpt.get().schema().getTypes()
+ .stream().anyMatch(t -> HoodieSchemaType.DOUBLE == t.getType()));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (Exception e) {
assertTrue(targetSchemaSameAsTableSchema);
@@ -661,8 +669,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
metaClient.reloadActiveTimeline();
Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("current_ts").get().schema().getTypes()
- .stream().anyMatch(t -> t.getType() == HoodieSchemaType.LONG));
+ Option<HoodieSchemaField> currentTsFieldOpt =
latestTableSchemaOpt.get().getField("current_ts");
+ assertTrue(currentTsFieldOpt.isPresent());
+ assertTrue(currentTsFieldOpt.get().schema().getTypes()
+ .stream().anyMatch(t -> HoodieSchemaType.LONG == t.getType()));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
}