This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be8d5256a82 [HUDI-7735] Remove usage of SerializableConfiguration (#11177)
be8d5256a82 is described below
commit be8d5256a829fd6363f677ad6a265c6ea7a73f86
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 8 22:51:42 2024 -0700
[HUDI-7735] Remove usage of SerializableConfiguration (#11177)
---
.../hudi/client/transaction/lock/LockManager.java | 7 +--
.../apache/spark/HoodieSparkKryoRegistrar.scala | 18 +++---
.../common/config/SerializableConfiguration.java | 69 ----------------------
.../java/org/apache/hudi/common/fs/FSUtils.java | 66 +++++----------------
.../hudi/sink/StreamWriteOperatorCoordinator.java | 9 +--
.../apache/hudi/sink/utils/HiveSyncContext.java | 6 +-
.../testsuite/configuration/DFSDeltaConfig.java | 8 ++-
.../integ/testsuite/configuration/DeltaConfig.java | 10 ++--
.../hudi/integ/testsuite/dag/WriterContext.java | 4 +-
.../TestDFSHoodieTestSuiteWriterAdapter.java | 3 +-
.../procedures/ShowInvalidParquetProcedure.scala | 7 +--
.../multitable/MultiTableServiceUtils.java | 17 +++---
.../sources/S3EventsHoodieIncrSource.java | 8 ++-
.../helpers/CloudObjectsSelectorCommon.java | 12 ++--
.../sources/helpers/DatePartitionPathSelector.java | 6 +-
.../helpers/gcs/GcsObjectMetadataFetcher.java | 8 ++-
16 files changed, 81 insertions(+), 177 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 3b5b9c449a9..c8a07d09684 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.transaction.lock;
import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -51,7 +50,7 @@ public class LockManager implements Serializable, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
private final HoodieWriteConfig writeConfig;
private final LockConfiguration lockConfiguration;
- private final SerializableConfiguration hadoopConf;
+ private final StorageConfiguration<?> storageConf;
private final int maxRetries;
private final long maxWaitTimeInMs;
private final RetryHelper<Boolean, HoodieLockException> lockRetryHelper;
@@ -64,7 +63,7 @@ public class LockManager implements Serializable, AutoCloseable {
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, TypedProperties lockProps) {
this.writeConfig = writeConfig;
- this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ this.storageConf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
this.lockConfiguration = new LockConfiguration(lockProps);
maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
@@ -112,7 +111,7 @@ public class LockManager implements Serializable, AutoCloseable {
LOG.info("LockProvider " + writeConfig.getLockProviderClass());
lockProvider = (LockProvider) ReflectionUtils.loadClass(writeConfig.getLockProviderClass(),
new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
- lockConfiguration, HadoopFSUtils.getStorageConf(hadoopConf.get()));
+ lockConfiguration, storageConf);
}
return lockProvider;
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index dd98227d440..a8650e5668a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -18,14 +18,15 @@
package org.apache.spark
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.hudi.client.model.HoodieInternalRow
-import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.storage.StorageConfiguration
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.serializers.JavaSerializer
+import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.spark.serializer.KryoRegistrator
/**
@@ -59,9 +60,12 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
kryo.register(classOf[HoodieSparkRecord])
kryo.register(classOf[HoodieInternalRow])
- // NOTE: Hadoop's configuration is not a serializable object by itself, and hence
- //       we're relying on [[SerializableConfiguration]] wrapper to work it around
- kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
+ // NOTE: This entry was previously used for [[SerializableConfiguration]], since
+ //       Hadoop's configuration is not a serializable object by itself, and hence
+ //       we relied on the [[SerializableConfiguration]] wrapper to work around that.
+ //       We cannot remove this entry, otherwise the registration ordering changes,
+ //       so we replace it with [[StorageConfiguration]].
+ kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
}
/**
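Note on the hunk above: Kryo assigns numeric registration IDs in the order register() is called, and serialized bytes reference classes by those IDs, which is why the entry is replaced in place rather than removed. A minimal standalone illustration of that constraint (hypothetical classes, assumes only the Kryo library on the classpath; not part of this commit):

  import com.esotericsoftware.kryo.Kryo;

  public class KryoOrderingSketch {
    public static void main(String[] args) {
      Kryo kryo = new Kryo();
      // Kryo hands out registration IDs sequentially.
      int idA = kryo.register(String[].class).getId();
      int idB = kryo.register(Integer[].class).getId(); // next sequential ID
      // Dropping the first registration would shift the second one onto idA,
      // breaking compatibility with data serialized under the old ID mapping.
      System.out.println(idA + " " + idB);
    }
  }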
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
deleted file mode 100644
index 23a22e01822..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.config;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A wrapped configuration which can be serialized.
- */
-public class SerializableConfiguration implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient Configuration configuration;
-
- public SerializableConfiguration(Configuration configuration) {
- this.configuration = new Configuration(configuration);
- }
-
- public SerializableConfiguration(SerializableConfiguration configuration) {
- this.configuration = configuration.newCopy();
- }
-
- public Configuration newCopy() {
- return new Configuration(configuration);
- }
-
- public Configuration get() {
- return configuration;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- configuration.write(out);
- }
-
- private void readObject(ObjectInputStream in) throws IOException {
- configuration = new Configuration(false);
- configuration.readFields(in);
- }
-
- @Override
- public String toString() {
- StringBuilder str = new StringBuilder();
- configuration.iterator().forEachRemaining(e -> str.append(String.format("%s => %s \n", e.getKey(), e.getValue())));
- return configuration.toString();
- }
-}
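For call sites migrating off the deleted wrapper, the pattern applied throughout this commit is the same: wrap the Hadoop Configuration once via HadoopFSUtils.getStorageConfWithCopy and read it back with unwrap() (or unwrapCopy() where a fresh copy is needed). A minimal sketch based only on the APIs shown in this diff (the class and method names below are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hudi.hadoop.fs.HadoopFSUtils;
  import org.apache.hudi.storage.StorageConfiguration;

  class StorageConfMigrationSketch {
    Configuration migrate(Configuration hadoopConf) {
      // Before: new SerializableConfiguration(hadoopConf) ... serConf.get()
      // After: wrap once, pass the serializable wrapper around, unwrap at the point of use.
      StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(hadoopConf);
      return storageConf.unwrap(); // storageConf.unwrapCopy() returns a defensive copy instead
    }
  }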
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 431ad474a02..4160e099d44 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -20,7 +20,6 @@
package org.apache.hudi.common.fs;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -38,12 +37,10 @@ import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.hadoop.fs.NoOpConsistencyGuard;
import org.apache.hudi.hadoop.fs.inline.InLineFSUtils;
import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
@@ -706,22 +703,6 @@ public class FSUtils {
return returnConf;
}
- /**
- * Get the FS implementation for this table.
- * @param path Path String
- * @param hadoopConf Serializable Hadoop Configuration
- * @param consistencyGuardConfig Consistency Guard Config
- * @return HoodieWrapperFileSystem
- */
- public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf,
- ConsistencyGuardConfig consistencyGuardConfig) {
- FileSystem fileSystem = HadoopFSUtils.getFs(path, hadoopConf.newCopy());
- return new HoodieWrapperFileSystem(fileSystem,
- consistencyGuardConfig.isConsistencyCheckEnabled()
- ? new FailSafeConsistencyGuard(HoodieStorageUtils.getStorage(fileSystem), consistencyGuardConfig)
- : new NoOpConsistencyGuard());
- }
-
/**
* Helper to filter out paths under metadata folder when running fs.globStatus.
*
@@ -767,44 +748,15 @@ public class FSUtils {
return false;
}
- /**
- * Processes sub-path in parallel.
- *
- * @param hoodieEngineContext {@code HoodieEngineContext} instance
- * @param fs file system
- * @param dirPath directory path
- * @param parallelism parallelism to use for sub-paths
- * @param subPathPredicate predicate to use to filter sub-paths for processing
- * @param pairFunction actual processing logic for each sub-path
- * @param <T> type of result to return for each sub-path
- * @return a map of sub-path to result of the processing
- */
- public static <T> Map<String, T> parallelizeSubPathProcess(
- HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism,
- Predicate<FileStatus> subPathPredicate, SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction) {
- Map<String, T> result = new HashMap<>();
- try {
- FileStatus[] fileStatuses = fs.listStatus(dirPath);
- List<String> subPaths = Arrays.stream(fileStatuses)
- .filter(subPathPredicate)
- .map(fileStatus -> fileStatus.getPath().toString())
- .collect(Collectors.toList());
- result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, pairFunction, subPaths);
- } catch (IOException ioe) {
- throw new HoodieIOException(ioe.getMessage(), ioe);
- }
- return result;
- }
-
public static <T> Map<String, T> parallelizeFilesProcess(
HoodieEngineContext hoodieEngineContext,
FileSystem fs,
int parallelism,
- SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction,
+ SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, T> pairFunction,
List<String> subPaths) {
Map<String, T> result = new HashMap<>();
if (subPaths.size() > 0) {
- SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+ StorageConfiguration<Configuration> conf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
int actualParallelism = Math.min(subPaths.size(), parallelism);
hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
@@ -817,6 +769,18 @@ public class FSUtils {
return result;
}
+ /**
+ * Processes sub-path in parallel.
+ *
+ * @param hoodieEngineContext {@link HoodieEngineContext} instance
+ * @param storage {@link HoodieStorage} instance
+ * @param dirPath directory path
+ * @param parallelism parallelism to use for sub-paths
+ * @param subPathPredicate predicate to use to filter sub-paths for processing
+ * @param pairFunction actual processing logic for each sub-path
+ * @param <T> type of result to return for each sub-path
+ * @return a map of sub-path to result of the processing
+ */
public static <T> Map<String, T> parallelizeSubPathProcess(
HoodieEngineContext hoodieEngineContext, HoodieStorage storage, StoragePath dirPath, int parallelism,
Predicate<StoragePathInfo> subPathPredicate, SerializableFunction<Pair<String, StorageConfiguration<?>>, T> pairFunction) {
@@ -900,7 +864,7 @@ public class FSUtils {
pairOfSubPathAndConf -> {
Path path = new Path(pairOfSubPathAndConf.getKey());
try {
- FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().get());
+ FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap());
return Arrays.stream(fileSystem.listStatus(path))
.collect(Collectors.toList());
} catch (IOException e) {
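The reworked parallelizeFilesProcess above now takes a SerializableFunction over Pair<String, StorageConfiguration<Configuration>>. A hedged caller sketch against that signature (the listing logic and names are illustrative; it assumes Hudi's org.apache.hudi.common.function.SerializableFunction, whose apply is allowed to throw):

  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.stream.Collectors;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hudi.common.engine.HoodieEngineContext;
  import org.apache.hudi.common.fs.FSUtils;
  import org.apache.hudi.common.function.SerializableFunction;
  import org.apache.hudi.common.util.collection.Pair;
  import org.apache.hudi.storage.StorageConfiguration;

  class ParallelListingSketch {
    Map<String, List<String>> listChildren(HoodieEngineContext ctx, FileSystem fs, List<String> subPaths) {
      // Each sub-path travels with the serializable StorageConfiguration instead of
      // the removed SerializableConfiguration wrapper.
      SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, List<String>> pairFunction =
          pathAndConf -> {
            Path path = new Path(pathAndConf.getKey());
            FileSystem pathFs = path.getFileSystem(pathAndConf.getValue().unwrap());
            return Arrays.stream(pathFs.listStatus(path))
                .map(status -> status.getPath().toString())
                .collect(Collectors.toList());
          };
      return FSUtils.parallelizeFilesProcess(ctx, fs, 2, pairFunction, subPaths);
    }
  }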
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 61feb294b49..207c301e425 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink;
import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -34,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -41,6 +41,7 @@ import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.util.ClientIds;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
@@ -95,7 +96,7 @@ public class StreamWriteOperatorCoordinator
/**
* Hive config options.
*/
- private final SerializableConfiguration hiveConf;
+ private final StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf;
/**
* Coordinator context.
@@ -175,7 +176,7 @@ public class StreamWriteOperatorCoordinator
this.conf = conf;
this.context = context;
this.parallelism = context.currentParallelism();
- this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
+ this.storageConf = HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHiveConf(conf));
}
@Override
@@ -320,7 +321,7 @@ public class StreamWriteOperatorCoordinator
private void initHiveSync() {
this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
- this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
+ this.hiveSyncContext = HiveSyncContext.create(conf, this.storageConf);
}
private void syncHiveAsync() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 0d4cd10b8c7..e0c88eccc39 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -18,13 +18,13 @@
package org.apache.hudi.sink.utils;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;
@@ -85,11 +85,11 @@ public class HiveSyncContext {
return new HiveSyncTool(props, hiveConf);
}
- public static HiveSyncContext create(Configuration conf, SerializableConfiguration serConf) {
+ public static HiveSyncContext create(Configuration conf, StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) {
Properties props = buildSyncConfig(conf);
org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
HiveConf hiveConf = new HiveConf();
- hiveConf.addResource(serConf.get());
+ hiveConf.addResource(storageConf.unwrap());
if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index 231f6c4830e..fff0c71583d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -18,9 +18,11 @@
package org.apache.hudi.integ.testsuite.configuration;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
/**
* Configuration to hold details about a DFS based output type, implements {@link DeltaConfig}.
@@ -43,10 +45,10 @@ public class DFSDeltaConfig extends DeltaConfig {
private boolean useHudiToGenerateUpdates;
public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
- SerializableConfiguration configuration,
+ StorageConfiguration<Configuration> storageConf,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) {
- super(deltaOutputMode, deltaInputType, configuration);
+ super(deltaOutputMode, deltaInputType, storageConf);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index bbcd375e5f7..244877e799b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -18,11 +18,11 @@
package org.apache.hudi.integ.testsuite.configuration;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.storage.StorageConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
@@ -40,13 +40,13 @@ public class DeltaConfig implements Serializable {
private final DeltaOutputMode deltaOutputMode;
private final DeltaInputType deltaInputType;
- private final SerializableConfiguration configuration;
+ private final StorageConfiguration<Configuration> storageConf;
public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
- SerializableConfiguration configuration) {
+ StorageConfiguration<Configuration> storageConf) {
this.deltaOutputMode = deltaOutputMode;
this.deltaInputType = deltaInputType;
- this.configuration = configuration;
+ this.storageConf = storageConf;
}
public DeltaOutputMode getDeltaOutputMode() {
@@ -58,7 +58,7 @@ public class DeltaConfig implements Serializable {
}
public Configuration getConfiguration() {
- return configuration.get();
+ return storageConf.unwrap();
}
/**
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 6966bda01b6..6df2c718812 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -18,9 +18,9 @@
package org.apache.hudi.integ.testsuite.dag;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.integ.testsuite.HoodieContinuousTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -77,7 +77,7 @@ public class WriterContext {
int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
this.deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
- new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
+ HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
jsc, sparkSession, schemaStr, keyGenerator);
log.info(String.format("Initialized writerContext with: %s", schemaStr));
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index f2ec458bf2d..521495cacb8 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -18,7 +18,6 @@
package org.apache.hudi.integ.testsuite;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
@@ -131,7 +130,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {
// TODO(HUDI-3668): Fix this test
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
- new SerializableConfiguration(jsc.hadoopConfiguration()), basePath, basePath,
+ HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), basePath, basePath,
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false, false);
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
.getDeltaWriterAdapter(dfsSinkConfig, 1);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 7c7a62c8ab1..193e23ccb7c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -52,16 +51,16 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
val limit = getArgValueOrDefault(args, PARAMETERS(1))
val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false)
val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
- val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+ val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
val parquetRdd = javaRdd.rdd.map(part => {
- val fs = HadoopFSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+ val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
FSUtils.getAllDataFilesInPartition(fs, FSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
}).flatMap(_.toList)
.filter(status => {
val filePath = status.getPath
var isInvalid = false
if (filePath.toString.endsWith(".parquet")) {
- try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
+ try ParquetFileReader.readFooter(storageConf.unwrap(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
case e: Exception =>
isInvalid = e.getMessage.contains("is not a Parquet file")
}
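The procedure above flags a file as invalid by attempting to read its Parquet footer and matching the failure message. The same check in isolation, as a hedged Java sketch using the Hadoop/Parquet APIs referenced in the hunk (the method name is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.format.converter.ParquetMetadataConverter;
  import org.apache.parquet.hadoop.ParquetFileReader;

  class InvalidParquetCheckSketch {
    static boolean isInvalidParquet(Configuration conf, Path filePath) {
      try {
        // Parsing the footer is enough to tell a well-formed Parquet file apart.
        ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.SKIP_ROW_GROUPS);
        return false;
      } catch (Exception e) {
        // Mirrors the procedure's heuristic: corrupt files fail with "is not a Parquet file".
        return e.getMessage() != null && e.getMessage().contains("is not a Parquet file");
      }
    }
  }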
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index f600db65733..4aabc85898d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -20,12 +20,13 @@
package org.apache.hudi.utilities.multitable;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hadoop.conf.Configuration;
@@ -62,7 +63,7 @@ public class MultiTableServiceUtils {
}
public static List<String> getTablesToBeServedFromProps(JavaSparkContext jsc, TypedProperties properties) {
- SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration());
+ StorageConfiguration<Configuration> conf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_SERVED_PROP);
boolean skipWrongPath = properties.getBoolean(Constants.TABLES_SKIP_WRONG_PATH, false);
if (combinedTablesString == null) {
@@ -74,7 +75,7 @@ public class MultiTableServiceUtils {
if (skipWrongPath) {
tablePaths = Arrays.stream(tablesArray)
.filter(tablePath -> {
- if (isHoodieTable(new Path(tablePath), conf.get())) {
+ if (isHoodieTable(new Path(tablePath), conf.unwrap())) {
return true;
} else {
// Log the wrong path in console.
@@ -85,7 +86,7 @@ public class MultiTableServiceUtils {
} else {
tablePaths = Arrays.asList(tablesArray);
tablePaths.stream()
- .filter(tablePath -> !isHoodieTable(new Path(tablePath), conf.get()))
+ .filter(tablePath -> !isHoodieTable(new Path(tablePath), conf.unwrap()))
.findFirst()
.ifPresent(tablePath -> {
throw new TableNotFoundException("Table not found: " + tablePath);
@@ -105,8 +106,8 @@ public class MultiTableServiceUtils {
public static List<String> findHoodieTablesUnderPath(JavaSparkContext jsc, String pathStr) {
Path rootPath = new Path(pathStr);
- SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration());
- if (isHoodieTable(rootPath, conf.get())) {
+ StorageConfiguration<Configuration> conf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
+ if (isHoodieTable(rootPath, conf.unwrap())) {
return Collections.singletonList(pathStr);
}
@@ -119,7 +120,7 @@ public class MultiTableServiceUtils {
while (!pathsToList.isEmpty()) {
// List all directories in parallel
List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
- FileSystem fileSystem = path.getFileSystem(conf.get());
+ FileSystem fileSystem = path.getFileSystem(conf.unwrap());
return fileSystem.listStatus(path);
}, listingParallelism);
pathsToList.clear();
@@ -131,7 +132,7 @@ public class MultiTableServiceUtils {
if (!dirs.isEmpty()) {
List<Pair<FileStatus, DirType>> dirResults = engineContext.map(dirs, fileStatus -> {
- if (isHoodieTable(fileStatus.getPath(), conf.get())) {
+ if (isHoodieTable(fileStatus.getPath(), conf.unwrap())) {
return Pair.of(fileStatus, DirType.HOODIE_TABLE);
} else if (fileStatus.getPath().getName().equals(METAFOLDER_NAME)) {
return Pair.of(fileStatus, DirType.META_FOLDER);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 51bc2907cc9..be9914190e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -18,12 +18,13 @@
package org.apache.hudi.utilities.sources;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -34,6 +35,7 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -161,13 +163,13 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
String s3Prefix = s3FS + "://";
// Create S3 paths
- SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(sparkContext.hadoopConfiguration());
+ StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration());
List<CloudObjectMetadata> cloudObjectMetadata = checkPointAndDataset.getRight().get()
.select(CloudObjectsSelectorCommon.S3_BUCKET_NAME, CloudObjectsSelectorCommon.S3_OBJECT_KEY, CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
.distinct()
- .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, serializableHadoopConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
+ .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, storageConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
.collectAsList();
LOG.info("Total number of files to process :" + cloudObjectMetadata.size());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8676bf41cb5..8a442455291 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -20,13 +20,13 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -90,16 +90,16 @@ public class CloudObjectsSelectorCommon {
* Return a function that extracts filepaths from a list of Rows.
* Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket, object_size]
* @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL.
- * @param serializableHadoopConf
+ * @param storageConf storage configuration.
* @param checkIfExists check if each file exists, before adding it to the returned list
* @return
*/
public static MapPartitionsFunction<Row, CloudObjectMetadata> getCloudObjectMetadataPerPartition(
- String storageUrlSchemePrefix, SerializableConfiguration serializableHadoopConf, boolean checkIfExists) {
+ String storageUrlSchemePrefix, StorageConfiguration<Configuration> storageConf, boolean checkIfExists) {
return rows -> {
List<CloudObjectMetadata> cloudObjectMetadataPerPartition = new ArrayList<>();
rows.forEachRemaining(row -> {
- Option<String> filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, serializableHadoopConf, checkIfExists);
+ Option<String> filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, storageConf, checkIfExists);
filePathUrl.ifPresent(url -> {
LOG.info("Adding file: " + url);
long size;
@@ -130,9 +130,9 @@ public class CloudObjectsSelectorCommon {
* @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL.
*/
private static Option<String> getUrlForFile(Row row, String storageUrlSchemePrefix,
- SerializableConfiguration serializableConfiguration,
+ StorageConfiguration<Configuration> storageConf,
boolean checkIfExists) {
- final Configuration configuration = serializableConfiguration.newCopy();
+ final Configuration configuration = storageConf.unwrapCopy();
String bucket = row.getString(0);
String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 0b7197e3a5b..ab9ccbb8ca7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -19,12 +19,12 @@
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
@@ -195,12 +195,12 @@ public class DatePartitionPathSelector extends DFSPathSelector {
if (datePartitionDepth <= 0) {
return partitionPaths;
}
- SerializableConfiguration serializedConf = new SerializableConfiguration(
+ StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(
((FileSystem) storage.getFileSystem()).getConf());
for (int i = 0; i < datePartitionDepth; i++) {
partitionPaths = context.flatMap(partitionPaths, path -> {
Path subDir = new Path(path);
- FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+ FileSystem fileSystem = subDir.getFileSystem(storageConf.unwrap());
// skip files/dirs whose names start with (_, ., etc)
FileStatus[] statuses = fileSystem.listStatus(subDir,
file -> IGNORE_FILEPREFIX_LIST.stream()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 29a50e81fb0..21ca334d05f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -18,11 +18,13 @@
package org.apache.hudi.utilities.sources.helpers.gcs;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -62,11 +64,11 @@ public class GcsObjectMetadataFetcher implements Serializable {
* @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
*/
public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
- SerializableConfiguration serializableHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+ StorageConfiguration<Configuration> storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
return cloudObjectMetadataDF
.select("bucket", "name", "size")
.distinct()
- .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, serializableHadoopConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
+ .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
.collectAsList();
}