This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f4ba12c10e778a74488312d3da6e95c3cfcfa40a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 15 01:27:37 2024 -0700

    [HUDI-7670] Return StorageConfiguration from getConf() in HoodieStorage (#11096)
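
    For context when skimming the patch below: HoodieStorage#getConf() now
    returns the engine-agnostic StorageConfiguration<?> wrapper, while the
    new HoodieStorage#unwrapConf() exposes the raw underlying configuration
    object (a Hadoop Configuration for HoodieHadoopStorage). A minimal
    sketch of the call-site patterns this diff migrates to, assuming a
    Hadoop-backed storage; the ConfAccessSketch class and its show() method
    are illustrative only and not part of this patch:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hudi.storage.HoodieStorage;
        import org.apache.hudi.storage.StorageConfiguration;

        // Illustrative only (not part of this patch): the access patterns
        // that the call sites below are migrated to.
        class ConfAccessSketch {
          static void show(HoodieStorage storage) {
            // Engine-agnostic wrapper: getConf() now returns StorageConfiguration<?>.
            StorageConfiguration<?> wrapped = storage.getConf();

            // Raw underlying instance: unwrapConf() replaces the old
            // "(Configuration) storage.getConf()" cast; a cast is still
            // needed because unwrapConf() is declared to return Object.
            Configuration raw = (Configuration) storage.unwrapConf();

            // Defensive copy: getConf().newCopy() replaces
            // "new Configuration((Configuration) storage.getConf())".
            Configuration copy = (Configuration) wrapped.newCopy();
          }
        }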
---
 .../org/apache/hudi/table/marker/DirectWriteMarkers.java    |  4 ++--
 .../apache/hudi/testutils/HoodieJavaClientTestHarness.java  |  2 +-
 .../src/main/java/org/apache/hudi/common/fs/FSUtils.java    |  2 +-
 .../apache/hudi/common/model/HoodiePartitionMetadata.java   |  2 +-
 .../common/table/log/AbstractHoodieLogRecordReader.java     |  2 +-
 .../apache/hudi/common/table/log/HoodieLogFileReader.java   |  4 ++--
 .../src/main/java/org/apache/hudi/common/util/OrcUtils.java |  2 +-
 .../test/java/org/apache/hudi/common/fs/TestFSUtils.java    | 12 ++++++------
 .../org/apache/hudi/common/testutils/FileCreateUtils.java   |  2 +-
 .../org/apache/hudi/common/testutils/HoodieTestUtils.java   |  2 +-
 .../hudi/common/util/TestDFSPropertiesConfiguration.java    | 13 +++++++------
 .../hudi/io/storage/TestHoodieHFileReaderWriterBase.java    |  2 +-
 .../org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java |  9 ++++++++-
 .../org/apache/hudi/hadoop/HoodieROTablePathFilter.java     |  2 +-
 .../apache/hudi/hadoop/utils/HoodieInputFormatUtils.java    |  2 +-
 .../hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java    |  2 +-
 .../main/java/org/apache/hudi/storage/HoodieStorage.java    | 10 ++++++++--
 .../src/main/scala/org/apache/hudi/DefaultSource.scala      |  2 +-
 .../main/java/org/apache/hudi/HoodieDataSourceHelpers.java  |  2 +-
 .../scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala    |  6 +++---
 .../apache/hudi/functional/TestStructuredStreaming.scala    |  2 +-
 .../org/apache/hudi/utilities/streamer/HoodieStreamer.java  |  2 +-
 22 files changed, 51 insertions(+), 37 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index 3d1521a9b0e..241c3050555 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -120,7 +120,7 @@ public class DirectWriteMarkers extends WriteMarkers {
 
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
-      SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.getConf());
+      SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.unwrapConf());
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
         Path path = new Path(directory);
@@ -147,7 +147,7 @@ public class DirectWriteMarkers extends WriteMarkers {
 
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
-      SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.getConf());
+      SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.getConf().get());
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
       logFiles.addAll(context.flatMap(subDirectories, directory -> {
         Queue<Path> candidatesDirs = new LinkedList<>();
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 828b779be9e..9ab606d4d48 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -952,7 +952,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
                                                         String... paths) {
     List<HoodieBaseFile> latestFiles = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((Configuration) storage.getConf(), basePath);
+      HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((Configuration) storage.unwrapConf(), basePath);
       for (String path : paths) {
         TableFileSystemView.BaseFileOnlyView fileSystemView =
             new HoodieTableFileSystemView(metaClient,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 2e584dfb8f9..7bc037ceaca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -905,7 +905,7 @@ public class FSUtils {
       List<String> subPaths) {
     Map<String, T> result = new HashMap<>();
     if (subPaths.size() > 0) {
-      SerializableConfiguration conf = new SerializableConfiguration((Configuration) storage.getConf());
+      SerializableConfiguration conf = new SerializableConfiguration((Configuration) storage.unwrapConf());
       int actualParallelism = Math.min(subPaths.size(), parallelism);
 
       hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index a90d05aefdd..61cf3082cc7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -189,7 +189,7 @@ public class HoodiePartitionMetadata {
         BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
         // Data file format
         Map<String, String> metadata = reader.readFooter(
-            (Configuration) storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+            (Configuration) storage.unwrapConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
         props.clear();
         props.putAll(metadata);
         format = Option.of(reader.getFormat());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index d1f4e07d4dd..bed4f2e8df9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -162,7 +162,7 @@ public abstract class AbstractHoodieLogRecordReader {
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(
         () -> HoodieTableMetaClient.builder()
-            .setConf((Configuration) storage.getConf()).setBasePath(basePath).build());
+            .setConf((Configuration) storage.unwrapConf()).setBasePath(basePath).build());
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 8ea790a707d..b21068f570e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -104,7 +104,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader,
                              boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
     this.storage = storage;
-    this.hadoopConf = (Configuration) this.storage.getConf();
+    this.hadoopConf = (Configuration) this.storage.unwrapConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
     //       further
@@ -202,7 +202,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
         return new HoodieHFileDataBlock(
             () -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
             Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(),
-            ConfigUtils.getBooleanWithAltKeys((Configuration) storage.getConf(), HoodieReaderConfig.USE_NATIVE_HFILE_READER));
+            ConfigUtils.getBooleanWithAltKeys((Configuration) storage.unwrapConf(), HoodieReaderConfig.USE_NATIVE_HFILE_READER));
 
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index e5440760401..4b0cc0d36fc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -278,7 +278,7 @@ public class OrcUtils extends BaseFileUtils {
     // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
     // parameters are not important.
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions((Configuration) storage.getConf())
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions((Configuration) storage.unwrapConf())
         .fileSystem((FileSystem) storage.getFileSystem())
         .setSchema(AvroOrcUtils.createOrcSchema(schema));
     try (Writer writer = OrcFile.createWriter(new Path(filePath.toUri()), writerOptions)) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index ca33c5ae6ae..8ebe16de646 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -478,7 +478,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     prepareTestDirectory(storage, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) storage.getConf()), true));
+        subDir.toString(), new SerializableConfiguration((Configuration) storage.unwrapConf()), true));
   }
 
   @Test
@@ -491,7 +491,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertThrows(
         HoodieIOException.class,
         () -> FSUtils.deleteSubPath(
-            subDir.toString(), new SerializableConfiguration((Configuration) storage.getConf()), false));
+            subDir.toString(), new SerializableConfiguration((Configuration) storage.unwrapConf()), false));
   }
 
   @Test
@@ -502,7 +502,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     prepareTestDirectory(storage, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) storage.getConf()), false));
+        subDir.toString(), new SerializableConfiguration((Configuration) storage.unwrapConf()), false));
   }
 
   @Test
@@ -513,7 +513,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     cleanUpTestDirectory(storage, rootDir);
 
     assertFalse(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) storage.getConf()), true));
+        subDir.toString(), new SerializableConfiguration((Configuration) storage.unwrapConf()), true));
   }
 
   @Test
@@ -522,7 +522,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     HoodieStorage storage = metaClient.getStorage();
     prepareTestDirectory(storage, rootDir);
     Map<String, List<String>> result = FSUtils.parallelizeSubPathProcess(
-        new HoodieLocalEngineContext((Configuration) storage.getConf()), storage, rootDir, 2,
+        new HoodieLocalEngineContext((Configuration) storage.unwrapConf()), storage, rootDir, 2,
         fileStatus -> !fileStatus.getPath().getName().contains("1"),
         pairOfSubPathAndConf -> {
           Path subPath = new Path(pairOfSubPathAndConf.getKey());
@@ -554,7 +554,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     HoodieStorage storage = metaClient.getStorage();
     prepareTestDirectory(storage, hoodieTempDir);
     List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel(
-        new HoodieLocalEngineContext((Configuration) storage.getConf()), (FileSystem) storage.getFileSystem(),
+        new HoodieLocalEngineContext((Configuration) storage.unwrapConf()), (FileSystem) storage.getFileSystem(),
         new Path(baseUri), 3, 2);
     assertEquals(CollectionUtils.createImmutableSet(
             new Path(baseUri.toString(), ".hoodie/.temp/subdir1/file1.txt"),
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index eca9162af77..fef46c2cae6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -522,7 +522,7 @@ public class FileCreateUtils {
     Map<String, Long> toReturn = new HashMap<>();
     try {
       HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
-          (Configuration) storage.getConf(), basePath);
+          (Configuration) storage.unwrapConf(), basePath);
       for (String path : paths) {
         TableFileSystemView.BaseFileOnlyView fileSystemView =
             new HoodieTableFileSystemView(metaClient,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 8713b76bb6d..ad046d3832d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -198,7 +198,7 @@ public class HoodieTestUtils {
    */
   public static HoodieTableMetaClient createMetaClient(HoodieStorage storage,
                                                        String basePath) {
-    return createMetaClient((Configuration) storage.getConf(), basePath);
+    return createMetaClient((Configuration) storage.unwrapConf(), basePath);
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
index f7763966c23..2d396fff1f4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.common.util;
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
index fbf5f20f126..be9c4b35c38 100644
--- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
@@ -226,7 +226,7 @@ public abstract class TestHoodieHFileReaderWriterBase extends TestHoodieReaderWr
     byte[] content = FileIOUtils.readAsByteArray(
         storage.open(getFilePath()), (int) storage.getPathInfo(getFilePath()).getLength());
     // Reading byte array in HFile format, without actual file path
-    Configuration hadoopConf = (Configuration) storage.getConf();
+    Configuration hadoopConf = (Configuration) storage.unwrapConf();
     try (HoodieAvroHFileReaderImplBase hfileReader = createHFileReader(hadoopConf, content)) {
       Schema avroSchema =
           getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 975e4267f0c..1e1ba67ae66 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -23,10 +23,12 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -202,7 +204,12 @@ public class HoodieHadoopStorage extends HoodieStorage {
   }
 
   @Override
-  public Object getConf() {
+  public StorageConfiguration<Configuration> getConf() {
+    return new HadoopStorageConfiguration(fs.getConf());
+  }
+
+  @Override
+  public Configuration unwrapConf() {
     return fs.getConf();
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 6e23c5d226e..4fa271e5d8a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -187,7 +187,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
           HoodieTableMetaClient metaClient = metaClientCache.get(baseDir.toString());
           if (null == metaClient) {
             metaClient = HoodieTableMetaClient.builder().setConf(
-                (Configuration) storage.getConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
+                (Configuration) storage.unwrapConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 67137660cce..393cb9eb267 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -377,7 +377,7 @@ public class HoodieInputFormatUtils {
     }
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return HoodieTableMetaClient.builder().setConf(
-        (Configuration) storage.getConf()).setBasePath(baseDir.toString()).build();
+        (Configuration) storage.unwrapConf()).setBasePath(baseDir.toString()).build();
   }
 
   public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws IOException {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 3541627b3db..c653e7f3101 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -83,7 +83,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
     StreamSync service = getDeltaSync();
     service.refreshTimeline();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(new Configuration((Configuration) service.getStorage().getConf()))
+        .setConf((Configuration) service.getStorage().getConf().newCopy())
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index 5abb1ac13c9..35db5ae42da 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -265,10 +265,16 @@ public abstract class HoodieStorage implements Closeable {
   public abstract Object getFileSystem();
 
   /**
-   * @return the underlying configuration instance if exists.
+   * @return the storage configuration.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public abstract Object getConf();
+  public abstract StorageConfiguration<?> getConf();
+
+  /**
+   * @return the underlying configuration instance.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract Object unwrapConf();
 
   /**
   * Creates a new file with overwrite set to false. This ensures files are created
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 25b38c899cd..a0f4a25967d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -127,7 +127,7 @@ class DefaultSource extends RelationProvider
     log.info("Obtained hudi table path: " + tablePath)
 
     val metaClient = HoodieTableMetaClient.builder().setMetaserverConfig(parameters.asJava)
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(tablePath).build()
 
     DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index c0d5fe653b4..be73976adfc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -110,7 +110,7 @@ public class HoodieDataSourceHelpers {
   public static HoodieTimeline allCompletedCommitsCompactions(HoodieStorage storage,
                                                               String basePath) {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf((Configuration) storage.getConf())
+        .setConf((Configuration) storage.unwrapConf())
         .setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 0649d03b499..72db130c61b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.FileIOUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.storage.{StoragePath, HoodieStorage}
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -77,7 +77,7 @@ class DedupeSparkJob(basePath: String,
     val dedupeTblName = s"${tmpTableName}_dupeKeys"
 
     val metadata = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(basePath).build()
 
     val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
@@ -188,7 +188,7 @@ class DedupeSparkJob(basePath: String,
 
   def fixDuplicates(dryRun: Boolean = true) = {
     val metadata = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(basePath).build()
 
     val allFiles = storage.listDirectEntries(new StoragePath(s"$basePath/$duplicatedPartitionPath"))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index fe3278fb751..51c1718d90d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -504,7 +504,7 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
       streamingWrite(inputDF.schema, sourcePath, destPath, opts, id)
     }
     val metaClient = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(destPath)
       .setLoadActiveTimelineOnLoad(true).build()
     assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index b42b3dbeda2..5372f15a82b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -691,7 +691,7 @@ public class HoodieStreamer implements Serializable {
       if (this.storage.exists(new StoragePath(cfg.targetBasePath))) {
         try {
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-              .setConf(new Configuration((Configuration) this.storage.getConf()))
+              .setConf((Configuration) this.storage.getConf().newCopy())
              .setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
           tableType = meta.getTableType();
           // This will guarantee there is no surprise with table type
