This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 878d0d92c05b131e6f71e7a9c34ab1bffea1bf66
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 15 01:44:54 2024 -0700

    [HUDI-7668] Add and rename APIs in StorageConfiguration (#11102)
---
 .../hudi/table/marker/DirectWriteMarkers.java      |  2 +-
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   |  2 +-
 .../storage/hadoop/HadoopStorageConfiguration.java | 11 ++++++---
 .../testsuite/HoodieDeltaStreamerWrapper.java      |  2 +-
 .../java/org/apache/hudi/common/util/Option.java   |  4 ++++
 .../apache/hudi/storage/StorageConfiguration.java  | 27 ++++++++++++++++++----
 .../io/storage/BaseTestStorageConfiguration.java   | 18 +++++++++++----
 .../hudi/utilities/streamer/HoodieStreamer.java    |  4 ++--
 8 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index 241c3050555..d98a90c2053 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -147,7 +147,7 @@ public class DirectWriteMarkers extends WriteMarkers {
 
     if (subDirectories.size() > 0) {
       parallelism = Math.min(subDirectories.size(), parallelism);
-      SerializableConfiguration serializedConf = new 
SerializableConfiguration((Configuration) storage.getConf().get());
+      SerializableConfiguration serializedConf = new 
SerializableConfiguration((Configuration) storage.getConf().unwrap());
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker 
files for all created, merged paths");
       logFiles.addAll(context.flatMap(subDirectories, directory -> {
         Queue<Path> candidatesDirs = new LinkedList<>();
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index 78b293ee75f..80d881a45fa 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -80,7 +80,7 @@ public class HadoopFSUtils {
   }
 
   public static <T> FileSystem getFs(Path path, StorageConfiguration<T> 
storageConf, boolean newCopy) {
-    T conf = newCopy ? storageConf.newCopy() : storageConf.get();
+    T conf = newCopy ? storageConf.unwrapCopy() : storageConf.unwrap();
     ValidationUtils.checkArgument(conf instanceof Configuration);
     return getFs(path, (Configuration) conf);
   }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
index a0009aaf75a..f272f8333eb 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
@@ -54,16 +54,21 @@ public class HadoopStorageConfiguration extends StorageConfiguration<Configurati
   }
 
   public HadoopStorageConfiguration(HadoopStorageConfiguration configuration) {
-    this.configuration = configuration.newCopy();
+    this.configuration = configuration.unwrapCopy();
   }
 
   @Override
-  public Configuration get() {
+  public StorageConfiguration<Configuration> newInstance() {
+    return new HadoopStorageConfiguration(this);
+  }
+
+  @Override
+  public Configuration unwrap() {
     return configuration;
   }
 
   @Override
-  public Configuration newCopy() {
+  public Configuration unwrapCopy() {
     return new Configuration(configuration);
   }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index c653e7f3101..0e055444900 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -83,7 +83,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
     StreamSync service = getDeltaSync();
     service.refreshTimeline();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf((Configuration) service.getStorage().getConf().newCopy())
+        .setConf((Configuration) service.getStorage().getConf().unwrapCopy())
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
index 957dab28e2c..42fd98bdd01 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
@@ -84,6 +84,10 @@ public final class Option<T> implements Serializable {
     return null != val;
   }
 
+  public boolean isEmpty() {
+    return null == val;
+  }
+
   public T get() {
     if (null == val) {
       throw new NoSuchElementException("No value present in Option");
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
index d92eeab8bed..c0a60490f21 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
@@ -31,14 +31,20 @@ import java.io.Serializable;
  */
 public abstract class StorageConfiguration<T> implements Serializable {
   /**
-   * @return the storage configuration.
+   * @return a new {@link StorageConfiguration} instance with a new copy of
+   * the configuration of type {@link T}.
    */
-  public abstract T get();
+  public abstract StorageConfiguration<T> newInstance();
 
   /**
-   * @return a new copy of the storage configuration.
+   * @return the underlying configuration of type {@link T}.
    */
-  public abstract T newCopy();
+  public abstract T unwrap();
+
+  /**
+   * @return a new copy of the underlying configuration of type {@link T}.
+   */
+  public abstract T unwrapCopy();
   
   /**
    * Sets the configuration key-value pair.
@@ -108,4 +114,17 @@ public abstract class StorageConfiguration<T> implements Serializable {
         ? Enum.valueOf(defaultValue.getDeclaringClass(), value.get())
         : defaultValue;
   }
+
+  /**
+   * Sets a property key with a value in the configuration, if the property key
+   * does not already exist.
+   *
+   * @param key   property key.
+   * @param value property value.
+   */
+  public final void setIfUnset(String key, String value) {
+    if (getString(key).isEmpty()) {
+      set(key, value);
+    }
+  }
 }
diff --git a/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
index 19ae29da985..1d6a3d338e4 100644
--- a/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
+++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
@@ -47,11 +47,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public abstract class BaseTestStorageConfiguration<T> {
   private static final Map<String, String> EMPTY_MAP = new HashMap<>();
   private static final String KEY_STRING = "hudi.key.string";
+  private static final String KEY_STRING_OTHER = "hudi.key.string.other";
   private static final String KEY_BOOLEAN = "hudi.key.boolean";
   private static final String KEY_LONG = "hudi.key.long";
   private static final String KEY_ENUM = "hudi.key.enum";
   private static final String KEY_NON_EXISTENT = "hudi.key.non_existent";
   private static final String VALUE_STRING = "string_value";
+  private static final String VALUE_STRING_1 = "string_value_1";
   private static final String VALUE_BOOLEAN = "true";
   private static final String VALUE_LONG = "12309120";
   private static final String VALUE_ENUM = TestEnum.ENUM2.toString();
@@ -68,11 +70,14 @@ public abstract class BaseTestStorageConfiguration<T> {
   protected abstract T getConf(Map<String, String> mapping);
 
   @Test
-  public void testConstructorGetNewCopy() {
+  public void testConstructorNewInstanceUnwrapCopy() {
     T conf = getConf(EMPTY_MAP);
     StorageConfiguration<T> storageConf = getStorageConfiguration(conf);
-    assertSame(storageConf.get(), storageConf.get());
-    assertNotSame(storageConf.get(), storageConf.newCopy());
+    StorageConfiguration<T> newStorageConf = storageConf.newInstance();
+    assertNotSame(storageConf, newStorageConf);
+    assertNotSame(storageConf.unwrap(), newStorageConf.unwrap());
+    assertSame(storageConf.unwrap(), storageConf.unwrap());
+    assertNotSame(storageConf.unwrap(), storageConf.unwrapCopy());
   }
 
   @Test
@@ -85,6 +90,11 @@ public abstract class BaseTestStorageConfiguration<T> {
     storageConf.set(KEY_BOOLEAN, VALUE_BOOLEAN);
     assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING));
     assertTrue(storageConf.getBoolean(KEY_BOOLEAN, false));
+
+    storageConf.setIfUnset(KEY_STRING, VALUE_STRING + "_1");
+    storageConf.setIfUnset(KEY_STRING_OTHER, VALUE_STRING_1);
+    assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING));
+    assertEquals(Option.of(VALUE_STRING_1), 
storageConf.getString(KEY_STRING_OTHER));
   }
 
   @Test
@@ -102,7 +112,7 @@ public abstract class BaseTestStorageConfiguration<T> {
       try (ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
            ObjectInputStream ois = new ObjectInputStream(bais)) {
         StorageConfiguration<?> deserialized = (StorageConfiguration) 
ois.readObject();
-        assertNotNull(deserialized.get());
+        assertNotNull(deserialized.unwrap());
         validateConfigs(deserialized);
       }
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 5372f15a82b..99b6841d50d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -56,9 +56,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -691,7 +691,7 @@ public class HoodieStreamer implements Serializable {
       if (this.storage.exists(new StoragePath(cfg.targetBasePath))) {
         try {
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
-              .setConf((Configuration) this.storage.getConf().newCopy())
+              .setConf((Configuration) this.storage.getConf().unwrapCopy())
               
.setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
           tableType = meta.getTableType();
           // This will guarantee there is no surprise with table type

Reply via email to