This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2bbb5feb80d [HUDI-8637] Refactor broadcast to a more generic reader context factory (#13148)
2bbb5feb80d is described below
commit 2bbb5feb80d30352f43c0c44dd4762c128160902
Author: Tim Brown <[email protected]>
AuthorDate: Wed Apr 16 21:43:52 2025 -0500
[HUDI-8637] Refactor broadcast to a more generic reader context factory (#13148)
---
.../apache/hudi/table/EngineBroadcastManager.java | 58 ------------
.../apache/hudi/table/HoodieCompactionHandler.java | 5 +-
.../hudi/table/action/compact/HoodieCompactor.java | 25 ++---
.../hadoop/TestHoodieFileGroupReaderOnHive.java | 2 +-
.../MultipleSparkJobExecutionStrategy.java | 17 ++--
.../client/common/HoodieSparkEngineContext.java | 8 ++
.../common/SparkReaderContextFactory.java} | 97 ++++++++-----------
...HoodieSparkFileGroupReaderBasedMergeHandle.java | 14 +--
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 6 +-
.../HoodieSparkMergeOnReadTableCompactor.java | 10 --
.../hudi/BaseSparkInternalRowReaderContext.java | 5 +
.../SparkFileFormatInternalRowReaderContext.scala | 4 +-
.../common/TestSparkReaderContextFactory.java} | 105 +++++++++++----------
.../hudi/common/engine/HoodieEngineContext.java | 5 +
.../hudi/common/engine/HoodieReaderContext.java | 21 +++--
.../hudi/common/engine/ReaderContextFactory.java | 30 ++++++
.../table/log/HoodieMergedLogRecordReader.java | 4 +-
.../common/table/read/HoodieFileGroupReader.java | 5 +-
.../hudi/common/util/LocalAvroSchemaCache.java | 12 +--
.../testutils/reader/HoodieTestReaderContext.java | 1 +
.../hudi/hadoop/HiveHoodieReaderContext.java | 5 +-
.../HoodieFileGroupReaderBasedRecordReader.java | 3 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 2 +-
...stSparkFileFormatInternalRowReaderContext.scala | 2 +-
.../procedures/PartitionBucketIndexManager.scala | 10 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 4 +-
.../TestSpark35RecordPositionMetadataColumn.scala | 2 +-
27 files changed, 194 insertions(+), 268 deletions(-)
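Summary of the change: the engine-specific EngineBroadcastManager is removed, and callers now obtain a serializable ReaderContextFactory from the HoodieEngineContext. A minimal sketch of the before/after call pattern (illustrative only, not code from this patch; identifiers are taken from the diff below):

    // Before: explicitly broadcast, then pull the reader context back out of the manager.
    Option<EngineBroadcastManager> managerOpt = getEngineBroadcastManager(context, metaClient);
    managerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
    HoodieReaderContext oldReaderContext =
        managerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get();

    // After: ask the engine context for a serializable factory and build contexts on demand.
    ReaderContextFactory<T> readerContextFactory = context.getReaderContextFactory(metaClient);
    HoodieReaderContext<T> newReaderContext = readerContextFactory.getContext();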
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
deleted file mode 100644
index bcee2829dd8..00000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.StoragePath;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.Serializable;
-
-/**
- * Broadcast variable management for engines.
- */
-public class EngineBroadcastManager implements Serializable {
-
- /**
- * Prepares and broadcasts necessary information needed by compactor.
- */
- public void prepareAndBroadcast() {
- // NO operation.
- }
-
- /**
- * Returns the {@link HoodieReaderContext} instance needed by the file group reader based on
- * the broadcast variables.
- *
- * @param basePath Table base path
- */
- public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
- return Option.empty();
- }
-
- /**
- * Retrieves the broadcast configuration.
- */
- public Option<Configuration> retrieveStorageConfig() {
- return Option.empty();
- }
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index e97e613bb4c..87fd81bbd4c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -28,8 +28,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hadoop.conf.Configuration;
-
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -50,8 +48,7 @@ public interface HoodieCompactionHandler<T> {
default List<WriteStatus> compactUsingFileGroupReader(String instantTime,
CompactionOperation operation,
HoodieWriteConfig writeConfig,
- HoodieReaderContext readerContext,
- Configuration conf) {
+ HoodieReaderContext readerContext) {
throw new HoodieNotSupportedException("This engine does not support file group reader based compaction.");
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index bca2dfe26c5..a08bd76ac79 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -24,6 +24,8 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
@@ -46,7 +48,6 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -89,16 +90,6 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
*/
public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
- /**
- * @param context {@link HoodieEngineContext} instance
- *
- * @return the {@link EngineBroadcastManager} if available.
- */
- public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
- HoodieTableMetaClient metaClient) {
- return Option.empty();
- }
-
/**
* Execute compaction operations and report back status.
*/
@@ -152,11 +143,9 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
&& config.populateMetaFields();
// Virtual key support by fg reader is not ready
if (useFileGroupReaderBasedCompaction) {
- Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context, metaClient);
- // Broadcast required information.
- broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
+ ReaderContextFactory<T> readerContextFactory = context.getReaderContextFactory(metaClient);
return context.parallelize(operations).map(
- operation -> compact(compactionHandler, metaClient, config, operation, compactionInstantTime, broadcastManagerOpt))
+ operation -> compact(compactionHandler, config, operation, compactionInstantTime, readerContextFactory.getContext()))
.flatMap(List::iterator);
} else {
return context.parallelize(operations).map(
@@ -297,14 +286,12 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
* Execute a single compaction operation and report back status.
*/
public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
- HoodieTableMetaClient metaClient,
HoodieWriteConfig writeConfig,
CompactionOperation operation,
String instantTime,
- Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
+ HoodieReaderContext<T> hoodieReaderContext) throws IOException {
return compactionHandler.compactUsingFileGroupReader(instantTime, operation,
- writeConfig, broadcastManagerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get(),
- broadcastManagerOpt.get().retrieveStorageConfig().get());
+ writeConfig, hoodieReaderContext);
}
public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 93848f3496a..572fd81638b 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -142,7 +142,7 @@ public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBa
setupJobconf(jobConf);
return new HiveHoodieReaderContext(readerCreator, getRecordKeyField(metaClient),
getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
- new ObjectInspectorCache(avroSchema, jobConf));
+ new ObjectInspectorCache(avroSchema, jobConf), storageConf);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0b922571283..5c53aa8494c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -72,10 +73,8 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.SparkBroadcastManager;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
@@ -455,8 +454,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
SerializableSchema serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields);
// broadcast reader context.
- SparkBroadcastManager broadcastManager = new SparkBroadcastManager(getEngineContext(), getHoodieTable().getMetaClient());
- broadcastManager.prepareAndBroadcast();
+ ReaderContextFactory<InternalRow> readerContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, InternalRow>() {
@@ -469,20 +467,17 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
if (isInternalSchemaPresent) {
internalSchemaOption = SerDeHelper.fromJson(internalSchemaStr);
}
- Option<HoodieReaderContext> readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(new StoragePath(basePath));
- Configuration conf = broadcastManager.retrieveStorageConfig().get();
// instantiate FG reader
- HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContextOpt.get(),
- getHoodieTable().getMetaClient().getStorage().newInstance(new StoragePath(basePath), new HadoopStorageConfiguration(conf)),
+ HoodieReaderContext<InternalRow> readerContext = readerContextFactory.getContext();
+ HoodieFileGroupReader<InternalRow> fileGroupReader = new HoodieFileGroupReader<>(readerContext,
+ getHoodieTable().getMetaClient().getStorage().newInstance(new StoragePath(basePath), readerContext.getStorageConfiguration()),
basePath, instantTime, fileSlice, readerSchema, readerSchema, internalSchemaOption,
getHoodieTable().getMetaClient(), getHoodieTable().getMetaClient().getTableConfig().getProps(),
0, Long.MAX_VALUE, usePosition, false);
fileGroupReader.initRecordIterators();
// read records from the FG reader
- HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> recordIterator
- = (HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator();
- return recordIterator;
+ return fileGroupReader.getClosableIterator();
}
}).rdd();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 1bafb29f207..cc8fd4c4ca7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -25,11 +25,13 @@ import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -48,6 +50,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
import javax.annotation.concurrent.ThreadSafe;
@@ -250,6 +253,11 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
return HoodieJavaRDD.getJavaRDD(data).aggregate(zeroValue, seqOpFunc, combOpFunc);
}
+ @Override
+ public ReaderContextFactory<InternalRow> getReaderContextFactory(HoodieTableMetaClient metaClient) {
+ return new SparkReaderContextFactory(this, metaClient);
+ }
+
public SparkConf getConf() {
return javaSparkContext.getConf();
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
similarity index 62%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index af7d18c14fd..d2d5d170fd4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -7,46 +7,44 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.hudi.table;
+package org.apache.hudi.client.common;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.util.SerializableConfiguration;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,30 +54,18 @@ import scala.Tuple2;
import scala.collection.JavaConverters;
/**
- * Broadcast variable management for Spark.
+ * Factory that provides the {@link InternalRow} based {@link HoodieReaderContext} for reading data into the spark native format.
*/
-public class SparkBroadcastManager extends EngineBroadcastManager {
+class SparkReaderContextFactory implements ReaderContextFactory<InternalRow> {
+ private final Broadcast<SparkParquetReader> parquetReaderBroadcast;
+ private final Broadcast<SerializableConfiguration> configurationBroadcast;
- private final transient HoodieEngineContext context;
- private final transient HoodieTableMetaClient metaClient;
-
- protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
- protected Broadcast<SQLConf> sqlConfBroadcast;
- protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
- protected Broadcast<SerializableConfiguration> configurationBroadcast;
-
- public SparkBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
- this.context = context;
- this.metaClient = metaClient;
+ SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieTableMetaClient metaClient) {
+ this(hoodieSparkEngineContext, metaClient, new TableSchemaResolver(metaClient), SparkAdapterSupport$.MODULE$.sparkAdapter());
}
- @Override
- public void prepareAndBroadcast() {
- if (!(context instanceof HoodieSparkEngineContext)) {
- throw new HoodieIOException("Expected to be called using Engine's context and not local context");
- }
-
- HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext) context;
+ SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieTableMetaClient metaClient,
+ TableSchemaResolver resolver, SparkAdapter sparkAdapter) {
SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
@@ -88,49 +74,46 @@ public class SparkBroadcastManager extends EngineBroadcastManager {
scala.collection.immutable.Map<String, String> options = scala.collection.immutable.Map$.MODULE$.<String, String>empty()
.$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));
- TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
InstantFileNameGenerator fileNameGenerator = metaClient.getTimelineLayout().getInstantFileNameGenerator();
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Map<String, String> schemaEvolutionConfigs = getSchemaEvolutionConfigs(resolver, timeline, fileNameGenerator, metaClient.getBasePath().toString());
// Broadcast: SQLConf.
- sqlConfBroadcast = jsc.broadcast(sqlConf);
// Broadcast: Configuration.
Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
- addSchemaEvolutionConfigs(configs, schemaEvolutionConfigs);
+ schemaEvolutionConfigs.forEach(configs::set);
configurationBroadcast = jsc.broadcast(new SerializableConfiguration(configs));
// Broadcast: ParquetReader.
// Spark parquet reader has to be instantiated on the driver and broadcast to the executors
- parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
- false, sqlConfBroadcast.getValue(), options, configurationBroadcast.getValue().value()));
- parquetReaderBroadcast = jsc.broadcast(parquetReaderOpt.get());
+ SparkParquetReader parquetFileReader = sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
+ parquetReaderBroadcast = jsc.broadcast(parquetFileReader);
}
@Override
- public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
+ public HoodieReaderContext<InternalRow> getContext() {
if (parquetReaderBroadcast == null) {
throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
}
+ if (configurationBroadcast == null) {
+ throw new HoodieException("Configuration broadcast is not initialized.");
+ }
+
SparkParquetReader sparkParquetReader = parquetReaderBroadcast.getValue();
if (sparkParquetReader != null) {
- List<Filter> filters = new ArrayList<>();
- return Option.of(new SparkFileFormatInternalRowReaderContext(
+ List<Filter> filters = Collections.emptyList();
+ return new SparkFileFormatInternalRowReaderContext(
sparkParquetReader,
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
- JavaConverters.asScalaBufferConverter(filters).asScala().toSeq()));
+ JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
+ new HadoopStorageConfiguration(configurationBroadcast.getValue().value()));
} else {
throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
}
}
- @Override
- public Option<Configuration> retrieveStorageConfig() {
- return Option.of(configurationBroadcast.getValue().value());
- }
-
- static Configuration getHadoopConfiguration(Configuration configuration) {
+ private static Configuration getHadoopConfiguration(Configuration configuration) {
// new Configuration() is critical so that we don't run into ConcurrentModificatonException
Configuration hadoopConf = new Configuration(configuration);
hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), false);
@@ -147,10 +130,10 @@ public class SparkBroadcastManager extends EngineBroadcastManager {
return (new HadoopStorageConfiguration(hadoopConf).getInline()).unwrap();
}
- static Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver,
- HoodieTimeline timeline,
- InstantFileNameGenerator fileNameGenerator,
- String basePath) {
+ private static Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver,
+ HoodieTimeline timeline,
+ InstantFileNameGenerator fileNameGenerator,
+ String basePath) {
Option<InternalSchema> internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata();
Map<String, String> configs = new HashMap<>();
if (internalSchemaOpt.isPresent()) {
@@ -160,10 +143,4 @@ public class SparkBroadcastManager extends EngineBroadcastManager {
}
return configs;
}
-
- static void addSchemaEvolutionConfigs(Configuration configs, Map<String, String> schemaEvolutionConfigs) {
- for (Map.Entry<String, String> entry : schemaEvolutionConfigs.entrySet()) {
- configs.set(entry.getKey(), entry.getValue());
- }
- }
-}
\ No newline at end of file
+}
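Design note on the rename above: all Spark broadcasts now happen once, inside the factory constructor on the driver, and getContext() only dereferences the broadcast values, so it is safe to call inside each task. A rough sketch of the intended usage (mirrors the HoodieCompactor change earlier in this commit; variable names are illustrative):

    // Driver side: constructing the factory broadcasts the Hadoop configuration and parquet reader.
    ReaderContextFactory<InternalRow> factory = engineContext.getReaderContextFactory(metaClient);
    // Executor side: each compaction task materializes its own reader context from the broadcast state.
    engineContext.parallelize(operations)
        .map(operation -> compact(compactionHandler, writeConfig, operation, instantTime, factory.getContext()));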
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index 0fa5d82727c..7a73e3c7824 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -49,12 +49,10 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
@@ -82,19 +80,17 @@ import static org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_
public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkFileGroupReaderBasedMergeHandle.class);
- protected HoodieReaderContext readerContext;
- protected FileSlice fileSlice;
- protected Configuration conf;
- protected HoodieReadStats readStats;
+ private final HoodieReaderContext<T> readerContext;
+ private final FileSlice fileSlice;
+ private HoodieReadStats readStats;
public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
CompactionOperation operation, TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt,
- HoodieReaderContext readerContext, Configuration conf) {
+ HoodieReaderContext<T> readerContext) {
super(config, instantTime, operation.getPartitionPath(), operation.getFileId(), hoodieTable, taskContextSupplier);
this.keyToNewRecords = Collections.emptyMap();
this.readerContext = readerContext;
- this.conf = conf;
Option<HoodieBaseFile> baseFileOpt = operation.getBaseFile(config.getBasePath(), operation.getPartitionPath());
List<HoodieLogFile> logFiles = operation.getDeltaFileNames().stream().map(p ->
@@ -189,7 +185,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
// Initializes file group reader
try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContext,
- storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new HadoopStorageConfiguration(conf)),
+ storage.newInstance(hoodieTable.getMetaClient().getBasePath(), readerContext.getStorageConfiguration()),
hoodieTable.getMetaClient().getBasePath().toString(), instantTime, fileSlice,
writeSchemaWithMetaFields, writeSchemaWithMetaFields, internalSchemaOption,
hoodieTable.getMetaClient(), props, 0, Long.MAX_VALUE, usePosition, false)) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index c4ebead38aa..c5200625f56 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -80,7 +80,6 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
-import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -272,12 +271,11 @@ public class HoodieSparkCopyOnWriteTable<T>
public List<WriteStatus> compactUsingFileGroupReader(String instantTime,
CompactionOperation operation,
HoodieWriteConfig writeConfig,
- HoodieReaderContext readerContext,
- Configuration conf) {
+ HoodieReaderContext readerContext) {
config.setDefault(writeConfig);
Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle = new HoodieSparkFileGroupReaderBasedMergeHandle(config,
- instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, readerContext, conf);
+ instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, readerContext);
mergeHandle.write();
return mergeHandle.close();
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index d6eff9cd0c1..d47eb6d33aa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -25,14 +25,10 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.EngineBroadcastManager;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.SparkBroadcastManager;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
@@ -45,12 +41,6 @@ import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVE
public class HoodieSparkMergeOnReadTableCompactor<T>
extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
- @Override
- public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context,
- HoodieTableMetaClient metaClient) {
- return Option.of(new SparkBroadcastManager(context, metaClient));
- }
-
@Override
public void preCompact(
HoodieTable table, HoodieTimeline pendingCompactionTimeline, WriteOperationType operationType, String instantTime) {
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index c39c430451c..8df5bdf57b7 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.avro.Schema;
import org.apache.spark.sql.HoodieInternalRowUtils;
@@ -54,6 +55,10 @@ import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
*/
public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderContext<InternalRow> {
+ protected BaseSparkInternalRowReaderContext(StorageConfiguration<?> storageConfig) {
+ super(storageConfig);
+ }
+
@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 7eb154ddd0a..6ed68edf40d 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -61,7 +61,9 @@ import scala.collection.mutable
*/
class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
filters: Seq[Filter],
- requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
+ requiredFilters: Seq[Filter],
+ storageConfiguration: StorageConfiguration[_])
+ extends BaseSparkInternalRowReaderContext(storageConfiguration) {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
similarity index 57%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
index cb33a887c19..1a51ed27510 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestSparkBroadcastManager.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/common/TestSparkReaderContextFactory.java
@@ -7,21 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.hudi.table;
+package org.apache.hudi.client.common;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -32,64 +33,44 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.FileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+
+import scala.Tuple2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-class TestSparkBroadcastManager {
- @Test
- void testGetStorageConfiguration() {
- Configuration config = new Configuration(false);
- Configuration createdConfig = SparkBroadcastManager.getHadoopConfiguration(config);
- assertFalse(createdConfig.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), true));
- assertFalse(createdConfig.getBoolean(SQLConf.CASE_SENSITIVE().key(), true));
- assertFalse(createdConfig.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), true));
- assertTrue(createdConfig.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false));
- assertFalse(createdConfig.getBoolean("spark.sql.legacy.parquet.nanosAsLong", true));
- if (HoodieSparkUtils.gteqSpark3_4()) {
- assertFalse(createdConfig.getBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", true));
- }
-
- String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME + ".impl");
- assertEquals(InLineFileSystem.class.getName(), inlineClassName);
- }
-
- @Test
- void testExtraConfigsAdded() {
- Map<String, String> extraConfigs = new HashMap<>();
- extraConfigs.put("K1", "V1");
- Configuration configs = new Configuration(false);
- SparkBroadcastManager.addSchemaEvolutionConfigs(configs, extraConfigs);
- assertEquals("V1", configs.get("K1"));
- }
-
+class TestSparkReaderContextFactory extends HoodieClientTestBase {
@Test
void testGetSchemaEvolutionConfigurations() {
TableSchemaResolver schemaResolver = mock(TableSchemaResolver.class);
HoodieTimeline timeline = mock(HoodieTimeline.class);
InstantFileNameGenerator fileNameGenerator = new InstantFileNameGeneratorV2();
String basePath = "any_table_path";
+ metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+ when(metaClient.getBasePath()).thenReturn(new StoragePath(basePath));
+ when(metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants()).thenReturn(timeline);
+ when(metaClient.getTimelineLayout().getInstantFileNameGenerator()).thenReturn(fileNameGenerator);
- // Test when InternalSchema is empty.
- when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.empty());
- Map<String, String> schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
- schemaResolver, timeline, fileNameGenerator, basePath);
- assertTrue(schemaEvolutionConfigs.isEmpty());
-
- // Test when InternalSchema is not empty.
InstantGeneratorV2 instantGen = new InstantGeneratorV2();
Types.RecordType record = Types.RecordType.get(Collections.singletonList(
Types.Field.get(0, "col1", Types.BooleanType.get())));
@@ -103,14 +84,38 @@ class TestSparkBroadcastManager {
InternalSchema internalSchema = new InternalSchema(record);
when(schemaResolver.getTableInternalSchemaFromCommitMetadata()).thenReturn(Option.of(internalSchema));
when(timeline.getInstants()).thenReturn(instants);
- schemaEvolutionConfigs = SparkBroadcastManager.getSchemaEvolutionConfigs(
- schemaResolver, timeline, fileNameGenerator, basePath);
- assertFalse(schemaEvolutionConfigs.isEmpty());
+ SparkAdapter sparkAdapter = mock(SparkAdapter.class);
+ scala.collection.immutable.Map<String, String> options =
+ scala.collection.immutable.Map$.MODULE$.<String, String>empty()
+ .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(true)));
+ ArgumentCaptor<Configuration> configurationArgumentCaptor = ArgumentCaptor.forClass(Configuration.class);
+ SparkParquetReader sparkParquetReader = mock(SparkParquetReader.class);
+ when(sparkAdapter.createParquetFileReader(eq(false), eq(context.getSqlContext().sessionState().conf()), eq(options), configurationArgumentCaptor.capture()))
+ .thenReturn(sparkParquetReader);
+
+ SparkReaderContextFactory sparkHoodieReaderContextFactory = new SparkReaderContextFactory(context, metaClient, schemaResolver, sparkAdapter);
+ HoodieReaderContext<InternalRow> readerContext = sparkHoodieReaderContextFactory.getContext();
+
+ Configuration createdConfig = readerContext.getStorageConfiguration().unwrapAs(Configuration.class);
+ assertEquals(createdConfig, configurationArgumentCaptor.getValue());
+
+ assertFalse(createdConfig.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), true));
+ assertFalse(createdConfig.getBoolean(SQLConf.CASE_SENSITIVE().key(), true));
+ assertFalse(createdConfig.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), true));
+ assertTrue(createdConfig.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false));
+ assertFalse(createdConfig.getBoolean("spark.sql.legacy.parquet.nanosAsLong", true));
+ if (HoodieSparkUtils.gteqSpark3_4()) {
+ assertFalse(createdConfig.getBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", true));
+ }
+
+ String inlineClassName = createdConfig.get("fs." + InLineFileSystem.SCHEME + ".impl");
+ assertEquals(InLineFileSystem.class.getName(), inlineClassName);
+
assertEquals(
"0001_0005.deltacommit,0002_0006.deltacommit,0003_0007.commit",
- schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
+ createdConfig.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST));
assertEquals(
- "any_table_path",
- schemaEvolutionConfigs.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
+ basePath,
+ createdConfig.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 5334f06fafa..6a8ecd243b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -130,4 +131,8 @@ public abstract class HoodieEngineContext {
* @return the result of the aggregation
*/
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
+
+ public <T> ReaderContextFactory<T> getReaderContextFactory(HoodieTableMetaClient metaClient) {
+ throw new UnsupportedOperationException("Reader context factory is not yet supported");
+ }
}
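Engines opt into this path by overriding the default above; HoodieSparkEngineContext earlier in this commit returns a SparkReaderContextFactory, while other engines keep the throwing default until they supply their own. A hypothetical override for another engine might look like the following (FlinkReaderContextFactory is illustrative only and not part of this commit):

    @Override
    public ReaderContextFactory<RowData> getReaderContextFactory(HoodieTableMetaClient metaClient) {
      // Hypothetical: wrap an engine-native RowData reader context in a serializable factory.
      return new FlinkReaderContextFactory(this, metaClient);
    }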
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 2751998b111..88d6ca971d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -36,7 +37,6 @@ import org.apache.avro.generic.IndexedRecord;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -57,8 +57,8 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
* @param <T> The type of engine-specific record representation, e.g.,{@code InternalRow} in Spark
* and {@code RowData} in Flink.
*/
-public abstract class HoodieReaderContext<T> implements Closeable {
-
+public abstract class HoodieReaderContext<T> {
+ private final StorageConfiguration<?> storageConfiguration;
private FileGroupReaderSchemaHandler<T> schemaHandler = null;
private String tablePath = null;
private String latestCommitTime = null;
@@ -71,6 +71,10 @@ public abstract class HoodieReaderContext<T> implements Closeable {
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
+ protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration) {
+ this.storageConfiguration = storageConfiguration;
+ }
+
// Getter and Setter for schemaHandler
public FileGroupReaderSchemaHandler<T> getSchemaHandler() {
return schemaHandler;
@@ -143,6 +147,10 @@ public abstract class HoodieReaderContext<T> implements
Closeable {
this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition;
}
+ public StorageConfiguration<?> getStorageConfiguration() {
+ return storageConfiguration;
+ }
+
// These internal key names are only used in memory for record metadata and merging,
// and should not be persisted to storage.
public static final String INTERNAL_META_RECORD_KEY = "_0";
@@ -420,11 +428,4 @@ public abstract class HoodieReaderContext<T> implements Closeable {
private Schema decodeAvroSchema(Object versionId) {
return this.localAvroSchemaCache.getSchema((Integer) versionId).orElse(null);
}
-
- @Override
- public void close() {
- if (this.localAvroSchemaCache != null) {
- this.localAvroSchemaCache.close();
- }
- }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java
new file mode 100644
index 00000000000..c6902e5b022
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import java.io.Serializable;
+
+/**
+ * A factory that will return a {@link HoodieReaderContext<T>} for an engine
specific data type.
+ * The factory must be serializable by default so the factory can be
serialized to worker nodes in a distributed compute framework like Spark.
+ * @param <T> The engine specific data type for the reader
+ */
+public interface ReaderContextFactory<T> extends Serializable {
+ HoodieReaderContext<T> getContext();
+}
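Since the interface is a single-method Serializable type, a lightweight implementation can even be a lambda; for instance, a test could adapt the HoodieTestReaderContext touched later in this commit (a sketch only, not part of the patch):

    // Sketch: a serializable factory producing the Avro-based test reader context.
    ReaderContextFactory<IndexedRecord> testFactory =
        () -> new HoodieTestReaderContext(Option.empty(), Option.empty());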
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index ab45b8c7fa0..91ef3681120 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -181,8 +181,8 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
/**
* Returns the builder for {@code HoodieMergedLogRecordReader}.
*/
- public static Builder newBuilder() {
- return new Builder();
+ public static <T> Builder<T> newBuilder() {
+ return new Builder<>();
}
public long getTotalTimeTakenToReadAndMergeBlocks() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 58c04d3e733..a57fb777fbd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -290,7 +290,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private void scanLogFiles() {
String path = readerContext.getTablePath();
- try (HoodieMergedLogRecordReader logRecordReader = HoodieMergedLogRecordReader.newBuilder()
+ try (HoodieMergedLogRecordReader<T> logRecordReader = HoodieMergedLogRecordReader.<T>newBuilder()
.withHoodieReaderContext(readerContext)
.withStorage(storage)
.withLogFiles(logFiles)
@@ -319,9 +319,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
if (recordBuffer != null) {
recordBuffer.close();
}
- if (readerContext != null) {
- readerContext.close();
- }
}
public HoodieFileGroupReaderIterator<T> getClosableIterator() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
index 2286cba71b8..3d4090534f5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LocalAvroSchemaCache.java
@@ -19,12 +19,9 @@
package org.apache.hudi.common.util;
import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
@@ -34,8 +31,7 @@ import java.util.Map;
* A map of {version_id, schema} is maintained.
*/
@NotThreadSafe
-public class LocalAvroSchemaCache implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(LocalAvroSchemaCache.class);
+public class LocalAvroSchemaCache {
private final Map<Integer, Schema> versionIdToSchema; // the mapping from version_id -> schema
private final Map<Schema, Integer> schemaToVersionId; // the mapping from schema -> version_id
@@ -63,10 +59,4 @@ public class LocalAvroSchemaCache implements Closeable {
public Option<Schema> getSchema(Integer versionId) {
return Option.ofNullable(this.versionIdToSchema.get(versionId));
}
-
- @Override
- public void close() {
- this.schemaToVersionId.clear();
- this.versionIdToSchema.clear();
- }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 9f97bccece6..1c536e1b7a7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -59,6 +59,7 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
public HoodieTestReaderContext(
Option<HoodieRecordMerger> customMerger,
Option<String> payloadClass) {
+ super(null);
this.customMerger = customMerger;
this.payloadClass = payloadClass;
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index b0eaf6cb25c..b37c7d910dd 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -37,6 +37,7 @@ import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -95,7 +96,9 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator,
String recordKeyField,
List<String> partitionCols,
- ObjectInspectorCache objectInspectorCache) {
+ ObjectInspectorCache objectInspectorCache,
+ StorageConfiguration<?> storageConfiguration) {
+ super(storageConfiguration);
this.readerCreator = readerCreator;
this.partitionCols = partitionCols;
this.partitionColSet = new HashSet<>(this.partitionCols);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index 9100ffad49d..39deb6a0971 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
@@ -124,7 +125,7 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null
Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
this.readerContext = new HiveHoodieReaderContext(readerCreator, getRecordKeyField(metaClient),
getStoredPartitionFieldNames(jobConfCopy, tableSchema),
- new ObjectInspectorCache(tableSchema, jobConfCopy));
+ new ObjectInspectorCache(tableSchema, jobConfCopy), new HadoopStorageConfiguration(jobConfCopy));
this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
TypedProperties props = metaClient.getTableConfig().getProps();
jobConf.forEach(e -> {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index a5afcf42564..3bceb6eb4b8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -170,7 +170,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(filegroupName) match {
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
- val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters)
+ val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters, storageConf)
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tablePath).build
val props = metaClient.getTableConfig.getProps
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index fec096920aa..5c7f1816389 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -78,7 +78,7 @@ class TestSparkFileFormatInternalRowReaderContext extends SparkClientFunctionalT
def testConvertValueToEngineType(): Unit = {
val reader = Mockito.mock(classOf[SparkParquetReader])
val stringValue = "string_value"
- val sparkReaderContext = new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+ val sparkReaderContext = new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, storageConf())
assertEquals(1, sparkReaderContext.convertValueToEngineType(1))
assertEquals(1L, sparkReaderContext.convertValueToEngineType(1L))
assertEquals(1.1f, sparkReaderContext.convertValueToEngineType(1.1f))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 2d8811f9612..24c4663583e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, EN
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig, SerializableSchema}
-import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.engine.{HoodieEngineContext, ReaderContextFactory}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{PartitionBucketIndexHashingConfig, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -35,7 +35,6 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.table.SparkBroadcastManager
import org.apache.avro.Schema
import org.apache.spark.internal.Logging
@@ -213,9 +212,7 @@ class PartitionBucketIndexManager extends BaseProcedure
throw new HoodieException("Failed to get table schema during clustering", e)
}
- // broadcast reader context.
- val broadcastManager = new SparkBroadcastManager(context, metaClient)
- broadcastManager.prepareAndBroadcast()
+ val readerContextFactory: ReaderContextFactory[InternalRow] = context.getReaderContextFactory(metaClient)
val sparkSchemaWithMetaFields =
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields)
val res: RDD[InternalRow] = if (allFileSlice.isEmpty) {
@@ -227,10 +224,9 @@ class PartitionBucketIndexManager extends BaseProcedure
spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => {
// instantiate other supporting cast
val readerSchema = serializableTableSchemaWithMetaFields.get
- val readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(basePath)
val internalSchemaOption: Option[InternalSchema] = Option.empty()
// instantiate FG reader
- val fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(),
+ val fileGroupReader = new HoodieFileGroupReader(readerContextFactory.getContext,
metaClient.getStorage,
basePath.toString,
latestInstantTime.requestedTime(),
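For illustration only (not part of the patch): a minimal Scala sketch of the usage pattern introduced in the hunk above, assuming a HoodieSparkEngineContext named `context` and a HoodieTableMetaClient named `metaClient` are already in scope; the exact return type of `getContext` is inferred from the HoodieFileGroupReader call site and is an assumption here.

    // Obtain an engine-level factory instead of broadcasting a reader context.
    val readerContextFactory: ReaderContextFactory[InternalRow] =
      context.getReaderContextFactory(metaClient)

    // Each task asks the factory for a reader context when it builds the
    // file group reader, replacing the value previously retrieved from the
    // removed SparkBroadcastManager.
    val readerContext = readerContextFactory.getContext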
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index b9f71c687d2..a5290bcab0f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -99,7 +99,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
override def getHoodieReaderContext(tablePath: String, avroSchema: Schema,
storageConf: StorageConfiguration[_]): HoodieReaderContext[InternalRow] = {
val reader = sparkAdapter.createParquetFileReader(vectorized = false,
spark.sessionState.conf, Map.empty,
storageConf.unwrapAs(classOf[Configuration]))
- new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+ new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, getStorageConf)
}
override def commitToTable(recordList: util.List[HoodieRecord[_]],
operation: String, options: util.Map[String, String]): Unit = {
@@ -149,7 +149,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
@Test
def testGetOrderingValue(): Unit = {
val reader = Mockito.mock(classOf[SparkParquetReader])
- val sparkReaderContext = new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
+ val sparkReaderContext = new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, getStorageConf)
val orderingFieldName = "col2"
val avroSchema = new Schema.Parser().parse(
"{\"type\": \"record\",\"name\": \"test\",\"namespace\":
\"org.apache.hudi\",\"fields\": ["
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index 9e9dd995156..e650f8fd9b3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -92,7 +92,7 @@ class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestH
assertFalse(allBaseFiles.isEmpty)
val requiredSchema =
SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema,
- new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty).supportsParquetRowIndex)
+ new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty, storageConf()).supportsParquetRowIndex)
// Confirm if the schema is as expected.
if (HoodieSparkUtils.gteqSpark3_5) {