This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5032c39ebb4e feat(schema): Migrate clustering operations to use HoodieSchema (#17691)
5032c39ebb4e is described below
commit 5032c39ebb4eacfb4215b268f7011ed28ae587b1
Author: Tim Brown <[email protected]>
AuthorDate: Fri Dec 26 15:34:29 2025 -0500
feat(schema): Migrate clustering operations to use HoodieSchema (#17691)
---
.../cluster/strategy/ClusteringExecutionStrategy.java | 3 +--
.../hudi/table/action/commit/BaseCommitActionExecutor.java | 6 +++---
.../clustering/run/strategy/JavaExecutionStrategy.java | 14 +++++++-------
.../run/strategy/JavaSortAndSizeExecutionStrategy.java | 4 ++--
.../bulkinsert/JavaCustomColumnsSortPartitioner.java | 7 +++----
.../bulkinsert/TestJavaBulkInsertInternalPartitioner.java | 2 +-
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 2 +-
.../SingleSparkJobConsistentHashingExecutionStrategy.java | 3 +--
.../run/strategy/SingleSparkJobExecutionStrategy.java | 11 ++++-------
.../SparkBinaryCopyClusteringExecutionStrategy.java | 14 +++-----------
.../main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 5 +++--
.../java/org/apache/hudi/common/schema/HoodieSchema.java | 1 +
.../common/table/read/FileGroupReaderSchemaHandler.java | 3 +--
.../main/java/org/apache/hudi/common/util/CommitUtils.java | 4 ++--
.../main/java/org/apache/hudi/common/util/SortUtils.java | 8 +++-----
.../internal/schema/convert/InternalSchemaConverter.java | 2 +-
.../java/org/apache/hudi/avro/TestHoodieAvroUtils.java | 2 +-
.../table/read/TestFileGroupReaderSchemaHandler.java | 4 ++--
.../common/table/read/TestHoodieFileGroupReaderBase.java | 10 +++++-----
.../table/read/TestParquetRowIndexBasedSchemaHandler.java | 4 ++--
.../java/org/apache/hudi/common/util/TestSortUtils.java | 8 ++++----
.../main/java/org/apache/hudi/utilities/UtilHelpers.java | 3 +--
.../java/org/apache/hudi/utilities/sources/InputBatch.java | 4 +---
.../sources/helpers/CloudObjectsSelectorCommon.java | 3 +--
.../org/apache/hudi/utilities/streamer/StreamSync.java | 6 +++---
.../org/apache/hudi/utilities/DummySchemaProvider.java | 3 +--
.../org/apache/hudi/utilities/streamer/TestStreamSync.java | 2 +-
27 files changed, 59 insertions(+), 79 deletions(-)
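The recurring pattern across these 27 files: call sites stop handling org.apache.avro.Schema directly and move to the HoodieSchema wrapper, converting back to Avro only at boundaries that still require it. A minimal sketch of the before/after shape, assuming only the HoodieSchema methods that actually appear in the hunks below (parse, toAvroSchema, HoodieSchemaUtils.addMetadataFields); writeConfig stands in for the getWriteConfig() seen in the strategies:

    // Before: parse and decorate a raw Avro schema
    // Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));

    // After: stay in HoodieSchema, unwrap only where an Avro API is still required
    HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema()));
    org.apache.avro.Schema avroView = readerSchema.toAvroSchema(); // boundary conversion, e.g. for record.toIndexedRecord(...)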
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d6f957b50092..bc7fec18a564 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -48,7 +48,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.avro.Schema;
import java.io.IOException;
import java.io.Serializable;
@@ -88,7 +87,7 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
* file groups created is bounded by numOutputGroups.
* Note that commit is not done as part of strategy. commit is callers responsibility.
*/
- public abstract HoodieWriteMetadata<O> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime);
+ public abstract HoodieWriteMetadata<O> performClustering(final HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String instantTime);
protected ClosableIterator<HoodieRecord<T>> getRecordIterator(ReaderContextFactory<T> readerContextFactory, ClusteringOperation operation, String instantTime, long maxMemory) {
TypedProperties props = getReaderProperties(maxMemory);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index ae117c002da1..ff9fcb866692 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -61,7 +62,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import java.io.IOException;
import java.time.Duration;
@@ -291,9 +291,9 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, Option.empty(), table.getActiveTimeline());
table.getMetaClient().reloadActiveTimeline();
- Option<Schema> schema;
+ Option<HoodieSchema> schema;
try {
- schema = new TableSchemaResolver(table.getMetaClient()).getTableAvroSchemaIfPresent(false);
+ schema = new TableSchemaResolver(table.getMetaClient()).getTableSchemaIfPresent(false);
} catch (Exception ex) {
throw new HoodieSchemaException(ex);
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index c8b2b76a5952..f0a0cf1c47df 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client.clustering.run.strategy;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
@@ -32,6 +31,8 @@ import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,7 +45,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import java.util.ArrayList;
import java.util.List;
@@ -68,7 +68,7 @@ public abstract class JavaExecutionStrategy<T>
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
- HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
+ HoodieClusteringPlan clusteringPlan, HoodieSchema schema, String instantTime) {
// execute clustering for each group and collect WriteStatus
List<WriteStatus> writeStatusList = new ArrayList<>();
clusteringPlan.getInputGroups().forEach(
@@ -97,7 +97,7 @@ public abstract class JavaExecutionStrategy<T>
*/
public abstract List<WriteStatus> performClusteringWithRecordList(
final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
- final Map<String, String> strategyParams, final Schema schema,
+ final Map<String, String> strategyParams, final HoodieSchema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
/**
@@ -107,11 +107,11 @@ public abstract class JavaExecutionStrategy<T>
* @param schema Schema of the data including metadata fields.
* @return partitioner for the java engine
*/
- protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+ protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, HoodieSchema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
- HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+ HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
} else {
return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
}
@@ -124,7 +124,7 @@ public abstract class JavaExecutionStrategy<T>
HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, instantTime);
- Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
+ HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
index ffc815fd48f2..22d2b4eb53a6 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -25,13 +25,13 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import java.util.List;
import java.util.Map;
@@ -54,7 +54,7 @@ public class JavaSortAndSizeExecutionStrategy<T>
@Override
public List<WriteStatus> performClusteringWithRecordList(
final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
- final String instantTime, final Map<String, String> strategyParams, final Schema schema,
+ final String instantTime, final Map<String, String> strategyParams, final HoodieSchema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
log.info("Starting clustering for a group, parallelism: {} commit: {}", numOutputGroups, instantTime);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index 54f0c94ff069..1faf0ad45d75 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -20,13 +20,12 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.SortUtils;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
-import org.apache.avro.Schema;
-
import java.util.List;
import java.util.stream.Collectors;
@@ -41,11 +40,11 @@ public class JavaCustomColumnsSortPartitioner<T> implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
private final String[] sortColumnNames;
- private final Schema schema;
+ private final HoodieSchema schema;
private final boolean consistentLogicalTimestampEnabled;
private final boolean suffixRecordKey;
- public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+ public JavaCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) {
this.sortColumnNames = columnNames;
this.schema = schema;
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
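Construction is unchanged apart from the schema type: the partitioner receives the metadata-augmented HoodieSchema, as the JavaExecutionStrategy hunk above shows. A sketch, where sortColumns, schema, and writeConfig stand in for the values resolved in that strategy:

    String[] sortColumns = strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(",");
    BulkInsertPartitioner<List<HoodieRecord<T>>> partitioner =
        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieSchemaUtils.addMetadataFields(schema), writeConfig);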
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
index ff8909b85ddc..2dff037f4472 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -70,7 +70,7 @@ public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestH
cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME, "partition_path");
cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED, "false");
testBulkInsertInternalPartitioner(
- new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, cfg),
+ new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, cfg),
records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 69d50c1367ca..2c409a50bf1b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -101,7 +101,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
}
@Override
- public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String instantTime) {
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index c753d83d3462..5dccf0637b9f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -83,7 +82,7 @@ public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
@Override
protected List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams,
- boolean preserveHoodieMetadata, SerializableSchema schema, TaskContextSupplier taskContextSupplier, String instantTime) {
+ boolean preserveHoodieMetadata, HoodieSchema schema, TaskContextSupplier taskContextSupplier, String instantTime) {
// deal with split / merge operations
ValidationUtils.checkArgument(clusteringGroup.getNumOutputGroups() >= 1, "Number of output groups should be at least 1");
if (clusteringGroup.getNumOutputGroups() == 1) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 17ec3ccfd2e0..e4ebf13f2b0f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
@@ -28,14 +27,13 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-import org.apache.avro.Schema;
-
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -52,16 +50,15 @@ public abstract class SingleSparkJobExecutionStrategy<T>
}
@Override
- public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final HoodieSchema schema, final String instantTime) {
final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
- final SerializableSchema serializableSchema = new SerializableSchema(schema);
final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
ReaderContextFactory<T> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
HoodieData<WriteStatus> writeStatus = getEngineContext().parallelize(clusteringGroupInfos).map(group -> {
return performClusteringForGroup(readerContextFactory, group, clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
- serializableSchema, taskContextSupplier, instantTime);
+ schema, taskContextSupplier, instantTime);
}).flatMap(List::iterator);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(writeStatus);
@@ -72,7 +69,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
* Submit a task to execute clustering for the group.
*/
protected abstract List<WriteStatus> performClusteringForGroup(ReaderContextFactory<T> readerContextFactory, ClusteringGroupInfo clusteringGroup, Map<String, String> strategyParams,
- boolean preserveHoodieMetadata, SerializableSchema schema,
+ boolean preserveHoodieMetadata, HoodieSchema schema,
TaskContextSupplier taskContextSupplier, String instantTime);
}
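Dropping SerializableSchema here works because HoodieSchema itself implements Serializable (visible in the HoodieSchema.java hunk below), so the Spark closure can capture the schema directly. A sketch of the simplified flow, using the names from the hunk above:

    // Before: wrap the Avro schema so the closure can serialize it
    // SerializableSchema serializableSchema = new SerializableSchema(schema);

    // After: capture the HoodieSchema as-is inside the map closure
    HoodieData<WriteStatus> writeStatus = getEngineContext().parallelize(clusteringGroupInfos)
        .map(group -> performClusteringForGroup(readerContextFactory, group,
            clusteringPlan.getStrategy().getStrategyParams(),
            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
            schema, taskContextSupplier, instantTime))
        .flatMap(List::iterator);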
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
index e3b09b48fd1c..f68579e580d7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkBinaryCopyClusteringExecutionStrategy.java
@@ -22,13 +22,13 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -40,7 +40,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -77,7 +76,7 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> extends SparkSortAndS
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
HoodieClusteringPlan clusteringPlan,
- Schema schema,
+ HoodieSchema schema,
String instantTime) {
List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups()
@@ -95,8 +94,6 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> extends SparkSortAndS
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
- SerializableSchema serializableSchema = new SerializableSchema(schema);
- boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
JavaRDD<ClusteringGroupInfo> groupInfoJavaRDD = engineContext.parallelize(clusteringGroupInfos, clusteringGroupInfos.size());
log.info("number of partitions for clustering " + groupInfoJavaRDD.getNumPartitions());
JavaRDD<WriteStatus> writeStatusRDD = groupInfoJavaRDD
@@ -106,9 +103,6 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> extends SparkSortAndS
.flatMap(clusteringOp ->
runClusteringForGroup(
clusteringOp,
- clusteringPlan.getStrategy().getStrategyParams(),
- shouldPreserveMetadata,
- serializableSchema,
taskContextSupplier,
instantTime))
.iterator();
@@ -122,9 +116,7 @@ public class SparkBinaryCopyClusteringExecutionStrategy<T> extends SparkSortAndS
/**
* Submit job to execute clustering for the group.
*/
- private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clusteringOps, Map<String, String> strategyParams,
- boolean preserveHoodieMetadata, SerializableSchema schema,
- TaskContextSupplier taskContextSupplier, String instantTime) {
+ private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clusteringOps, TaskContextSupplier taskContextSupplier, String instantTime) {
List<WriteStatus> statuses = new ArrayList<>();
List<HoodieFileGroupId> inputFileIds = clusteringOps.getOperations()
.stream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 46b98f9ca795..0dce463eef0c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.avro;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
@@ -980,11 +981,11 @@ public class HoodieAvroUtils {
*/
public static Object[] getSortColumnValuesWithPartitionPathAndRecordKey(HoodieRecord record, String[] columns,
- Schema schema,
+ HoodieSchema schema,
boolean suffixRecordKey,
boolean consistentLogicalTimestampEnabled) {
try {
- GenericRecord genericRecord = (GenericRecord) record.toIndexedRecord(schema, PROPERTIES).get().getData();
+ GenericRecord genericRecord = (GenericRecord) record.toIndexedRecord(schema.toAvroSchema(), PROPERTIES).get().getData();
int numColumns = columns.length;
Object[] values = new Object[columns.length + 1 + (suffixRecordKey ? 1 : 0)];
values[0] = record.getPartitionPath();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 2da62bb2924b..3ab2059a18b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -85,6 +85,7 @@ public class HoodieSchema implements Serializable {
* This provides compatibility with Avro's JsonProperties while maintaining Hudi's API.
*/
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
+ public static final HoodieSchema NULL_SCHEMA = HoodieSchema.create(HoodieSchemaType.NULL);
private static final long serialVersionUID = 1L;
private Schema avroSchema;
private HoodieSchemaType type;
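The shared NULL_SCHEMA constant is what lets downstream call sites use reference equality (schema != HoodieSchema.NULL_SCHEMA) as a cheap sentinel check, instead of each module minting its own HoodieSchema.create(HoodieSchemaType.NULL) instance. A sketch of the intended call-site pattern, mirroring the UtilHelpers and StreamSync hunks below:

    HoodieSchema source = schemaProvider.getSourceHoodieSchema(); // provider as used in the utilities hunks below
    if (source != null && source != HoodieSchema.NULL_SCHEMA) {
      // a real schema was supplied
    }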
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 09ad49be147f..723bd2ddeea2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -44,7 +44,6 @@ import org.apache.hudi.storage.StoragePath;
import lombok.Getter;
import lombok.Setter;
-import org.apache.avro.Schema;
import java.util.ArrayList;
import java.util.Arrays;
@@ -293,7 +292,7 @@ public class FileGroupReaderSchemaHandler<T> {
}
/**
- * Get {@link Schema.Field} from {@link Schema} by field name.
+ * Get {@link HoodieSchemaField} from {@link HoodieSchema} by field name.
*/
private static HoodieSchemaField getField(HoodieSchema schema, String fieldName) {
Option<HoodieSchemaField> foundFieldOpt = findNestedField(schema, fieldName);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index e75e0a0b728a..af551aab0a4e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -31,7 +32,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ import java.util.stream.Collectors;
public class CommitUtils {
private static final Logger LOG = LoggerFactory.getLogger(CommitUtils.class);
- private static final String NULL_SCHEMA_STR = Schema.create(Schema.Type.NULL).toString();
+ private static final String NULL_SCHEMA_STR = HoodieSchema.NULL_SCHEMA.toString();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
index c043f7a2fef0..1558cbd40be2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
@@ -23,8 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.FlatLists;
-import org.apache.avro.Schema;
-
import java.util.function.Function;
/**
@@ -58,12 +56,12 @@ public class SortUtils {
public static FlatLists.ComparableList<Comparable<HoodieRecord>> getComparableSortColumns(
HoodieRecord record,
String[] sortColumnNames,
- Schema schema,
+ HoodieSchema schema,
boolean suffixRecordKey,
boolean consistentLogicalTimestampEnabled
) {
if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
- Object[] columnValues = record.getColumnValues(schema, sortColumnNames, consistentLogicalTimestampEnabled);
+ Object[] columnValues = record.getColumnValues(schema.toAvroSchema(), sortColumnNames, consistentLogicalTimestampEnabled);
if (suffixRecordKey) {
return FlatLists.ofComparableArray(
prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(), record.getRecordKey(), columnValues));
@@ -106,7 +104,7 @@ public class SortUtils {
} else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
- record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey, consistentLogicalTimestampEnabled
+ record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled
)));
}
throw new IllegalArgumentException("Invalid recordType" + record.getRecordType());
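SortUtils now takes the wrapper and converts per record type: the Spark branch still needs the Avro form via schema.toAvroSchema(), while the Avro branch passes the HoodieSchema through to HoodieAvroUtils, which (per its hunk above) unwraps internally. In sketch form:

    if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
      // Spark records read column values against the Avro view
      record.getColumnValues(schema.toAvroSchema(), sortColumnNames, consistentLogicalTimestampEnabled);
    } else {
      // Avro records take the wrapper; conversion happens inside HoodieAvroUtils
      HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
          record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled);
    }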
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
index d055e16c58ce..d89b7f197685 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java
@@ -149,7 +149,7 @@ public class InternalSchemaConverter {
*/
public static HoodieSchema fixNullOrdering(HoodieSchema schema) {
if (schema == null) {
- return HoodieSchema.create(HoodieSchemaType.NULL);
+ return HoodieSchema.NULL_SCHEMA;
} else if (schema.getType() == HoodieSchemaType.NULL) {
return schema;
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index ab57ebb4bea5..2d460af7e9fc 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -969,7 +969,7 @@ public class TestHoodieAvroUtils {
HoodieAvroRecord avroRecord = new HoodieAvroRecord(new HoodieKey("record1", "partition1"), avroPayload);
String[] userSortColumns = new String[] {"non_pii_col", "timestamp"};
- Object[] sortColumnValues = HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(avroRecord, userSortColumns, Schema.parse(EXAMPLE_SCHEMA), suffixRecordKey, true);
+ Object[] sortColumnValues = HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(avroRecord, userSortColumns, HoodieSchema.parse(EXAMPLE_SCHEMA), suffixRecordKey, true);
if (suffixRecordKey) {
assertArrayEquals(new Object[] {"partition1", "val1", 3.5, "record1"}, sortColumnValues);
} else {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index 0f6c274c0f7d..e2a2b7c34c70 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.InternalSchemaCache;
@@ -41,7 +42,6 @@ import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -96,7 +96,7 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
assertTrue(readerContext.getNeedsBootstrapMerge());
HoodieSchema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider");
assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
- Pair<List<Schema.Field>, List<Schema.Field>> bootstrapFields = schemaHandler.getBootstrapRequiredFields();
+ Pair<List<HoodieSchemaField>, List<HoodieSchemaField>> bootstrapFields = schemaHandler.getBootstrapRequiredFields();
assertEquals(Collections.singletonList(getField("_hoodie_record_key")), bootstrapFields.getLeft());
assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight());
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index b1ff1656157d..2101a17e26df 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -712,11 +712,11 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> expectedHoodieUnmergedRecords,
String[] orderingFields) throws Exception {
HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
- Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
- expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, avroSchema);
- expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, avroSchema);
- List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = convertHoodieRecords(expectedHoodieRecords, avroSchema, orderingFields);
- List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema, orderingFields);
+ HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
+ expectedHoodieRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, schema.toAvroSchema());
+ expectedHoodieUnmergedRecords = getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, metaClient, schema.toAvroSchema());
+ List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = convertHoodieRecords(expectedHoodieRecords, schema.toAvroSchema(), orderingFields);
+ List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = convertHoodieRecords(expectedHoodieUnmergedRecords, schema.toAvroSchema(), orderingFields);
validateOutputFromFileGroupReaderWithExistingRecords(
storageConf, tablePath, containsBaseFile, expectedLogFileNum, recordMergeMode,
expectedRecords, expectedUnmergedRecords);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
index a00546531364..f6cd3426290e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
@@ -23,10 +23,10 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -55,7 +55,7 @@ public class TestParquetRowIndexBasedSchemaHandler extends SchemaHandlerTestBase
//meta cols must go first in the required schema
HoodieSchema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider");
assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
- Pair<List<Schema.Field>, List<Schema.Field>> bootstrapFields = schemaHandler.getBootstrapRequiredFields();
+ Pair<List<HoodieSchemaField>, List<HoodieSchemaField>> bootstrapFields = schemaHandler.getBootstrapRequiredFields();
assertEquals(Arrays.asList(getField("_hoodie_record_key"), getPositionalMergeField()), bootstrapFields.getLeft());
assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider"), getPositionalMergeField()), bootstrapFields.getRight());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
index 38df25837a62..b1a44d978529 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
@@ -24,9 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.FlatLists;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Assertions;
@@ -63,8 +63,8 @@ public class TestSortUtils {
@ParameterizedTest
@MethodSource("getArguments")
void testGetComparableSortColumnsAvroRecord(HoodieRecordType recordType, boolean suffixRecordKey) {
- Schema schema = new Schema.Parser().parse(SCHEMA);
- GenericRecord genericRecord = new GenericData.Record(schema);
+ HoodieSchema schema = HoodieSchema.parse(SCHEMA);
+ GenericRecord genericRecord = new GenericData.Record(schema.toAvroSchema());
genericRecord.put("non_pii_col", "val1");
genericRecord.put("pii_col", "val2");
genericRecord.put("timestamp", 3.5);
@@ -77,7 +77,7 @@ public class TestSortUtils {
record = new TestSparkRecord(new HoodieKey("record1", "partition1"), payload);
}
String[] userSortColumns = new String[] {"non_pii_col", "timestamp"};
- FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, Schema.parse(SCHEMA), suffixRecordKey, true);
+ FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, HoodieSchema.parse(SCHEMA), suffixRecordKey, true);
Object[] expectedSortColumnValues;
if (suffixRecordKey) {
expectedSortColumnValues = new Object[] {"partition1", "val1", 3.5, "record1"};
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index e330b821cc51..1ee952d9d763 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -68,7 +68,6 @@ import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
-import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -225,7 +224,7 @@ public class UtilHelpers {
}
public static StructType getSourceSchema(SchemaProvider schemaProvider) {
- if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != null && schemaProvider.getSourceHoodieSchema() != InputBatch.NULL_SCHEMA) {
+ if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != null && schemaProvider.getSourceHoodieSchema() != HoodieSchema.NULL_SCHEMA) {
return HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schemaProvider.getSourceHoodieSchema());
}
return null;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 00e5fee1ad9e..f07c09b5e8be 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
@@ -31,7 +30,6 @@ import org.apache.spark.api.java.JavaSparkContext;
public class InputBatch<T> {
- public static final HoodieSchema NULL_SCHEMA = HoodieSchema.create(HoodieSchemaType.NULL);
private final Option<T> batch;
private final Checkpoint checkpointForNextBatch;
private final SchemaProvider schemaProvider;
@@ -90,7 +88,7 @@ public class InputBatch<T> {
@Override
public HoodieSchema getSourceHoodieSchema() {
- return NULL_SCHEMA;
+ return HoodieSchema.NULL_SCHEMA;
}
}
}
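With the sentinel relocated to HoodieSchema, the utilities no longer import InputBatch just for the constant (see the UtilHelpers hunk above and the CloudObjectsSelectorCommon hunk below); the equivalent check now reads, in sketch form:

    HoodieSchema sourceSchema = schemaProvider.getSourceHoodieSchema();
    if (sourceSchema != null && !sourceSchema.equals(HoodieSchema.NULL_SCHEMA)) {
      // a real source schema: convert to a Spark StructType, as CloudObjectsSelectorCommon does below
    }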
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 2e08a078daf0..6beb1e7bfb42 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.sources.InputBatch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
@@ -290,7 +289,7 @@ public class CloudObjectsSelectorCommon {
StructType rowSchema = null;
if (schemaProviderOption.isPresent()) {
HoodieSchema sourceSchema = schemaProviderOption.get().getSourceHoodieSchema();
- if (sourceSchema != null && !sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
+ if (sourceSchema != null && !sourceSchema.equals(HoodieSchema.NULL_SCHEMA)) {
rowSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
if (isCoalesceRequired(properties, sourceSchema)) {
reader = reader.schema(addAliasesToRowSchema(sourceSchema, rowSchema));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 2995081d82d3..cf2bc551f7c8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -692,7 +692,7 @@ public class StreamSync implements Serializable, Closeable {
checkpoint = dataAndCheckpoint.getCheckpointForNextBatch();
if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetHoodieSchema() != null
- && this.userProvidedSchemaProvider.getTargetHoodieSchema() != InputBatch.NULL_SCHEMA) {
+ && this.userProvidedSchemaProvider.getTargetHoodieSchema() != HoodieSchema.NULL_SCHEMA) {
// Let's deduce the schema provider for writer side first!
schemaProvider = getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetHoodieSchema(), this.userProvidedSchemaProvider, metaClient);
boolean useRowWriter = canUseRowWriter(schemaProvider.getTargetHoodieSchema());
@@ -826,7 +826,7 @@ public class StreamSync implements Serializable, Closeable {
}
hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
hoodieConfig.setValue("path", cfg.targetBasePath);
- return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
+ return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != HoodieSchema.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
}
@@ -1222,7 +1222,7 @@ public class StreamSync implements Serializable, Closeable {
HoodieSchema newWriteSchema = targetSchema;
try {
// check if targetSchema is equal to NULL schema
- HoodieSchema nullSchema = InputBatch.NULL_SCHEMA;
+ HoodieSchema nullSchema = HoodieSchema.NULL_SCHEMA;
if (targetSchema == null || (HoodieSchemaCompatibility.areSchemasCompatible(targetSchema, nullSchema) && HoodieSchemaCompatibility.areSchemasCompatible(nullSchema, targetSchema))) {
// target schema is null. fetch schema from commit metadata and use it
int totalCompleted = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
index f7cce0d3413a..5e1f8ccfb482 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DummySchemaProvider.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
@@ -32,6 +31,6 @@ public class DummySchemaProvider extends SchemaProvider {
@Override
public HoodieSchema getSourceHoodieSchema() {
- return HoodieSchema.create(HoodieSchemaType.NULL);
+ return HoodieSchema.NULL_SCHEMA;
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 95d11e307fd3..26ba7599b451 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -233,7 +233,7 @@ public class TestStreamSync extends SparkClientFunctionalTestHarness {
private SchemaProvider getSchemaProvider(String name, boolean isNullTargetSchema) {
SchemaProvider schemaProvider = mock(SchemaProvider.class);
HoodieSchema sourceSchema = mock(HoodieSchema.class);
- HoodieSchema targetSchema = isNullTargetSchema ? InputBatch.NULL_SCHEMA : mock(HoodieSchema.class);
+ HoodieSchema targetSchema = isNullTargetSchema ? HoodieSchema.NULL_SCHEMA : mock(HoodieSchema.class);
when(schemaProvider.getSourceHoodieSchema()).thenReturn(sourceSchema);
when(schemaProvider.getTargetHoodieSchema()).thenReturn(targetSchema);
when(sourceSchema.toString()).thenReturn(name + "SourceSchema");