This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be8d5256a82 [HUDI-7735] Remove usage of SerializableConfiguration (#11177)
be8d5256a82 is described below

commit be8d5256a829fd6363f677ad6a265c6ea7a73f86
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 8 22:51:42 2024 -0700

    [HUDI-7735] Remove usage of SerializableConfiguration (#11177)
---
 .../hudi/client/transaction/lock/LockManager.java  |  7 +--
 .../apache/spark/HoodieSparkKryoRegistrar.scala    | 18 +++---
 .../common/config/SerializableConfiguration.java   | 69 ----------------------
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 66 +++++----------------
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  9 +--
 .../apache/hudi/sink/utils/HiveSyncContext.java    |  6 +-
 .../testsuite/configuration/DFSDeltaConfig.java    |  8 ++-
 .../integ/testsuite/configuration/DeltaConfig.java | 10 ++--
 .../hudi/integ/testsuite/dag/WriterContext.java    |  4 +-
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |  3 +-
 .../procedures/ShowInvalidParquetProcedure.scala   |  7 +--
 .../multitable/MultiTableServiceUtils.java         | 17 +++---
 .../sources/S3EventsHoodieIncrSource.java          |  8 ++-
 .../helpers/CloudObjectsSelectorCommon.java        | 12 ++--
 .../sources/helpers/DatePartitionPathSelector.java |  6 +-
 .../helpers/gcs/GcsObjectMetadataFetcher.java      |  8 ++-
 16 files changed, 81 insertions(+), 177 deletions(-)
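
In short, call sites migrate from the Hadoop-specific SerializableConfiguration wrapper to the engine-agnostic StorageConfiguration, typically obtained via HadoopFSUtils.getStorageConfWithCopy. The sketch below summarizes the before/after call pattern visible in this diff; the enclosing class and method names are illustrative only and are not part of the change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.storage.StorageConfiguration;

    // Illustrative class/method names; only the wrapper calls come from this commit.
    public class StorageConfMigrationSketch {
      static void example(FileSystem fs) {
        // Before (removed in this commit):
        //   SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
        //   Configuration borrowed = conf.get();
        //   Configuration copied   = conf.newCopy();

        // After: wrap the Hadoop Configuration in a serializable StorageConfiguration.
        StorageConfiguration<Configuration> storageConf =
            HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
        Configuration borrowed = storageConf.unwrap();     // replaces get()
        Configuration copied   = storageConf.unwrapCopy(); // replaces newCopy()
      }
    }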

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 3b5b9c449a9..c8a07d09684 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.transaction.lock;
 
 import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -51,7 +50,7 @@ public class LockManager implements Serializable, 
AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
   private final HoodieWriteConfig writeConfig;
   private final LockConfiguration lockConfiguration;
-  private final SerializableConfiguration hadoopConf;
+  private final StorageConfiguration<?> storageConf;
   private final int maxRetries;
   private final long maxWaitTimeInMs;
   private final RetryHelper<Boolean, HoodieLockException> lockRetryHelper;
@@ -64,7 +63,7 @@ public class LockManager implements Serializable, 
AutoCloseable {
 
   public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, 
TypedProperties lockProps) {
     this.writeConfig = writeConfig;
-    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    this.storageConf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
     this.lockConfiguration = new LockConfiguration(lockProps);
     maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
         
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
@@ -112,7 +111,7 @@ public class LockManager implements Serializable, 
AutoCloseable {
       LOG.info("LockProvider " + writeConfig.getLockProviderClass());
       lockProvider = (LockProvider) 
ReflectionUtils.loadClass(writeConfig.getLockProviderClass(),
           new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
-          lockConfiguration, HadoopFSUtils.getStorageConf(hadoopConf.get()));
+          lockConfiguration, storageConf);
     }
     return lockProvider;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index dd98227d440..a8650e5668a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -18,14 +18,15 @@
 
 package org.apache.spark
 
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.serializers.JavaSerializer
 import org.apache.hudi.client.model.HoodieInternalRow
-import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.storage.StorageConfiguration
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.serializers.JavaSerializer
+import com.esotericsoftware.kryo.{Kryo, Serializer}
 import org.apache.spark.serializer.KryoRegistrator
 
 /**
@@ -59,9 +60,12 @@ class HoodieSparkKryoRegistrar extends 
HoodieCommonKryoRegistrar with KryoRegist
     kryo.register(classOf[HoodieSparkRecord])
     kryo.register(classOf[HoodieInternalRow])
 
-    // NOTE: Hadoop's configuration is not a serializable object by itself, 
and hence
-    //       we're relying on [[SerializableConfiguration]] wrapper to work it 
around
-    kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
+    // NOTE: This entry is used for [[SerializableConfiguration]] before since
+    //       Hadoop's configuration is not a serializable object by itself, 
and hence
+    //       we're relying on [[SerializableConfiguration]] wrapper to work it 
around.
+    //       We cannot remove this entry; otherwise the ordering is changed.
+    //       So we replace it with [[StorageConfiguration]].
+    kryo.register(classOf[StorageConfiguration[_]], new JavaSerializer())
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
deleted file mode 100644
index 23a22e01822..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableConfiguration.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.config;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * A wrapped configuration which can be serialized.
- */
-public class SerializableConfiguration implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  private transient Configuration configuration;
-
-  public SerializableConfiguration(Configuration configuration) {
-    this.configuration = new Configuration(configuration);
-  }
-
-  public SerializableConfiguration(SerializableConfiguration configuration) {
-    this.configuration = configuration.newCopy();
-  }
-
-  public Configuration newCopy() {
-    return new Configuration(configuration);
-  }
-
-  public Configuration get() {
-    return configuration;
-  }
-
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-    configuration.write(out);
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException {
-    configuration = new Configuration(false);
-    configuration.readFields(in);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder str = new StringBuilder();
-    configuration.iterator().forEachRemaining(e -> 
str.append(String.format("%s => %s \n", e.getKey(), e.getValue())));
-    return configuration.toString();
-  }
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 431ad474a02..4160e099d44 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -38,12 +37,10 @@ import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.hadoop.fs.CachingPath;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.hadoop.fs.NoOpConsistencyGuard;
 import org.apache.hudi.hadoop.fs.inline.InLineFSUtils;
 import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
@@ -706,22 +703,6 @@ public class FSUtils {
     return returnConf;
   }
 
-  /**
-   * Get the FS implementation for this table.
-   * @param path  Path String
-   * @param hadoopConf  Serializable Hadoop Configuration
-   * @param consistencyGuardConfig Consistency Guard Config
-   * @return HoodieWrapperFileSystem
-   */
-  public static HoodieWrapperFileSystem getFs(String path, 
SerializableConfiguration hadoopConf,
-      ConsistencyGuardConfig consistencyGuardConfig) {
-    FileSystem fileSystem = HadoopFSUtils.getFs(path, hadoopConf.newCopy());
-    return new HoodieWrapperFileSystem(fileSystem,
-        consistencyGuardConfig.isConsistencyCheckEnabled()
-            ? new 
FailSafeConsistencyGuard(HoodieStorageUtils.getStorage(fileSystem), 
consistencyGuardConfig)
-            : new NoOpConsistencyGuard());
-  }
-
   /**
    * Helper to filter out paths under metadata folder when running 
fs.globStatus.
    *
@@ -767,44 +748,15 @@ public class FSUtils {
     return false;
   }
 
-  /**
-   * Processes sub-path in parallel.
-   *
-   * @param hoodieEngineContext {@code HoodieEngineContext} instance
-   * @param fs file system
-   * @param dirPath directory path
-   * @param parallelism parallelism to use for sub-paths
-   * @param subPathPredicate predicate to use to filter sub-paths for 
processing
-   * @param pairFunction actual processing logic for each sub-path
-   * @param <T> type of result to return for each sub-path
-   * @return a map of sub-path to result of the processing
-   */
-  public static <T> Map<String, T> parallelizeSubPathProcess(
-      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, 
int parallelism,
-      Predicate<FileStatus> subPathPredicate, 
SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction) {
-    Map<String, T> result = new HashMap<>();
-    try {
-      FileStatus[] fileStatuses = fs.listStatus(dirPath);
-      List<String> subPaths = Arrays.stream(fileStatuses)
-          .filter(subPathPredicate)
-          .map(fileStatus -> fileStatus.getPath().toString())
-          .collect(Collectors.toList());
-      result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, 
pairFunction, subPaths);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
-    return result;
-  }
-
   public static <T> Map<String, T> parallelizeFilesProcess(
       HoodieEngineContext hoodieEngineContext,
       FileSystem fs,
       int parallelism,
-      SerializableFunction<Pair<String, SerializableConfiguration>, T> 
pairFunction,
+      SerializableFunction<Pair<String, StorageConfiguration<Configuration>>, 
T> pairFunction,
       List<String> subPaths) {
     Map<String, T> result = new HashMap<>();
     if (subPaths.size() > 0) {
-      SerializableConfiguration conf = new 
SerializableConfiguration(fs.getConf());
+      StorageConfiguration<Configuration> conf = 
HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
       int actualParallelism = Math.min(subPaths.size(), parallelism);
 
       hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
@@ -817,6 +769,18 @@ public class FSUtils {
     return result;
   }
 
+  /**
+   * Processes sub-path in parallel.
+   *
+   * @param hoodieEngineContext {@link HoodieEngineContext} instance
+   * @param storage             {@link HoodieStorage} instance
+   * @param dirPath             directory path
+   * @param parallelism         parallelism to use for sub-paths
+   * @param subPathPredicate    predicate to use to filter sub-paths for 
processing
+   * @param pairFunction        actual processing logic for each sub-path
+   * @param <T>                 type of result to return for each sub-path
+   * @return a map of sub-path to result of the processing
+   */
   public static <T> Map<String, T> parallelizeSubPathProcess(
       HoodieEngineContext hoodieEngineContext, HoodieStorage storage, 
StoragePath dirPath, int parallelism,
       Predicate<StoragePathInfo> subPathPredicate, 
SerializableFunction<Pair<String, StorageConfiguration<?>>, T> pairFunction) {
@@ -900,7 +864,7 @@ public class FSUtils {
           pairOfSubPathAndConf -> {
             Path path = new Path(pairOfSubPathAndConf.getKey());
             try {
-              FileSystem fileSystem = 
path.getFileSystem(pairOfSubPathAndConf.getValue().get());
+              FileSystem fileSystem = 
path.getFileSystem(pairOfSubPathAndConf.getValue().unwrap());
               return Arrays.stream(fileSystem.listStatus(path))
                 .collect(Collectors.toList());
             } catch (IOException e) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 61feb294b49..207c301e425 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink;
 import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -34,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -41,6 +41,7 @@ import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.meta.CkpMetadataFactory;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.util.ClientIds;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
@@ -95,7 +96,7 @@ public class StreamWriteOperatorCoordinator
   /**
    * Hive config options.
    */
-  private final SerializableConfiguration hiveConf;
+  private final StorageConfiguration<org.apache.hadoop.conf.Configuration> 
storageConf;
 
   /**
    * Coordinator context.
@@ -175,7 +176,7 @@ public class StreamWriteOperatorCoordinator
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
-    this.hiveConf = new 
SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
+    this.storageConf = 
HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHiveConf(conf));
   }
 
   @Override
@@ -320,7 +321,7 @@ public class StreamWriteOperatorCoordinator
 
   private void initHiveSync() {
     this.hiveSyncExecutor = 
NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
-    this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
+    this.hiveSyncContext = HiveSyncContext.create(conf, this.storageConf);
   }
 
   private void syncHiveAsync() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 0d4cd10b8c7..e0c88eccc39 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.sink.utils;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.hive.ddl.HiveSyncMode;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -85,11 +85,11 @@ public class HiveSyncContext {
     return new HiveSyncTool(props, hiveConf);
   }
 
-  public static HiveSyncContext create(Configuration conf, 
SerializableConfiguration serConf) {
+  public static HiveSyncContext create(Configuration conf, 
StorageConfiguration<org.apache.hadoop.conf.Configuration> storageConf) {
     Properties props = buildSyncConfig(conf);
     org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
     HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(serConf.get());
+    hiveConf.addResource(storageConf.unwrap());
     if (!FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
       hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
     }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index 231f6c4830e..fff0c71583d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.configuration;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Configuration to hold details about a DFS based output type, implements 
{@link DeltaConfig}.
@@ -43,10 +45,10 @@ public class DFSDeltaConfig extends DeltaConfig {
   private boolean useHudiToGenerateUpdates;
 
   public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType 
deltaInputType,
-                        SerializableConfiguration configuration,
+                        StorageConfiguration<Configuration> storageConf,
                         String deltaBasePath, String targetBasePath, String 
schemaStr, Long maxFileSize,
                         int inputParallelism, boolean deleteOldInputData, 
boolean useHudiToGenerateUpdates) {
-    super(deltaOutputMode, deltaInputType, configuration);
+    super(deltaOutputMode, deltaInputType, storageConf);
     this.deltaBasePath = deltaBasePath;
     this.schemaStr = schemaStr;
     this.maxFileSize = maxFileSize;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index bbcd375e5f7..244877e799b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.configuration;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
@@ -40,13 +40,13 @@ public class DeltaConfig implements Serializable {
 
   private final DeltaOutputMode deltaOutputMode;
   private final DeltaInputType deltaInputType;
-  private final SerializableConfiguration configuration;
+  private final StorageConfiguration<Configuration> storageConf;
 
   public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType 
deltaInputType,
-      SerializableConfiguration configuration) {
+                     StorageConfiguration<Configuration> storageConf) {
     this.deltaOutputMode = deltaOutputMode;
     this.deltaInputType = deltaInputType;
-    this.configuration = configuration;
+    this.storageConf = storageConf;
   }
 
   public DeltaOutputMode getDeltaOutputMode() {
@@ -58,7 +58,7 @@ public class DeltaConfig implements Serializable {
   }
 
   public Configuration getConfiguration() {
-    return configuration.get();
+    return storageConf.unwrap();
   }
 
   /**
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 6966bda01b6..6df2c718812 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.integ.testsuite.dag;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.integ.testsuite.HoodieContinuousTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
 import 
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
@@ -77,7 +77,7 @@ public class WriterContext {
       int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : 
jsc.defaultParallelism();
       this.deltaGenerator = new DeltaGenerator(
           new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), 
DeltaInputType.valueOf(cfg.inputFormatName),
-              new SerializableConfiguration(jsc.hadoopConfiguration()), 
cfg.inputBasePath, cfg.targetBasePath,
+              HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), 
cfg.inputBasePath, cfg.targetBasePath,
               schemaStr, cfg.limitFileSize, inputParallelism, 
cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
           jsc, sparkSession, schemaStr, keyGenerator);
       log.info(String.format("Initialized writerContext with: %s", schemaStr));
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index f2ec458bf2d..521495cacb8 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.integ.testsuite;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
@@ -131,7 +130,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends 
UtilitiesTestBase {
   // TODO(HUDI-3668): Fix this test
   public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws 
IOException {
     DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, 
DeltaInputType.AVRO,
-        new SerializableConfiguration(jsc.hadoopConfiguration()), basePath, 
basePath,
+        HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), 
basePath, basePath,
         schemaProvider.getSourceSchema().toString(), 10240L, 
jsc.defaultParallelism(), false, false);
     DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = 
DeltaWriterFactory
         .getDeltaWriterAdapter(dfsSinkConfig, 1);
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 7c7a62c8ab1..193e23ccb7c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 
@@ -52,16 +51,16 @@ class ShowInvalidParquetProcedure extends BaseProcedure 
with ProcedureBuilder {
     val limit = getArgValueOrDefault(args, PARAMETERS(1))
     val partitionPaths: java.util.List[String] = 
FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false)
     val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, 
partitionPaths.size())
-    val serHadoopConf = new 
SerializableConfiguration(jsc.hadoopConfiguration())
+    val storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
     val parquetRdd = javaRdd.rdd.map(part => {
-      val fs = HadoopFSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+        val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
       FSUtils.getAllDataFilesInPartition(fs, 
FSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
     }).flatMap(_.toList)
       .filter(status => {
         val filePath = status.getPath
         var isInvalid = false
         if (filePath.toString.endsWith(".parquet")) {
-          try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, 
SKIP_ROW_GROUPS).getFileMetaData catch {
+          try ParquetFileReader.readFooter(storageConf.unwrap(), filePath, 
SKIP_ROW_GROUPS).getFileMetaData catch {
             case e: Exception =>
               isInvalid = e.getMessage.contains("is not a Parquet file")
           }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index f600db65733..4aabc85898d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -20,12 +20,13 @@
 package org.apache.hudi.utilities.multitable;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,7 +63,7 @@ public class MultiTableServiceUtils {
   }
 
   public static List<String> getTablesToBeServedFromProps(JavaSparkContext 
jsc, TypedProperties properties) {
-    SerializableConfiguration conf = new 
SerializableConfiguration(jsc.hadoopConfiguration());
+    StorageConfiguration<Configuration> conf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
     String combinedTablesString = 
properties.getString(Constants.TABLES_TO_BE_SERVED_PROP);
     boolean skipWrongPath = 
properties.getBoolean(Constants.TABLES_SKIP_WRONG_PATH, false);
     if (combinedTablesString == null) {
@@ -74,7 +75,7 @@ public class MultiTableServiceUtils {
     if (skipWrongPath) {
       tablePaths = Arrays.stream(tablesArray)
           .filter(tablePath -> {
-            if (isHoodieTable(new Path(tablePath), conf.get())) {
+            if (isHoodieTable(new Path(tablePath), conf.unwrap())) {
               return true;
             } else {
               // Log the wrong path in console.
@@ -85,7 +86,7 @@ public class MultiTableServiceUtils {
     } else {
       tablePaths = Arrays.asList(tablesArray);
       tablePaths.stream()
-          .filter(tablePath -> !isHoodieTable(new Path(tablePath), conf.get()))
+          .filter(tablePath -> !isHoodieTable(new Path(tablePath), 
conf.unwrap()))
           .findFirst()
           .ifPresent(tablePath -> {
             throw new TableNotFoundException("Table not found: " + tablePath);
@@ -105,8 +106,8 @@ public class MultiTableServiceUtils {
 
   public static List<String> findHoodieTablesUnderPath(JavaSparkContext jsc, 
String pathStr) {
     Path rootPath = new Path(pathStr);
-    SerializableConfiguration conf = new 
SerializableConfiguration(jsc.hadoopConfiguration());
-    if (isHoodieTable(rootPath, conf.get())) {
+    StorageConfiguration<Configuration> conf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
+    if (isHoodieTable(rootPath, conf.unwrap())) {
       return Collections.singletonList(pathStr);
     }
 
@@ -119,7 +120,7 @@ public class MultiTableServiceUtils {
     while (!pathsToList.isEmpty()) {
       // List all directories in parallel
       List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, 
path -> {
-        FileSystem fileSystem = path.getFileSystem(conf.get());
+        FileSystem fileSystem = path.getFileSystem(conf.unwrap());
         return fileSystem.listStatus(path);
       }, listingParallelism);
       pathsToList.clear();
@@ -131,7 +132,7 @@ public class MultiTableServiceUtils {
 
       if (!dirs.isEmpty()) {
         List<Pair<FileStatus, DirType>> dirResults = engineContext.map(dirs, 
fileStatus -> {
-          if (isHoodieTable(fileStatus.getPath(), conf.get())) {
+          if (isHoodieTable(fileStatus.getPath(), conf.unwrap())) {
             return Pair.of(fileStatus, DirType.HOODIE_TABLE);
           } else if (fileStatus.getPath().getName().equals(METAFOLDER_NAME)) {
             return Pair.of(fileStatus, DirType.META_FOLDER);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 51bc2907cc9..be9914190e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -18,12 +18,13 @@
 
 package org.apache.hudi.utilities.sources;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -34,6 +35,7 @@ import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -161,13 +163,13 @@ public class S3EventsHoodieIncrSource extends 
HoodieIncrSource {
     String s3Prefix = s3FS + "://";
 
     // Create S3 paths
-    SerializableConfiguration serializableHadoopConf = new 
SerializableConfiguration(sparkContext.hadoopConfiguration());
+    StorageConfiguration<Configuration> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(sparkContext.hadoopConfiguration());
     List<CloudObjectMetadata> cloudObjectMetadata = 
checkPointAndDataset.getRight().get()
         .select(CloudObjectsSelectorCommon.S3_BUCKET_NAME,
                 CloudObjectsSelectorCommon.S3_OBJECT_KEY,
                 CloudObjectsSelectorCommon.S3_OBJECT_SIZE)
         .distinct()
-        .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, 
serializableHadoopConf, checkIfFileExists), 
Encoders.kryo(CloudObjectMetadata.class))
+        .mapPartitions(getCloudObjectMetadataPerPartition(s3Prefix, 
storageConf, checkIfFileExists), Encoders.kryo(CloudObjectMetadata.class))
         .collectAsList();
     LOG.info("Total number of files to process :" + 
cloudObjectMetadata.size());
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8676bf41cb5..8a442455291 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -20,13 +20,13 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.config.CloudSourceConfig;
 import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -90,16 +90,16 @@ public class CloudObjectsSelectorCommon {
    * Return a function that extracts filepaths from a list of Rows.
    * Here Row is assumed to have the schema [bucket_name, 
filepath_relative_to_bucket, object_size]
    * @param storageUrlSchemePrefix    Eg: s3:// or gs://. The 
storage-provider-specific prefix to use within the URL.
-   * @param serializableHadoopConf
+   * @param storageConf               storage configuration.
    * @param checkIfExists             check if each file exists, before adding 
it to the returned list
    * @return
    */
   public static MapPartitionsFunction<Row, CloudObjectMetadata> 
getCloudObjectMetadataPerPartition(
-      String storageUrlSchemePrefix, SerializableConfiguration 
serializableHadoopConf, boolean checkIfExists) {
+      String storageUrlSchemePrefix, StorageConfiguration<Configuration> 
storageConf, boolean checkIfExists) {
     return rows -> {
       List<CloudObjectMetadata> cloudObjectMetadataPerPartition = new 
ArrayList<>();
       rows.forEachRemaining(row -> {
-        Option<String> filePathUrl = getUrlForFile(row, 
storageUrlSchemePrefix, serializableHadoopConf, checkIfExists);
+        Option<String> filePathUrl = getUrlForFile(row, 
storageUrlSchemePrefix, storageConf, checkIfExists);
         filePathUrl.ifPresent(url -> {
           LOG.info("Adding file: " + url);
           long size;
@@ -130,9 +130,9 @@ public class CloudObjectsSelectorCommon {
    * @param storageUrlSchemePrefix Eg: s3:// or gs://. The 
storage-provider-specific prefix to use within the URL.
    */
   private static Option<String> getUrlForFile(Row row, String 
storageUrlSchemePrefix,
-                                              SerializableConfiguration 
serializableConfiguration,
+                                              
StorageConfiguration<Configuration> storageConf,
                                               boolean checkIfExists) {
-    final Configuration configuration = serializableConfiguration.newCopy();
+    final Configuration configuration = storageConf.unwrapCopy();
 
     String bucket = row.getString(0);
     String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 0b7197e3a5b..ab9ccbb8ca7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -19,12 +19,12 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -195,12 +195,12 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
     if (datePartitionDepth <= 0) {
       return partitionPaths;
     }
-    SerializableConfiguration serializedConf = new SerializableConfiguration(
+    StorageConfiguration<Configuration> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(
         ((FileSystem) storage.getFileSystem()).getConf());
     for (int i = 0; i < datePartitionDepth; i++) {
       partitionPaths = context.flatMap(partitionPaths, path -> {
         Path subDir = new Path(path);
-        FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+        FileSystem fileSystem = subDir.getFileSystem(storageConf.unwrap());
         // skip files/dirs whose names start with (_, ., etc)
         FileStatus[] statuses = fileSystem.listStatus(subDir,
             file -> IGNORE_FILEPREFIX_LIST.stream()
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
index 29a50e81fb0..21ca334d05f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectMetadataFetcher.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.utilities.sources.helpers.gcs;
 
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;
 import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -62,11 +64,11 @@ public class GcsObjectMetadataFetcher implements 
Serializable {
    * @return A {@link List} of {@link CloudObjectMetadata} containing GCS info.
    */
   public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, 
Dataset<Row> cloudObjectMetadataDF, boolean checkIfExists) {
-    SerializableConfiguration serializableHadoopConf = new 
SerializableConfiguration(jsc.hadoopConfiguration());
+    StorageConfiguration<Configuration> storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration());
     return cloudObjectMetadataDF
         .select("bucket", "name", "size")
         .distinct()
-        .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, 
serializableHadoopConf, checkIfExists), 
Encoders.kryo(CloudObjectMetadata.class))
+        .mapPartitions(getCloudObjectMetadataPerPartition(GCS_PREFIX, 
storageConf, checkIfExists), Encoders.kryo(CloudObjectMetadata.class))
         .collectAsList();
   }
 

