This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b613ffec62 [HUDI-7670] Return StorageConfiguration from getConf() in HoodieStorage (#11096)
3b613ffec62 is described below

commit 3b613ffec622bbea0701d715c6a6a7a288379ca3
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Apr 26 16:22:08 2024 -0700

    [HUDI-7670] Return StorageConfiguration from getConf() in HoodieStorage (#11096)
---
 .../org/apache/hudi/table/marker/DirectWriteMarkers.java    |  2 +-
 .../apache/hudi/testutils/HoodieJavaClientTestHarness.java  |  2 +-
 .../java/org/apache/hudi/io/TestHoodieTimelineArchiver.java |  6 +++---
 .../src/main/java/org/apache/hudi/common/fs/FSUtils.java    |  2 +-
 .../apache/hudi/common/model/HoodiePartitionMetadata.java   |  2 +-
 .../common/table/log/AbstractHoodieLogRecordReader.java     |  2 +-
 .../hudi/common/table/log/BaseHoodieLogRecordReader.java    |  2 +-
 .../apache/hudi/common/table/log/HoodieLogFileReader.java   |  4 ++--
 .../src/main/java/org/apache/hudi/common/util/OrcUtils.java |  2 +-
 .../test/java/org/apache/hudi/common/fs/TestFSUtils.java    | 12 ++++++------
 .../org/apache/hudi/common/testutils/FileCreateUtils.java   |  2 +-
 .../org/apache/hudi/common/testutils/HoodieTestUtils.java   |  2 +-
 .../hudi/common/util/TestDFSPropertiesConfiguration.java    | 13 +++++++------
 .../hudi/io/storage/TestHoodieHFileReaderWriterBase.java    |  2 +-
 .../org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java |  9 ++++++++-
 .../org/apache/hudi/hadoop/HoodieROTablePathFilter.java     |  2 +-
 .../apache/hudi/hadoop/utils/HoodieInputFormatUtils.java    |  2 +-
 .../hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java    |  2 +-
 .../main/java/org/apache/hudi/storage/HoodieStorage.java    | 10 ++++++++--
 .../src/main/scala/org/apache/hudi/DefaultSource.scala      |  2 +-
 .../main/java/org/apache/hudi/HoodieDataSourceHelpers.java  |  2 +-
 .../scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala    |  6 +++---
 .../apache/hudi/functional/TestStructuredStreaming.scala    |  2 +-
 .../org/apache/hudi/utilities/streamer/HoodieStreamer.java  |  2 +-
 24 files changed, 54 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index 726bc5f9069..248379dc402 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -114,7 +114,7 @@ public class DirectWriteMarkers extends WriteMarkers {
 
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
-      SerializableConfiguration serializedConf = new 
SerializableConfiguration((Configuration) storage.getConf());
+      SerializableConfiguration serializedConf = new 
SerializableConfiguration((Configuration) storage.unwrapConf());
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker 
files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
         Path path = new Path(directory);
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 8255f443e38..055a20e2344 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -952,7 +952,7 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
                                                         String... paths) {
     List<HoodieBaseFile> latestFiles = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient((Configuration) storage.getConf(), basePath);
+      HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient((Configuration) storage.unwrapConf(), 
basePath);
       for (String path : paths) {
         TableFileSystemView.BaseFileOnlyView fileSystemView =
             new HoodieTableFileSystemView(metaClient,
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 51dd01e1b8a..ae25c717363 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1636,9 +1636,9 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     assertTrue(archiver.archiveIfRequired(context) > 0);
     // Simulate archival failing to delete by re-adding the .commit instant 
files
     // (101.commit, 102.commit, and 103.commit instant files)
-    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"101_1001", (Configuration) storage.getConf());
-    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"102_1021", (Configuration) storage.getConf());
-    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"103_1031", (Configuration) storage.getConf());
+    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"101_1001", (Configuration) storage.unwrapConf());
+    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"102_1021", (Configuration) storage.unwrapConf());
+    HoodieTestDataGenerator.createOnlyCompletedCommitFile(basePath, 
"103_1031", (Configuration) storage.unwrapConf());
     timeline = metaClient.getActiveTimeline().reload().getWriteTimeline();
     assertEquals(5, timeline.countInstants(), "Due to simulating partial 
archival deletion, there should"
         + "be 5 instants (as instant times 101-103 .commit files should remain 
in timeline)");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index a0c39ecff1a..2bfaa869338 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -835,7 +835,7 @@ public class FSUtils {
       List<String> subPaths) {
     Map<String, T> result = new HashMap<>();
     if (subPaths.size() > 0) {
-      SerializableConfiguration conf = new 
SerializableConfiguration((Configuration) storage.getConf());
+      SerializableConfiguration conf = new 
SerializableConfiguration((Configuration) storage.unwrapConf());
       int actualParallelism = Math.min(subPaths.size(), parallelism);
 
       hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(),
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index a90d05aefdd..61cf3082cc7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -189,7 +189,7 @@ public class HoodiePartitionMetadata {
         BaseFileUtils reader = 
BaseFileUtils.getInstance(metafilePath.toString());
         // Data file format
         Map<String, String> metadata = reader.readFooter(
-            (Configuration) storage.getConf(), true, metafilePath, 
PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
+            (Configuration) storage.unwrapConf(), true, metafilePath, 
PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
         props.clear();
         props.putAll(metadata);
         format = Option.of(reader.getFormat());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index cb3e6f2e370..e047d5c6017 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -162,7 +162,7 @@ public abstract class AbstractHoodieLogRecordReader {
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(
         () -> HoodieTableMetaClient.builder()
-            .setConf((Configuration) 
storage.getConf()).setBasePath(basePath).build());
+            .setConf((Configuration) 
storage.unwrapConf()).setBasePath(basePath).build());
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = 
this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 55d825758d1..2771faabb44 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -155,7 +155,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder()
-        .setConf((Configuration) 
storage.getConf()).setBasePath(basePath).build();
+        .setConf((Configuration) 
storage.unwrapConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = 
this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index a13e9095afb..53bacd213c0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -104,7 +104,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, 
Schema readerSchema, int bufferSize, boolean reverseReader,
                              boolean enableRecordLookups, String keyField, 
InternalSchema internalSchema) throws IOException {
     this.storage = storage;
-    this.hadoopConf = (Configuration) this.storage.getConf();
+    this.hadoopConf = (Configuration) this.storage.unwrapConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
     //       is prefixed with an appropriate scheme given that we're not 
propagating the FS
     //       further
@@ -202,7 +202,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         return new HoodieHFileDataBlock(
             () -> getDataInputStream(storage, this.logFile, bufferSize), 
content, true, logBlockContentLoc,
             Option.ofNullable(readerSchema), header, footer, 
enableRecordLookups, logFile.getPath(),
-            ConfigUtils.getBooleanWithAltKeys((Configuration) 
storage.getConf(), HoodieReaderConfig.USE_NATIVE_HFILE_READER));
+            ConfigUtils.getBooleanWithAltKeys((Configuration) 
storage.unwrapConf(), HoodieReaderConfig.USE_NATIVE_HFILE_READER));
 
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 0fe615e795d..04ec8fda3ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -285,7 +285,7 @@ public class OrcUtils extends BaseFileUtils {
     // Since we are only interested in saving metadata to the footer, the 
schema, blocksizes and other
     // parameters are not important.
     Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    OrcFile.WriterOptions writerOptions = 
OrcFile.writerOptions((Configuration) storage.getConf())
+    OrcFile.WriterOptions writerOptions = 
OrcFile.writerOptions((Configuration) storage.unwrapConf())
         .fileSystem((FileSystem) storage.getFileSystem())
         .setSchema(AvroOrcUtils.createOrcSchema(schema));
     try (Writer writer = OrcFile.createWriter(new Path(filePath.toUri()), 
writerOptions)) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 3703a7217c6..f32d027a576 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -479,7 +479,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     prepareTestDirectory(storage, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.getConf()), true));
+        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.unwrapConf()), true));
   }
 
   @Test
@@ -492,7 +492,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     assertThrows(
         HoodieIOException.class,
         () -> FSUtils.deleteSubPath(
-            subDir.toString(), new SerializableConfiguration((Configuration) 
storage.getConf()), false));
+            subDir.toString(), new SerializableConfiguration((Configuration) 
storage.unwrapConf()), false));
   }
 
   @Test
@@ -503,7 +503,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     prepareTestDirectory(storage, rootDir);
 
     assertTrue(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.getConf()), false));
+        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.unwrapConf()), false));
   }
 
   @Test
@@ -514,7 +514,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     cleanUpTestDirectory(storage, rootDir);
 
     assertFalse(FSUtils.deleteSubPath(
-        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.getConf()), true));
+        subDir.toString(), new SerializableConfiguration((Configuration) 
storage.unwrapConf()), true));
   }
 
   @Test
@@ -523,7 +523,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     HoodieStorage storage = metaClient.getStorage();
     prepareTestDirectory(storage, rootDir);
     Map<String, List<String>> result = FSUtils.parallelizeSubPathProcess(
-        new HoodieLocalEngineContext((Configuration) storage.getConf()), 
storage, rootDir, 2,
+        new HoodieLocalEngineContext((Configuration) storage.unwrapConf()), 
storage, rootDir, 2,
         fileStatus -> !fileStatus.getPath().getName().contains("1"),
         pairOfSubPathAndConf -> {
           Path subPath = new Path(pairOfSubPathAndConf.getKey());
@@ -555,7 +555,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     HoodieStorage storage = metaClient.getStorage();
     prepareTestDirectory(storage, hoodieTempDir);
     List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel(
-        new HoodieLocalEngineContext((Configuration) storage.getConf()), 
(FileSystem) storage.getFileSystem(),
+        new HoodieLocalEngineContext((Configuration) storage.unwrapConf()), 
(FileSystem) storage.getFileSystem(),
         new Path(baseUri), 3, 2);
     assertEquals(CollectionUtils.createImmutableSet(
             new Path(baseUri.toString(), ".hoodie/.temp/subdir1/file1.txt"),
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 5b9d84e46a8..7e27d8a9cd1 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -561,7 +561,7 @@ public class FileCreateUtils {
     Map<String, Long> toReturn = new HashMap<>();
     try {
       HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
-          (Configuration) storage.getConf(), basePath);
+          (Configuration) storage.unwrapConf(), basePath);
       for (String path : paths) {
         TableFileSystemView.BaseFileOnlyView fileSystemView =
             new HoodieTableFileSystemView(metaClient,
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 71f51208e55..7e83a2aed45 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -204,7 +204,7 @@ public class HoodieTestUtils {
    */
   public static HoodieTableMetaClient createMetaClient(HoodieStorage storage,
                                                        String basePath) {
-    return createMetaClient((Configuration) storage.getConf(), basePath);
+    return createMetaClient((Configuration) storage.unwrapConf(), basePath);
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
index f7763966c23..2d396fff1f4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.common.util;
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
index b0324e929dd..c802c7f8df3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
@@ -226,7 +226,7 @@ public abstract class TestHoodieHFileReaderWriterBase 
extends TestHoodieReaderWr
     byte[] content = FileIOUtils.readAsByteArray(
         storage.open(getFilePath()), (int) 
storage.getPathInfo(getFilePath()).getLength());
     // Reading byte array in HFile format, without actual file path
-    Configuration hadoopConf = (Configuration) storage.getConf();
+    Configuration hadoopConf = (Configuration) storage.unwrapConf();
     try (HoodieAvroHFileReaderImplBase hfileReader = 
createHFileReader(hadoopConf, content)) {
       Schema avroSchema =
           getSchemaFromResource(TestHoodieReaderWriterBase.class, 
"/exampleSchema.avsc");
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
index 975e4267f0c..1e1ba67ae66 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java
@@ -23,10 +23,12 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -202,7 +204,12 @@ public class HoodieHadoopStorage extends HoodieStorage {
   }
 
   @Override
-  public Object getConf() {
+  public StorageConfiguration<Configuration> getConf() {
+    return new HadoopStorageConfiguration(fs.getConf());
+  }
+
+  @Override
+  public Configuration unwrapConf() {
     return fs.getConf();
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 6e23c5d226e..4fa271e5d8a 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -187,7 +187,7 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
           HoodieTableMetaClient metaClient = 
metaClientCache.get(baseDir.toString());
           if (null == metaClient) {
             metaClient = HoodieTableMetaClient.builder().setConf(
-                (Configuration) 
storage.getConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
+                (Configuration) 
storage.unwrapConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 67137660cce..393cb9eb267 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -377,7 +377,7 @@ public class HoodieInputFormatUtils {
     }
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return HoodieTableMetaClient.builder().setConf(
-        (Configuration) 
storage.getConf()).setBasePath(baseDir.toString()).build();
+        (Configuration) 
storage.unwrapConf()).setBasePath(baseDir.toString()).build();
   }
 
   public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws 
IOException {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 359f29cfd38..2b0fc2baa01 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -83,7 +83,7 @@ public class HoodieDeltaStreamerWrapper extends 
HoodieDeltaStreamer {
     StreamSync service = getDeltaSync();
     service.refreshTimeline();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(new Configuration((Configuration) 
service.getStorage().getConf()))
+        .setConf((Configuration) service.getStorage().getConf().newCopy())
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = InProcessTimeGenerator.createNewInstantTime();
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index 5abb1ac13c9..35db5ae42da 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -265,10 +265,16 @@ public abstract class HoodieStorage implements Closeable {
   public abstract Object getFileSystem();
 
   /**
-   * @return the underlying configuration instance if exists.
+   * @return the storage configuration.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public abstract Object getConf();
+  public abstract StorageConfiguration<?> getConf();
+
+  /**
+   * @return the underlying configuration instance.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract Object unwrapConf();
 
   /**
    * Creates a new file with overwrite set to false. This ensures files are 
created
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 48ccff74a08..91d8d93895b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -127,7 +127,7 @@ class DefaultSource extends RelationProvider
     log.info("Obtained hudi table path: " + tablePath)
 
     val metaClient = 
HoodieTableMetaClient.builder().setMetaserverConfig(parameters.asJava)
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(tablePath).build()
 
     DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, 
parameters)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index c0d5fe653b4..be73976adfc 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -110,7 +110,7 @@ public class HoodieDataSourceHelpers {
   public static HoodieTimeline allCompletedCommitsCompactions(HoodieStorage 
storage,
                                                               String basePath) 
{
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf((Configuration) storage.getConf())
+        .setConf((Configuration) storage.unwrapConf())
         .setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
index 0649d03b499..72db130c61b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.FileIOUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.storage.{StoragePath, HoodieStorage}
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -77,7 +77,7 @@ class DedupeSparkJob(basePath: String,
     val dedupeTblName = s"${tmpTableName}_dupeKeys"
 
     val metadata = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(basePath).build()
 
     val allFiles = storage.listDirectEntries(new 
StoragePath(s"$basePath/$duplicatedPartitionPath"))
@@ -188,7 +188,7 @@ class DedupeSparkJob(basePath: String,
 
   def fixDuplicates(dryRun: Boolean = true) = {
     val metadata = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(basePath).build()
 
     val allFiles = storage.listDirectEntries(new 
StoragePath(s"$basePath/$duplicatedPartitionPath"))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index fe3278fb751..51c1718d90d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -504,7 +504,7 @@ class TestStructuredStreaming extends 
HoodieSparkClientTestBase {
       streamingWrite(inputDF.schema, sourcePath, destPath, opts, id)
     }
     val metaClient = HoodieTableMetaClient.builder()
-      .setConf(storage.getConf.asInstanceOf[Configuration])
+      .setConf(storage.unwrapConf.asInstanceOf[Configuration])
       .setBasePath(destPath)
       .setLoadActiveTimelineOnLoad(true).build()
     assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 0dd488bffcb..db01a1231f9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -691,7 +691,7 @@ public class HoodieStreamer implements Serializable {
       if (this.storage.exists(new StoragePath(cfg.targetBasePath))) {
         try {
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-              .setConf(new Configuration((Configuration) 
this.storage.getConf()))
+              .setConf((Configuration) this.storage.getConf().newCopy())
               
.setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
           tableType = meta.getTableType();
           // This will guarantee there is no surprise with table type

Reply via email to