This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 878d0d92c05b131e6f71e7a9c34ab1bffea1bf66 Author: Y Ethan Guo <[email protected]> AuthorDate: Wed May 15 01:44:54 2024 -0700 [HUDI-7668] Add and rename APIs in StorageConfiguration (#11102) --- .../hudi/table/marker/DirectWriteMarkers.java | 2 +- .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 2 +- .../storage/hadoop/HadoopStorageConfiguration.java | 11 ++++++--- .../testsuite/HoodieDeltaStreamerWrapper.java | 2 +- .../java/org/apache/hudi/common/util/Option.java | 4 ++++ .../apache/hudi/storage/StorageConfiguration.java | 27 ++++++++++++++++++---- .../io/storage/BaseTestStorageConfiguration.java | 18 +++++++++++---- .../hudi/utilities/streamer/HoodieStreamer.java | 4 ++-- 8 files changed, 54 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index 241c3050555..d98a90c2053 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -147,7 +147,7 @@ public class DirectWriteMarkers extends WriteMarkers { if (subDirectories.size() > 0) { parallelism = Math.min(subDirectories.size(), parallelism); - SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.getConf().get()); + SerializableConfiguration serializedConf = new SerializableConfiguration((Configuration) storage.getConf().unwrap()); context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths"); logFiles.addAll(context.flatMap(subDirectories, directory -> { Queue<Path> candidatesDirs = new LinkedList<>(); diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index 78b293ee75f..80d881a45fa 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -80,7 +80,7 @@ public class HadoopFSUtils { } public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf, boolean newCopy) { - T conf = newCopy ? storageConf.newCopy() : storageConf.get(); + T conf = newCopy ? storageConf.unwrapCopy() : storageConf.unwrap(); ValidationUtils.checkArgument(conf instanceof Configuration); return getFs(path, (Configuration) conf); } diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java index a0009aaf75a..f272f8333eb 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java @@ -54,16 +54,21 @@ public class HadoopStorageConfiguration extends StorageConfiguration<Configurati } public HadoopStorageConfiguration(HadoopStorageConfiguration configuration) { - this.configuration = configuration.newCopy(); + this.configuration = configuration.unwrapCopy(); } @Override - public Configuration get() { + public StorageConfiguration<Configuration> newInstance() { + return new HadoopStorageConfiguration(this); + } + + @Override + public Configuration unwrap() { return configuration; } @Override - public Configuration newCopy() { + public Configuration unwrapCopy() { return new Configuration(configuration); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java index c653e7f3101..0e055444900 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java @@ -83,7 +83,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer { StreamSync service = getDeltaSync(); service.refreshTimeline(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf((Configuration) service.getStorage().getConf().newCopy()) + .setConf((Configuration) service.getStorage().getConf().unwrapCopy()) .setBasePath(service.getCfg().targetBasePath) .build(); String instantTime = HoodieActiveTimeline.createNewInstantTime(); diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java index 957dab28e2c..42fd98bdd01 100644 --- a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java +++ b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java @@ -84,6 +84,10 @@ public final class Option<T> implements Serializable { return null != val; } + public boolean isEmpty() { + return null == val; + } + public T get() { if (null == val) { throw new NoSuchElementException("No value present in Option"); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java index d92eeab8bed..c0a60490f21 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java @@ -31,14 +31,20 @@ import java.io.Serializable; */ public abstract class StorageConfiguration<T> implements Serializable { /** - * @return the storage configuration. + * @return a new {@link StorageConfiguration} instance with a new copy of + * the configuration of type {@link T}. */ - public abstract T get(); + public abstract StorageConfiguration<T> newInstance(); /** - * @return a new copy of the storage configuration. + * @return the underlying configuration of type {@link T}. */ - public abstract T newCopy(); + public abstract T unwrap(); + + /** + * @return a new copy of the underlying configuration of type {@link T}. + */ + public abstract T unwrapCopy(); /** * Sets the configuration key-value pair. @@ -108,4 +114,17 @@ public abstract class StorageConfiguration<T> implements Serializable { ? Enum.valueOf(defaultValue.getDeclaringClass(), value.get()) : defaultValue; } + + /** + * Sets a property key with a value in the configuration, if the property key + * does not already exist. + * + * @param key property key. + * @param value property value. + */ + public final void setIfUnset(String key, String value) { + if (getString(key).isEmpty()) { + set(key, value); + } + } } diff --git a/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java index 19ae29da985..1d6a3d338e4 100644 --- a/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java +++ b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java @@ -47,11 +47,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public abstract class BaseTestStorageConfiguration<T> { private static final Map<String, String> EMPTY_MAP = new HashMap<>(); private static final String KEY_STRING = "hudi.key.string"; + private static final String KEY_STRING_OTHER = "hudi.key.string.other"; private static final String KEY_BOOLEAN = "hudi.key.boolean"; private static final String KEY_LONG = "hudi.key.long"; private static final String KEY_ENUM = "hudi.key.enum"; private static final String KEY_NON_EXISTENT = "hudi.key.non_existent"; private static final String VALUE_STRING = "string_value"; + private static final String VALUE_STRING_1 = "string_value_1"; private static final String VALUE_BOOLEAN = "true"; private static final String VALUE_LONG = "12309120"; private static final String VALUE_ENUM = TestEnum.ENUM2.toString(); @@ -68,11 +70,14 @@ public abstract class BaseTestStorageConfiguration<T> { protected abstract T getConf(Map<String, String> mapping); @Test - public void testConstructorGetNewCopy() { + public void testConstructorNewInstanceUnwrapCopy() { T conf = getConf(EMPTY_MAP); StorageConfiguration<T> storageConf = getStorageConfiguration(conf); - assertSame(storageConf.get(), storageConf.get()); - assertNotSame(storageConf.get(), storageConf.newCopy()); + StorageConfiguration<T> newStorageConf = storageConf.newInstance(); + assertNotSame(storageConf, newStorageConf); + assertNotSame(storageConf.unwrap(), newStorageConf.unwrap()); + assertSame(storageConf.unwrap(), storageConf.unwrap()); + assertNotSame(storageConf.unwrap(), storageConf.unwrapCopy()); } @Test @@ -85,6 +90,11 @@ public abstract class BaseTestStorageConfiguration<T> { storageConf.set(KEY_BOOLEAN, VALUE_BOOLEAN); assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING)); assertTrue(storageConf.getBoolean(KEY_BOOLEAN, false)); + + storageConf.setIfUnset(KEY_STRING, VALUE_STRING + "_1"); + storageConf.setIfUnset(KEY_STRING_OTHER, VALUE_STRING_1); + assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING)); + assertEquals(Option.of(VALUE_STRING_1), storageConf.getString(KEY_STRING_OTHER)); } @Test @@ -102,7 +112,7 @@ public abstract class BaseTestStorageConfiguration<T> { try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ObjectInputStream ois = new ObjectInputStream(bais)) { StorageConfiguration<?> deserialized = (StorageConfiguration) ois.readObject(); - assertNotNull(deserialized.get()); + assertNotNull(deserialized.unwrap()); validateConfigs(deserialized); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 5372f15a82b..99b6841d50d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -56,9 +56,9 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.hive.HiveSyncTool; -import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.utilities.HiveIncrementalPuller; import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.utilities.UtilHelpers; @@ -691,7 +691,7 @@ public class HoodieStreamer implements Serializable { if (this.storage.exists(new StoragePath(cfg.targetBasePath))) { try { HoodieTableMetaClient meta = HoodieTableMetaClient.builder() - .setConf((Configuration) this.storage.getConf().newCopy()) + .setConf((Configuration) this.storage.getConf().unwrapCopy()) .setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build(); tableType = meta.getTableType(); // This will guarantee there is no surprise with table type
