This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p4
in repository https://gitbox.apache.org/repos/asf/hudi.git
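The commit below continues replacing direct Avro Schema handling with Hudi's HoodieSchema abstraction. As a rough sketch of the call-site pattern applied throughout the diff (HoodieSchema.parse, HoodieSchemaCache.intern, HoodieSchemaUtils.addMetadataFields and toAvroSchema are the methods visible in the changes; the wrapper class and variable names here are illustrative only, not part of the commit):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaCache;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;

    // Hypothetical helper sketching the migration; not part of this commit.
    class SchemaResolutionSketch {

      static HoodieSchema resolveWriterSchema(String schemaJson, boolean addMetaFields) {
        // Before this series: Schema s = AvroSchemaCache.intern(new Schema.Parser().parse(schemaJson));
        // After: parse and intern through the HoodieSchema facade instead.
        HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(schemaJson));
        if (addMetaFields) {
          // Replaces HoodieAvroUtils.addMetadataFields(avroSchema).
          schema = HoodieSchemaUtils.addMetadataFields(schema);
        }
        return schema;
      }

      // Call sites that still need an Avro view bridge back explicitly, e.g.
      // record.getColumnValues(schema.toAvroSchema(), sortColumns, consistentLogicalTimestampEnabled).
    }
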
commit 483d0a62f7661530d11ebd924b508bc1f2feaa85
Author: voon <[email protected]>
AuthorDate: Sat Dec 27 19:06:51 2025 +0800

    feat(schema): phase 18 - Remove HoodieAvroUtils usage (Part 4)
---
 .../hudi/execution/CopyOnWriteInsertHandler.java   |  5 +-
 .../hudi/execution/HoodieLazyInsertIterable.java   | 13 +++--
 .../hudi/execution/ExplicitWriteHandler.java       |  3 +-
 .../hudi/execution/FlinkLazyInsertIterable.java    |  7 ++-
 .../run/strategy/JavaExecutionStrategy.java        | 11 ++--
 .../strategy/JavaSortAndSizeExecutionStrategy.java |  4 +-
 .../hudi/execution/JavaLazyInsertIterable.java     |  7 ++-
 .../JavaCustomColumnsSortPartitioner.java          |  7 ++-
 .../TestJavaBulkInsertInternalPartitioner.java     |  2 +-
 .../MultipleSparkJobExecutionStrategy.java         |  5 +-
 ...SparkJobConsistentHashingExecutionStrategy.java | 13 +++--
 ...onsistentBucketClusteringExecutionStrategy.java |  3 +-
 .../SparkSingleFileSortExecutionStrategy.java      |  5 +-
 .../SparkSortAndSizeExecutionStrategy.java         |  5 +-
 .../client/utils/SparkMetadataWriterUtils.java     |  8 ++-
 .../hudi/execution/SparkLazyInsertIterable.java    | 13 +++--
 .../bulkinsert/RDDBucketIndexPartitioner.java      | 11 ++--
 .../TestBoundedInMemoryExecutorInSpark.java        |  6 +--
 .../hudi/execution/TestBoundedInMemoryQueue.java   | 14 ++---
 .../hudi/execution/TestDisruptorMessageQueue.java  |  6 +--
 .../org/apache/hudi/common/util/SortUtils.java     |  8 ++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 11 ++--
 ...tOverwriteNonDefaultsWithLatestAvroPayload.java | 62 +++++++++++-----------
 .../org/apache/hudi/common/util/TestSortUtils.java |  3 +-
 .../common/util/collection/TestBitCaskDiskMap.java |  5 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  8 +--
 .../realtime/TestHoodieRealtimeRecordReader.java   |  6 +--
 .../utilities/schema/KafkaOffsetPostProcessor.java | 32 ++++++-----
 .../schema/SchemaProviderWithPostProcessor.java    |  2 +
 .../DeleteSupportSchemaPostProcessor.java          | 25 +++++----
 .../DropColumnSchemaPostProcessor.java             | 21 +++++---
 .../add/AddPrimitiveColumnSchemaPostProcessor.java | 38 +++++++------
 32 files changed, 188 insertions(+), 181 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index d51c259d7251..1a9183053501 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -21,7 +21,6 @@ package org.apache.hudi.execution;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -82,7 +81,7 @@ public class CopyOnWriteInsertHandler<T>
     String partitionPath = record.getPartitionPath();
     // just skip the ignored record,do not make partitions on fs
     try {
-      if (record.shouldIgnore(genResult.schema, config.getProps())) {
+      if (record.shouldIgnore(genResult.schema.toAvroSchema(), config.getProps())) {
        numSkippedRecords++;
        return;
      }
@@ -111,7 +110,7 @@ public class CopyOnWriteInsertHandler<T>
          record.getPartitionPath(), idPrefix, taskContextSupplier);
      handles.put(partitionPath, handle);
    }
-    handle.write(record, 
HoodieSchema.fromAvroSchema(genResult.schema), config.getProps()); + handle.write(record, genResult.schema, config.getProps()); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index 8225dff16ad6..8487a1f01ee2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -22,14 +22,13 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.ExecutorFactory; -import org.apache.avro.Schema; - import java.util.Iterator; import java.util.List; import java.util.function.Function; @@ -76,9 +75,9 @@ public abstract class HoodieLazyInsertIterable<T> // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. public static class HoodieInsertValueGenResult<R extends HoodieRecord> { private final R record; - public final Schema schema; + public final HoodieSchema schema; - public HoodieInsertValueGenResult(R record, Schema schema) { + public HoodieInsertValueGenResult(R record, HoodieSchema schema) { this.record = record; this.schema = schema; } @@ -92,13 +91,13 @@ public abstract class HoodieLazyInsertIterable<T> * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some * expensive operations of transformation to the reader thread. 
*/ - public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema, + public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(HoodieSchema schema, HoodieWriteConfig writeConfig) { return getTransformerInternal(schema, writeConfig); } - public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema, - HoodieWriteConfig writeConfig) { + public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema schema, + HoodieWriteConfig writeConfig) { // NOTE: Whether record have to be cloned here is determined based on the executor type used // for writing: executors relying on an inner queue, will be keeping references to the records // and therefore in the environments where underlying buffer holding the record could be diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java index 7c5482a735ba..0b6f04002f42 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java @@ -20,7 +20,6 @@ package org.apache.hudi.execution; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieWriteHandle; @@ -46,7 +45,7 @@ public class ExplicitWriteHandler<T> @Override public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> genResult) { final HoodieRecord insertPayload = genResult.getResult(); - handle.write(insertPayload, HoodieSchema.fromAvroSchema(genResult.schema), this.handle.getConfig().getProps()); + handle.write(insertPayload, genResult.schema, this.handle.getConfig().getProps()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 00f17b0d48c4..dbeb24c550ec 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -18,10 +18,11 @@ package org.apache.hudi.execution; -import org.apache.hudi.avro.AvroSchemaCache; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -30,8 +31,6 @@ import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.ExecutorFactory; -import org.apache.avro.Schema; - import java.util.Iterator; import java.util.List; @@ -60,7 +59,7 @@ public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { // Executor service used for launching writer thread. 
HoodieExecutor<List<WriteStatus>> executor = null; try { - Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(hoodieConfig.getSchema())); + HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(hoodieConfig.getSchema())); executor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), getTransformer(schema, hoodieConfig)); final List<WriteStatus> result = executor.execute(); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index c8b2b76a5952..e5ec83557c56 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -19,7 +19,6 @@ package org.apache.hudi.client.clustering.run.strategy; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.WriteStatus; @@ -32,6 +31,8 @@ import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieWriteConfig; @@ -97,7 +98,7 @@ public abstract class JavaExecutionStrategy<T> */ public abstract List<WriteStatus> performClusteringWithRecordList( final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime, - final Map<String, String> strategyParams, final Schema schema, + final Map<String, String> strategyParams, final HoodieSchema schema, final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata); /** @@ -107,11 +108,11 @@ public abstract class JavaExecutionStrategy<T> * @param schema Schema of the data including metadata fields. 
* @return partitioner for the java engine */ - protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) { + protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, HoodieSchema schema) { if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { return new JavaCustomColumnsSortPartitioner( strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), - HoodieAvroUtils.addMetadataFields(schema), getWriteConfig()); + HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig()); } else { return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()); } @@ -124,7 +125,7 @@ public abstract class JavaExecutionStrategy<T> HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime) { List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, instantTime); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema())); List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream() .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId())) .collect(Collectors.toList()); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java index ffc815fd48f2..22d2b4eb53a6 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -25,13 +25,13 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.JavaBulkInsertHelper; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.List; import java.util.Map; @@ -54,7 +54,7 @@ public class JavaSortAndSizeExecutionStrategy<T> @Override public List<WriteStatus> performClusteringWithRecordList( final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, - final String instantTime, final Map<String, String> strategyParams, final Schema schema, + final String instantTime, final Map<String, String> strategyParams, final HoodieSchema schema, final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) { log.info("Starting clustering for a group, parallelism: {} commit: {}", numOutputGroups, instantTime); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index 0a88fd62f71a..f00625ef3cd5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -18,10 +18,11 @@ package org.apache.hudi.execution; -import org.apache.hudi.avro.AvroSchemaCache; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -29,8 +30,6 @@ import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.ExecutorFactory; -import org.apache.avro.Schema; - import java.util.Iterator; import java.util.List; @@ -64,7 +63,7 @@ public class JavaLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { HoodieExecutor<List<WriteStatus>> executor = null; try { - final Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(hoodieConfig.getSchema())); + final HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(hoodieConfig.getSchema())); executor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); final List<WriteStatus> result = executor.execute(); checkState(result != null && !result.isEmpty()); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index 54f0c94ff069..1faf0ad45d75 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -20,13 +20,12 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.SortUtils; import org.apache.hudi.common.util.collection.FlatLists; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.BulkInsertPartitioner; -import org.apache.avro.Schema; - import java.util.List; import java.util.stream.Collectors; @@ -41,11 +40,11 @@ public class JavaCustomColumnsSortPartitioner<T> implements BulkInsertPartitioner<List<HoodieRecord<T>>> { private final String[] sortColumnNames; - private final Schema schema; + private final HoodieSchema schema; private final boolean consistentLogicalTimestampEnabled; private final boolean suffixRecordKey; - public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) { + public JavaCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) { this.sortColumnNames = columnNames; this.schema = schema; this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled(); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java index ff8909b85ddc..2dff037f4472 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java 
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java @@ -70,7 +70,7 @@ public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestH cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME, "partition_path"); cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED, "false"); testBulkInsertInternalPartitioner( - new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, cfg), + new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, cfg), records, true, generatePartitionNumRecords(records), Option.of(columnComparator)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 69d50c1367ca..928f0c3abee3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.SparkAdapterSupport$; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.SparkTaskContextSupplier; @@ -172,7 +171,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T> final int numOutputGroups, final String instantTime, final Map<String, String> strategyParams, - final Schema schema, + final HoodieSchema schema, final List<HoodieFileGroupId> fileGroupIdList, final boolean shouldPreserveHoodieMetadata, final Map<String, String> extraMetadata); @@ -230,7 +229,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T> return CompletableFuture.supplyAsync(() -> { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext()); HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema())); // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be shuffled later. 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java index c753d83d3462..173cd5e477f3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java @@ -18,7 +18,6 @@ package org.apache.hudi.client.clustering.run.strategy; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyConcatenatingIterator; @@ -33,6 +32,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode; import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -50,7 +50,6 @@ import org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket import org.apache.hudi.util.ExecutorFactory; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.ArrayList; import java.util.Collections; @@ -72,13 +71,13 @@ import java.util.function.Supplier; public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleSparkJobExecutionStrategy<T> { private final String indexKeyFields; - private final Schema readerSchema; + private final HoodieSchema readerSchema; public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { super(table, engineContext, writeConfig); this.indexKeyFields = table.getConfig().getBucketIndexHashField(); - this.readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema())); + this.readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema())); } @Override @@ -148,10 +147,10 @@ public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS private final boolean recordsSorted; private final Map<String/*fileIdPrefix*/, HoodieWriteHandle> writeHandles; private final Function<HoodieRecord, String> fileIdPrefixExtractor; - private final Schema schema; + private final HoodieSchema schema; public InsertHandler(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier, - WriteHandleFactory writeHandleFactory, boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, Schema schema) { + WriteHandleFactory writeHandleFactory, boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, HoodieSchema schema) { this.config = config; this.instantTime = instantTime; this.hoodieTable = hoodieTable; @@ -176,7 +175,7 @@ public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS handle = writeHandleFactory.create(config, instantTime, hoodieTable, record.getPartitionPath(), fileIdPrefix, taskContextSupplier); writeHandles.put(fileIdPrefix, handle); } - handle.write(record, 
HoodieSchema.fromAvroSchema(schema), config.getProps()); + handle.write(record, schema, config.getProps()); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java index 3b63e84bda07..379f2cb4b858 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java @@ -37,7 +37,6 @@ import org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -84,7 +83,7 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe @Override public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, - Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, + Map<String, String> strategyParams, HoodieSchema schema, List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata, Map<String, String> extraMetadata) { log.info("Starting clustering for a group, parallelism:{} commit:{}", numOutputGroups, instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index 6f201e5b4c70..ea778ce4fc90 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -35,7 +35,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -89,7 +88,7 @@ public class SparkSingleFileSortExecutionStrategy<T> int numOutputGroups, String instantTime, Map<String, String> strategyParams, - Schema schema, + HoodieSchema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata) { @@ -105,7 +104,7 @@ public class SparkSingleFileSortExecutionStrategy<T> newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE)); return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, + false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata)); } } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index d66691d77fdd..05d9b23a8934 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -82,7 +81,7 @@ public class SparkSortAndSizeExecutionStrategy<T> final int numOutputGroups, final String instantTime, final Map<String, String> strategyParams, - final Schema schema, + final HoodieSchema schema, final List<HoodieFileGroupId> fileGroupIdList, final boolean shouldPreserveHoodieMetadata, final Map<String, String> extraMetadata) { @@ -95,6 +94,6 @@ public class SparkSortAndSizeExecutionStrategy<T> newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), - newConfig, false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata)); + newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index 9b1f36908b59..6cfec6eff675 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -72,7 +72,6 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.util.JavaScalaConverters; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Column; @@ -104,7 +103,6 @@ import java.util.stream.Stream; import scala.collection.immutable.Seq; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -385,14 +383,14 @@ public class SparkMetadataWriterUtils { HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, dataMetaClient); List<String> columnsToIndex = Collections.singletonList(indexDefinition.getSourceFields().get(0)); try { - Option<Schema> writerSchema = + Option<HoodieSchema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) .flatMap(writerSchemaStr -> isNullOrEmpty(writerSchemaStr) ? 
Option.empty() - : Option.of(new Schema.Parser().parse(writerSchemaStr))); + : Option.of(HoodieSchema.parse(writerSchemaStr))); HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); - HoodieSchema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema).map(HoodieSchema::fromAvroSchema) + HoodieSchema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) : schema) .orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata))); List<Pair<String, HoodieSchema>> columnsToIndexSchemaMap = columnsToIndex.stream() .map(columnToIndex -> HoodieSchemaUtils.getNestedField(tableSchema, columnToIndex)) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index 45e29a18110d..6c2c34683dff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -18,11 +18,12 @@ package org.apache.hudi.execution; -import org.apache.hudi.avro.AvroSchemaCache; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -30,8 +31,6 @@ import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.ExecutorFactory; -import org.apache.avro.Schema; - import java.util.Iterator; import java.util.List; @@ -69,11 +68,11 @@ public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null; try { // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data. 
- Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema()); + HoodieSchema schema = HoodieSchema.parse(hoodieConfig.getWriteSchema()); if (useWriterSchema) { - schema = HoodieAvroUtils.addMetadataFields(schema); + schema = HoodieSchemaUtils.addMetadataFields(schema); } - schema = AvroSchemaCache.intern(schema); + schema = HoodieSchemaCache.intern(schema); bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java index fdbfae12ce32..92404d259d0c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java @@ -19,15 +19,14 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.SparkAdapterSupport$; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.collection.FlatLists; import org.apache.hudi.table.BucketIndexBulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.spark.Partitioner; @@ -87,12 +86,12 @@ public abstract class RDDBucketIndexPartitioner<T> extends BucketIndexBulkInsert */ private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) { final String[] sortColumns = sortColumnNames; - final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema())))); + final HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(table.getConfig().getSchema())); Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> { FlatLists.ComparableList obj1 = FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects( - t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled))); + t1.getColumnValues(schema.toAvroSchema(), sortColumns, consistentLogicalTimestampEnabled))); FlatLists.ComparableList obj2 = FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects( - t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled))); + t2.getColumnValues(schema.toAvroSchema(), sortColumns, consistentLogicalTimestampEnabled))); return obj1.compareTo(obj2); }; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index 9035c10a83fe..deef0929bced 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -96,7 +96,7 
@@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null; try { executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); + getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), getPreExecuteRunnable()); int result = executor.execute(); assertEquals(100, result); @@ -136,7 +136,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); + getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), getPreExecuteRunnable()); // Interrupt the current thread (therefore triggering executor to throw as soon as it // invokes [[get]] on the [[CompletableFuture]]) @@ -179,7 +179,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter, - consumer, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + consumer, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), getPreExecuteRunnable()); executor.shutdownNow(); boolean terminatedGracefully = executor.awaitTermination(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index ab1535140b4b..7f098eea5f59 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -92,7 +92,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { final int numRecords = 128; final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, numRecords); final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig)); // Produce Future<Boolean> resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); @@ -130,7 +130,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { final List<List<HoodieRecord>> recs = new ArrayList<>(); final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig)); // Record Key to <Producer Index, Rec Index within a producer> Map<String, 
Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>(); @@ -225,11 +225,11 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { final int recordLimit = 5; final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> sizeEstimator = new DefaultSizeEstimator<>(); HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); + getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(genResult); final long memoryLimitInBytes = recordLimit * objSize; final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig)); // Produce executorService.submit(() -> { @@ -274,7 +274,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>(); // queue memory limit HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); + getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties()))); final long memoryLimitInBytes = 4 * objSize; @@ -282,7 +282,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { // stops and throws // correct exception back. 
BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig)); // Produce Future<Boolean> resFuture = executorService.submit(() -> { @@ -310,7 +310,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness { when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig)); // Produce Future<Boolean> res = executorService.submit(() -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index d796ffde358a..30a9ab3f63b9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -128,7 +128,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness { try { exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); + getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); int result = exec.execute(); // It should buffer and write 100 records assertEquals(100, result); @@ -157,7 +157,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness { final List<List<HoodieRecord>> recs = new ArrayList<>(); final DisruptorMessageQueue<HoodieRecord, HoodieInsertValueGenResult> queue = - new DisruptorMessageQueue(1024, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + new DisruptorMessageQueue(1024, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), "BLOCKING_WAIT", numProducers, new Runnable() { @Override public void run() { @@ -305,7 +305,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness { }; DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(1024, - producers, consumer, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + producers, consumer, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); final Throwable thrown = assertThrows(HoodieException.class, exec::execute, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java index c043f7a2fef0..23c9595d6f31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java @@ -23,8 
+23,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.collection.FlatLists; -import org.apache.avro.Schema; - import java.util.function.Function; /** @@ -58,12 +56,12 @@ public class SortUtils { public static FlatLists.ComparableList<Comparable<HoodieRecord>> getComparableSortColumns( HoodieRecord record, String[] sortColumnNames, - Schema schema, + HoodieSchema schema, boolean suffixRecordKey, boolean consistentLogicalTimestampEnabled ) { if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) { - Object[] columnValues = record.getColumnValues(schema, sortColumnNames, consistentLogicalTimestampEnabled); + Object[] columnValues = record.getColumnValues(schema.toAvroSchema(), sortColumnNames, consistentLogicalTimestampEnabled); if (suffixRecordKey) { return FlatLists.ofComparableArray( prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(), record.getRecordKey(), columnValues)); @@ -72,7 +70,7 @@ public class SortUtils { } else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) { return FlatLists.ofComparableArray( HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey( - record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled + record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey, consistentLogicalTimestampEnabled )); } throw new IllegalArgumentException("Invalid recordType" + record.getRecordType()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 7fa98d0c6ebb..78c385db925e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -162,8 +162,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; -import static org.apache.hudi.avro.HoodieAvroUtils.projectSchema; import static org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES; import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED; import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION; @@ -1597,20 +1595,19 @@ public class HoodieTableMetadataUtil { public static Map<String, HoodieSchema> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, HoodieMetadataConfig metadataConfig, Option<HoodieRecordType> recordTypeOpt) { - Option<Schema> writerSchema = + Option<HoodieSchema> writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) .flatMap(writerSchemaStr -> isNullOrEmpty(writerSchemaStr) ? Option.empty() - : Option.of(new Schema.Parser().parse(writerSchemaStr))); + : Option.of(HoodieSchema.parse(writerSchemaStr))); HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields Option<HoodieSchema> tableSchema = writerSchema.isEmpty() ? tableConfig.getTableCreateSchema().map(HoodieSchema::fromAvroSchema) // the write schema does not set up correctly - : writerSchema.map(schema -> tableConfig.populateMetaFields() ? 
addMetadataFields(schema) : schema) - .map(HoodieSchema::fromAvroSchema); + : writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) : schema); HoodieIndexVersion indexVersion = existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataMetaClient); return getColumnsToIndex(tableConfig, metadataConfig, @@ -2585,7 +2582,7 @@ public class HoodieTableMetadataUtil { List<String> mergedFields = new ArrayList<>(partitionFields.size() + sourceFields.size()); mergedFields.addAll(partitionFields); mergedFields.addAll(sourceFields); - return HoodieSchema.fromAvroSchema(addMetadataFields(projectSchema(tableSchema.toAvroSchema(), mergedFields))); + return HoodieSchemaUtils.addMetadataFields(HoodieSchemaUtils.projectSchema(tableSchema, mergedFields)); } public static StoragePath filePath(StoragePath basePath, String partition, String filename) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index 97b0ac5c108c..88cb718326ec 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -18,7 +18,10 @@ package org.apache.hudi.common.model; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; @@ -40,25 +43,24 @@ import static org.junit.jupiter.api.Assertions.assertNotSame; * Unit tests {@link TestOverwriteNonDefaultsWithLatestAvroPayload}. 
*/ public class TestOverwriteNonDefaultsWithLatestAvroPayload { - private Schema schema; + private HoodieSchema schema; @BeforeEach public void setUp() throws Exception { - schema = Schema.createRecord(Arrays.asList( - new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), - new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", ""), - new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), - new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false), - new Schema.Field("city", Schema.create(Schema.Type.STRING), "", "NY"), - new Schema.Field("child", Schema.createArray(Schema.create(Schema.Type.STRING)), "", Collections.emptyList()) - )); + schema = HoodieSchema.createRecord("TestSchema", null, null, false, Arrays.asList( + HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), "", null), + HoodieSchemaField.of("partition", HoodieSchema.create(HoodieSchemaType.STRING), "", ""), + HoodieSchemaField.of("ts", HoodieSchema.create(HoodieSchemaType.LONG), "", null), + HoodieSchemaField.of("_hoodie_is_deleted", HoodieSchema.create(HoodieSchemaType.BOOLEAN), "", false), + HoodieSchemaField.of("city", HoodieSchema.create(HoodieSchemaType.STRING), "", "NY"), + HoodieSchemaField.of("child", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)), "", Collections.emptyList()))); } @Test public void testActiveRecords() throws IOException { - Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema); + HoodieSchema writerSchema = HoodieSchemaUtils.addMetadataFields(schema); - GenericRecord record1 = new GenericData.Record(schema); + GenericRecord record1 = new GenericData.Record(schema.toAvroSchema()); record1.put("id", "1"); record1.put("partition", "partition1"); record1.put("ts", 0L); @@ -66,7 +68,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { record1.put("city", "NY0"); record1.put("child", Collections.singletonList("A")); - GenericRecord record2 = new GenericData.Record(schema); + GenericRecord record2 = new GenericData.Record(schema.toAvroSchema()); record2.put("id", "2"); record2.put("partition", ""); record2.put("ts", 1L); @@ -74,7 +76,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { record2.put("city", "NY"); record2.put("child", Collections.emptyList()); - GenericRecord record3 = new GenericData.Record(schema); + GenericRecord record3 = new GenericData.Record(schema.toAvroSchema()); record3.put("id", "2"); record3.put("partition", "partition1"); record3.put("ts", 1L); @@ -115,14 +117,14 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); - assertEquals(record1, payload1.getInsertValue(schema).get()); - assertEquals(record2, payload2.getInsertValue(schema).get()); + assertEquals(record1, payload1.getInsertValue(schema.toAvroSchema()).get()); + assertEquals(record2, payload2.getInsertValue(schema.toAvroSchema()).get()); - IndexedRecord combinedVal1 = payload1.combineAndGetUpdateValue(record2, schema).get(); + IndexedRecord combinedVal1 = payload1.combineAndGetUpdateValue(record2, schema.toAvroSchema()).get(); assertEquals(combinedVal1, record1); assertNotSame(combinedVal1, record1); - IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get(); + IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema.toAvroSchema()).get(); assertEquals(combinedVal2, record3); assertNotSame(combinedVal2, 
record3); @@ -130,19 +132,19 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { // the payload record could include the metadata fields (for compaction) or not (for normal writer path). // case1: validate normal writer path - IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get(); + IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema.toAvroSchema()).get(); assertEquals(combinedVal3, record3); assertNotSame(combinedVal3, record3); // case2: validate compaction path - IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get(); + IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema.toAvroSchema()).get(); assertEquals(combinedVal4, record6); assertNotSame(combinedVal4, record6); } @Test public void testDeletedRecord() throws IOException { - GenericRecord record1 = new GenericData.Record(schema); + GenericRecord record1 = new GenericData.Record(schema.toAvroSchema()); record1.put("id", "1"); record1.put("partition", "partition0"); record1.put("ts", 0L); @@ -150,7 +152,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { record1.put("city", "NY0"); record1.put("child", Collections.emptyList()); - GenericRecord delRecord1 = new GenericData.Record(schema); + GenericRecord delRecord1 = new GenericData.Record(schema.toAvroSchema()); delRecord1.put("id", "2"); delRecord1.put("partition", "partition1"); delRecord1.put("ts", 1L); @@ -158,7 +160,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { delRecord1.put("city", "NY0"); delRecord1.put("child", Collections.emptyList()); - GenericRecord record2 = new GenericData.Record(schema); + GenericRecord record2 = new GenericData.Record(schema.toAvroSchema()); record2.put("id", "1"); record2.put("partition", "partition0"); record2.put("ts", 0L); @@ -172,13 +174,13 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); - assertEquals(record1, payload1.getInsertValue(schema).get()); - assertFalse(payload2.getInsertValue(schema).isPresent()); + assertEquals(record1, payload1.getInsertValue(schema.toAvroSchema()).get()); + assertFalse(payload2.getInsertValue(schema.toAvroSchema()).isPresent()); - assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record2); - assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent()); + assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema.toAvroSchema()).get(), record2); + assertFalse(payload2.combineAndGetUpdateValue(record1, schema.toAvroSchema()).isPresent()); } - + @Test public void testNullColumn() throws IOException { Schema avroSchema = Schema.createRecord(Arrays.asList( @@ -209,8 +211,8 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload { assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3); } - private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) { - GenericRecord record = new GenericData.Record(schema); + private static GenericRecord createRecordWithMetadataFields(HoodieSchema schema, String recordKey, String partitionPath) { + GenericRecord record = new GenericData.Record(schema.toAvroSchema()); record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001"); record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123"); record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey); diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java index 38df25837a62..233380f3ce2c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.RewriteAvroPayload; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.collection.FlatLists; import org.apache.avro.Schema; @@ -77,7 +78,7 @@ public class TestSortUtils { record = new TestSparkRecord(new HoodieKey("record1", "partition1"), payload); } String[] userSortColumns = new String[] {"non_pii_col", "timestamp"}; - FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, Schema.parse(SCHEMA), suffixRecordKey, true); + FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, HoodieSchema.parse(SCHEMA), suffixRecordKey, true); Object[] expectedSortColumnValues; if (suffixRecordKey) { expectedSortColumnValues = new Object[] {"partition1", "val1", 3.5, "record1"}; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java index cb6d03972da5..323ac96104df 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java @@ -36,7 +36,6 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.junit.jupiter.api.BeforeEach; @@ -143,7 +142,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testSimpleUpsert(boolean isCompressionEnabled) throws IOException, URISyntaxException { - Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema()); + HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema()); try (BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, new DefaultSerializer<>(), isCompressionEnabled)) { SchemaTestUtil testUtil = new SchemaTestUtil(); @@ -174,7 +173,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness { HoodieRecord<? 
extends HoodieRecordPayload> rec = itr.next(); assert recordKeys.contains(rec.getRecordKey()); try { - IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get(); + IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema.toAvroSchema()).get(); String latestCommitTime = ((GenericRecord) indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); assertEquals(latestCommitTime, newCommitTime); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java index 6f6cf310a0d4..53ba2ea76b3b 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java @@ -19,7 +19,6 @@ package org.apache.hudi.metadata; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieLocalEngineContext; @@ -33,6 +32,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -518,12 +518,12 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness { expected.add("intField"); assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(schema)), false, V1).keySet())); - //test with avro schema with meta cols - HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchema.fromAvroSchema(HoodieAvroUtils.addMetadataFields(schema.toAvroSchema())); + //test schema with meta cols + HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchemaUtils.addMetadataFields(schema); assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(Option.of(hoodieSchemaWithMetadataFields)), false, V1).keySet())); - //test with avro schema with type filter + //test schema with type filter metadataConfig = HoodieMetadataConfig.newBuilder() .enable(true).withMetadataIndexColumnStats(true) .withMaxColumnsToIndexForColStats(100) diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 2a258d40123d..be0aa261b047 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -18,7 +18,6 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMemoryConfig; @@ -55,7 +54,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; -import 
org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -970,8 +968,8 @@ public class TestHoodieRealtimeRecordReader { // In some queries, generic records that Hudi gets are just part of the full records. // Here test the case that some fields are missing in the record. - Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema.toAvroSchema()); - ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields); + HoodieSchema schemaWithMetaFields = HoodieSchemaUtils.addMetadataFields(schema); + ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields.toAvroSchema()); assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java index b93f73ad4005..4d64d009af59 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java @@ -18,14 +18,15 @@ package org.apache.hudi.utilities.schema; -import org.apache.hudi.avro.AvroSchemaUtils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.utilities.config.HoodieStreamerConfig; -import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaSparkContext; @@ -64,30 +65,37 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { @Override @Deprecated public Schema processSchema(Schema schema) { + return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema(); + } + + @Override + public HoodieSchema processSchema(HoodieSchema schema) { // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch. - List<Schema.Field> fieldList = schema.getFields(); - Set<String> fieldNames = fieldList.stream().map(Schema.Field::name).collect(Collectors.toSet()); + List<HoodieSchemaField> fieldList = schema.getFields(); + Set<String> fieldNames = fieldList.stream().map(HoodieSchemaField::name).collect(Collectors.toSet()); // if the source schema already contains the kafka offset fields, then return the schema as is. 
if (fieldNames.containsAll(Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN))) { return schema; } try { - List<Schema.Field> newFieldList = fieldList.stream() - .map(HoodieAvroUtils::createNewSchemaField).collect(Collectors.toList()); + List<HoodieSchemaField> newFieldList = fieldList.stream() + .map(HoodieSchemaUtils::createNewSchemaField).collect(Collectors.toList()); // handle case where source schema provider may have already set 1 or more of these fields if (!fieldNames.contains(KAFKA_SOURCE_OFFSET_COLUMN)) { - newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0)); + newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_OFFSET_COLUMN, HoodieSchema.create(HoodieSchemaType.LONG), "offset column", 0)); } if (!fieldNames.contains(KAFKA_SOURCE_PARTITION_COLUMN)) { - newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0)); + newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_PARTITION_COLUMN, HoodieSchema.create(HoodieSchemaType.INT), "partition column", 0)); } if (!fieldNames.contains(KAFKA_SOURCE_TIMESTAMP_COLUMN)) { - newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0)); + newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_TIMESTAMP_COLUMN, HoodieSchema.create(HoodieSchemaType.LONG), "timestamp column", 0)); } if (!fieldNames.contains(KAFKA_SOURCE_KEY_COLUMN)) { - newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE)); + newFieldList.add( + HoodieSchemaField.of(KAFKA_SOURCE_KEY_COLUMN, + HoodieSchemaUtils.createNullableSchema(HoodieSchema.create(HoodieSchemaType.STRING)), "kafka key column", HoodieSchema.NULL_VALUE)); } - return Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList); + return HoodieSchema.createRecord(schema.getName() + "_processed", schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, newFieldList); } catch (Exception e) { throw new HoodieSchemaException("Kafka offset post processor failed with schema: " + schema, e); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java index aca1677f53b0..09ec411b24d4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java @@ -39,6 +39,7 @@ public class SchemaProviderWithPostProcessor extends SchemaProvider { } @Override + @Deprecated public Schema getSourceSchema() { HoodieSchema sourceSchema = schemaProvider.getSourceHoodieSchema(); return schemaPostProcessor.map(processor -> processor.processSchema(sourceSchema).toAvroSchema()) @@ -46,6 +47,7 @@ public class SchemaProviderWithPostProcessor extends SchemaProvider { } @Override + @Deprecated public Schema getTargetSchema() { HoodieSchema targetSchema = schemaProvider.getTargetHoodieSchema(); return schemaPostProcessor.map(processor -> processor.processSchema(targetSchema).toAvroSchema()) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java index a479e67af5cf..c8a8d9624240 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java @@ -20,6 +20,10 @@ package org.apache.hudi.utilities.schema.postprocessor; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.utilities.schema.SchemaPostProcessor; import org.apache.avro.Schema; @@ -30,8 +34,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField; - /** * An implementation of {@link SchemaPostProcessor} which will add a column named "_hoodie_is_deleted" to the end of * a given schema. @@ -45,25 +47,28 @@ public class DeleteSupportSchemaPostProcessor extends SchemaPostProcessor { } @Override + @Deprecated public Schema processSchema(Schema schema) { + return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema(); + } + @Override + public HoodieSchema processSchema(HoodieSchema schema) { if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) { LOG.warn("column {} already exists!", HoodieRecord.HOODIE_IS_DELETED_FIELD); return schema; } - List<Schema.Field> sourceFields = schema.getFields(); - List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1); + List<HoodieSchemaField> sourceFields = schema.getFields(); + List<HoodieSchemaField> targetFields = new ArrayList<>(sourceFields.size() + 1); // copy existing columns - for (Schema.Field sourceField : sourceFields) { - targetFields.add(createNewSchemaField(sourceField)); + for (HoodieSchemaField sourceField : sourceFields) { + targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField)); } // add _hoodie_is_deleted column - targetFields.add(new Schema.Field(HoodieRecord.HOODIE_IS_DELETED_FIELD, Schema.create(Schema.Type.BOOLEAN), null, false)); - - return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields); + targetFields.add(HoodieSchemaField.of(HoodieRecord.HOODIE_IS_DELETED_FIELD, HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, false)); + return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields); } - } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java index 020825e2d6f7..703dea14544f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java @@ -19,6 +19,9 @@ package org.apache.hudi.utilities.schema.postprocessor; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import 
org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig; import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException; @@ -36,7 +39,6 @@ import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; /** @@ -57,14 +59,20 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor { @Deprecated public static class Config { + @Deprecated public static final String DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key(); } @Override + @Deprecated public Schema processSchema(Schema schema) { + return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema(); + } + @Override + public HoodieSchema processSchema(HoodieSchema schema) { String columnToDeleteStr = getStringWithAltKeys( this.config, SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN); @@ -78,12 +86,12 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor { .map(filed -> filed.toLowerCase(Locale.ROOT)) .collect(Collectors.toSet()); - List<Schema.Field> sourceFields = schema.getFields(); - List<Schema.Field> targetFields = new LinkedList<>(); + List<HoodieSchemaField> sourceFields = schema.getFields(); + List<HoodieSchemaField> targetFields = new LinkedList<>(); - for (Schema.Field sourceField : sourceFields) { + for (HoodieSchemaField sourceField : sourceFields) { if (!columnsToDelete.contains(sourceField.name().toLowerCase(Locale.ROOT))) { - targetFields.add(createNewSchemaField(sourceField)); + targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField)); } } @@ -91,7 +99,6 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor { throw new HoodieSchemaPostProcessException("Target schema is empty, you can not remove all columns!"); } - return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields); + return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields); } - } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java index 2be4942ab80b..e83cf9d9a52d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java @@ -19,6 +19,10 @@ package org.apache.hudi.utilities.schema.postprocessor.add; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -32,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; -import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField; 
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getRawValueWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; @@ -57,29 +60,34 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor { } @Override + @Deprecated public Schema processSchema(Schema schema) { + return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema(); + } + + @Override + public HoodieSchema processSchema(HoodieSchema schema) { String newColumnName = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP); if (schema.getField(newColumnName) != null) { throw new HoodieSchemaPostProcessException(String.format("Column %s already exist!", newColumnName)); } - List<Schema.Field> sourceFields = schema.getFields(); - List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1); + List<HoodieSchemaField> sourceFields = schema.getFields(); + List<HoodieSchemaField> targetFields = new ArrayList<>(sourceFields.size() + 1); - for (Schema.Field sourceField : sourceFields) { - targetFields.add(createNewSchemaField(sourceField)); + for (HoodieSchemaField sourceField : sourceFields) { + targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField)); } // add new column to the end targetFields.add(buildNewColumn()); - return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields); + return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields); } - private Schema.Field buildNewColumn() { - + private HoodieSchemaField buildNewColumn() { String columnName = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP); String type = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP).toUpperCase(Locale.ROOT); String doc = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP, true); @@ -91,17 +99,13 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor { ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type)); ValidationUtils.checkArgument(!Schema.Type.NULL.getName().equals(type)); - Schema newSchema = createSchema(type, nullable); - - return new Schema.Field(columnName, newSchema, doc, defaultValue.isPresent() ? defaultValue.get() : null); + HoodieSchema newSchema = createSchema(type, nullable); + return HoodieSchemaField.of(columnName, newSchema, doc, defaultValue.isPresent() ? defaultValue.get() : null); } - private Schema createSchema(String type, boolean nullable) { - Schema schema = Schema.create(Schema.Type.valueOf(type)); - if (nullable) { - schema = Schema.createUnion(Schema.create(Schema.Type.NULL), schema); - } - return schema; + private HoodieSchema createSchema(String type, boolean nullable) { + HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.valueOf(type)); + return nullable ? HoodieSchema.createNullable(schema) : schema; } }
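For reviewers unfamiliar with the new API, the test-side changes above all follow one bridging pattern: schema definitions move to HoodieSchema/HoodieSchemaField, metadata columns are appended via HoodieSchemaUtils, and toAvroSchema() is called only at the boundary where Avro-facing APIs (GenericData.Record, the payload getInsertValue/combineAndGetUpdateValue methods) still expect org.apache.avro.Schema. A minimal sketch, assuming only the signatures visible in the hunks above; the class, schema, and field names are illustrative:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Arrays;

public class ExampleSchemaBridge {

  public static GenericRecord buildRecord() {
    // Schema definition now lives on HoodieSchema / HoodieSchemaField instead of org.apache.avro.Schema.
    HoodieSchema schema = HoodieSchema.createRecord("ExampleSchema", null, null, false, Arrays.asList(
        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), "", null),
        HoodieSchemaField.of("ts", HoodieSchema.create(HoodieSchemaType.LONG), "", null)));

    // Meta columns are appended through HoodieSchemaUtils rather than HoodieAvroUtils.
    HoodieSchema writerSchema = HoodieSchemaUtils.addMetadataFields(schema);

    // Avro-facing APIs still take org.apache.avro.Schema, so convert only at the boundary.
    GenericRecord record = new GenericData.Record(writerSchema.toAvroSchema());
    record.put("id", "1");
    record.put("ts", 0L);
    return record;
  }
}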
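The hudi-utilities schema post-processors now share a two-method shape: the Avro overload is deprecated and simply converts, delegates to the HoodieSchema overload, and converts back, while the real work happens on HoodieSchema. A hypothetical, stripped-down processor showing that shape (constructor/config wiring and the SchemaPostProcessor base class are omitted; only calls that appear in the hunks above are used):

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

public class ExampleColumnAppendingProcessor {

  // Legacy Avro entry point: convert in, delegate, convert back out.
  @Deprecated
  public Schema processSchema(Schema avroSchema) {
    return processSchema(HoodieSchema.fromAvroSchema(avroSchema)).toAvroSchema();
  }

  // HoodieSchema entry point: copy the existing fields, append the new one, rebuild the record.
  public HoodieSchema processSchema(HoodieSchema schema) {
    List<HoodieSchemaField> targetFields = new ArrayList<>(schema.getFields().size() + 1);
    for (HoodieSchemaField sourceField : schema.getFields()) {
      targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField));
    }
    // "_example_flag" is an illustrative column name, not part of this patch.
    targetFields.add(HoodieSchemaField.of("_example_flag", HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, false));
    return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null),
        schema.getNamespace().orElse(null), false, targetFields);
  }
}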
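Nullable columns appear in two of the processors above, and both helpers wrap the base type in a null union. A small illustrative snippet (the field name and doc string are placeholders):

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

public class ExampleNullableColumns {

  // Nullable string column with a null default, as the Kafka key column is built above.
  static HoodieSchemaField nullableStringField(String name) {
    HoodieSchema nullableString = HoodieSchemaUtils.createNullableSchema(HoodieSchema.create(HoodieSchemaType.STRING));
    return HoodieSchemaField.of(name, nullableString, "placeholder doc", HoodieSchema.NULL_VALUE);
  }

  // Nullable wrapper over an arbitrary primitive, as in AddPrimitiveColumnSchemaPostProcessor above.
  static HoodieSchema nullableOf(HoodieSchemaType type) {
    return HoodieSchema.createNullable(HoodieSchema.create(type));
  }
}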
