This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1f7e0f6fbab [HUDI-7357] Introduce generic StorageConfiguration (#10586)
1f7e0f6fbab is described below
commit 1f7e0f6fbabf549cab429fe55191e0500a615d5b
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Feb 13 19:42:25 2024 -0800
[HUDI-7357] Introduce generic StorageConfiguration (#10586)
This commit introduces the generic `StorageConfiguration` to store
configuration for I/O with `HoodieStorage`. Given there's overhead of
reinitializing Hadoop's `Configuration` instance, the approach is to wrap the
instance in the `HadoopStorageConfiguration` implementation. This change will
enable us to remove our dependency on Hadoop's `Configuration` class. When
integrated, places using `Configuration` will be replaced by
`StorageConfiguration` and the `StorageConfiguration` will [...]
---
.../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 25 ++++
.../storage/hadoop/HadoopStorageConfiguration.java | 98 +++++++++++++++
.../hadoop}/TestHoodieHadoopStorage.java | 3 +-
...geConfigurationHadoopStorageConfiguration.java} | 31 ++---
.../apache/hudi/storage/StorageConfiguration.java | 132 +++++++++++++++++++++
.../io/storage/BaseTestStorageConfiguration.java | 115 ++++++++++++++++++
6 files changed, 382 insertions(+), 22 deletions(-)
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index d9abbd5c164..be38dfe8d6d 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -19,7 +19,10 @@
package org.apache.hudi.hadoop.fs;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +52,28 @@ public class HadoopFSUtils {
return conf;
}
+ public static StorageConfiguration<Configuration>
getStorageConf(Configuration conf) {
+ return getStorageConf(conf, false);
+ }
+
+ public static StorageConfiguration<Configuration>
getStorageConf(Configuration conf, boolean copy) {
+ return new HadoopStorageConfiguration(conf, copy);
+ }
+
+ public static <T> FileSystem getFs(String pathStr, StorageConfiguration<T>
storageConf) {
+ return getFs(new Path(pathStr), storageConf);
+ }
+
+ public static <T> FileSystem getFs(Path path, StorageConfiguration<T>
storageConf) {
+ return getFs(path, storageConf, false);
+ }
+
+  /**
+   * Gets the {@link FileSystem} for a path using the Hadoop {@link Configuration} held by
+   * {@code storageConf}.
+   *
+   * @param path        path to resolve the file system for.
+   * @param storageConf storage configuration; must hold a Hadoop {@link Configuration}.
+   * @param newCopy     {@code true} to use {@link StorageConfiguration#newCopy()} instead of the
+   *                    wrapped instance returned by {@link StorageConfiguration#get()}.
+   * @param <T>         type of the underlying storage configuration.
+   * @return the file system for the path.
+   * @throws IllegalArgumentException if the configuration is not a Hadoop {@link Configuration}.
+   */
+  public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf, boolean newCopy) {
+    T conf = newCopy ? storageConf.newCopy() : storageConf.get();
+    // Only a Hadoop Configuration can be used to obtain a Hadoop FileSystem; report what we got
+    // instead so that a misconfigured storage configuration is easy to diagnose.
+    ValidationUtils.checkArgument(conf instanceof Configuration,
+        "Expected the storage configuration to hold a Hadoop Configuration, but got: "
+            + (conf == null ? "null" : conf.getClass().getName()));
+    return getFs(path, (Configuration) conf);
+  }
+
public static FileSystem getFs(String pathStr, Configuration conf) {
return getFs(new Path(pathStr), conf);
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
new file mode 100644
index 00000000000..9c5696c01ab
--- /dev/null
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HadoopStorageConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage.hadoop;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+/**
+ * Implementation of {@link StorageConfiguration} providing Hadoop's {@link
Configuration}.
+ */
+public class HadoopStorageConfiguration extends
StorageConfiguration<Configuration> {
+ private static final long serialVersionUID = 1L;
+
+ private transient Configuration configuration;
+
+ public HadoopStorageConfiguration() {
+ this(new Configuration());
+ }
+
+ public HadoopStorageConfiguration(Configuration configuration) {
+ this(configuration, false);
+ }
+
+ public HadoopStorageConfiguration(Configuration configuration, boolean copy)
{
+ if (copy) {
+ this.configuration = new Configuration(configuration);
+ } else {
+ this.configuration = configuration;
+ }
+ }
+
+ public HadoopStorageConfiguration(HadoopStorageConfiguration configuration) {
+ this.configuration = configuration.newCopy();
+ }
+
+ @Override
+ public Configuration get() {
+ return configuration;
+ }
+
+ @Override
+ public Configuration newCopy() {
+ return new Configuration(configuration);
+ }
+
+ @Override
+ public void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ configuration.write(out);
+ }
+
+ @Override
+ public void readObject(ObjectInputStream in) throws IOException {
+ configuration = new Configuration(false);
+ configuration.readFields(in);
+ }
+
+ @Override
+ public void set(String key, String value) {
+ configuration.set(key, value);
+ }
+
+ @Override
+ public Option<String> getString(String key) {
+ return Option.ofNullable(configuration.get(key));
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ configuration.iterator().forEachRemaining(
+ e -> stringBuilder.append(String.format("%s => %s \n", e.getKey(),
e.getValue())));
+ return stringBuilder.toString();
+ }
+}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestHoodieHadoopStorage.java
similarity index 94%
copy from
hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
copy to
hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestHoodieHadoopStorage.java
index 3eaf4135032..eebce382d7a 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestHoodieHadoopStorage.java
@@ -17,12 +17,11 @@
* under the License.
*/
-package org.apache.hudi.hadoop.storage;
+package org.apache.hudi.storage.hadoop;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.TestHoodieStorageBase;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestStorageConfigurationHadoopStorageConfiguration.java
similarity index 53%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestStorageConfigurationHadoopStorageConfiguration.java
index 3eaf4135032..5225c599fb4 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/hadoop/storage/TestHoodieHadoopStorage.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/storage/hadoop/TestStorageConfigurationHadoopStorageConfiguration.java
@@ -17,37 +17,28 @@
* under the License.
*/
-package org.apache.hudi.hadoop.storage;
+package org.apache.hudi.storage.hadoop;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.io.storage.TestHoodieStorageBase;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.io.storage.BaseTestStorageConfiguration;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Map;
/**
- * Tests {@link HoodieHadoopStorage}.
+ * Tests {@link HadoopStorageConfiguration}.
*/
-public class TestHoodieHadoopStorage extends TestHoodieStorageBase {
- private static final String CONF_KEY = "hudi.testing.key";
- private static final String CONF_VALUE = "value";
-
- @Override
- protected HoodieStorage getHoodieStorage(Object fs, Object conf) {
- return new HoodieHadoopStorage((FileSystem) fs);
- }
-
+public class TestStorageConfigurationHadoopStorageConfiguration extends
BaseTestStorageConfiguration<Configuration> {
+  /**
+   * Wraps {@code conf} in a {@link HadoopStorageConfiguration} using the single-argument
+   * constructor, which wraps the instance itself without copying it.
+   */
@Override
- protected Object getFileSystem(Object conf) {
- return HadoopFSUtils.getFs(getTempDir(), (Configuration) conf, true);
+ protected StorageConfiguration<Configuration>
getStorageConfiguration(Configuration conf) {
+ return new HadoopStorageConfiguration(conf);
}
+  /**
+   * Creates a new Hadoop {@link Configuration} and sets every entry of {@code mapping} on it.
+   */
@Override
- protected Object getConf() {
+ protected Configuration getConf(Map<String, String> mapping) {
Configuration conf = new Configuration();
- conf.set(CONF_KEY, CONF_VALUE);
+ mapping.forEach(conf::set);
return conf;
}
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
new file mode 100644
index 00000000000..4b81347bf3e
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageConfiguration.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.storage;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Interface providing the storage configuration in type {@link T}.
+ *
+ * @param <T> type of storage configuration to provide.
+ */
+public abstract class StorageConfiguration<T> implements Serializable {
+ /**
+ * @return the storage configuration.
+ */
+ public abstract T get();
+
+ /**
+ * @return a new copy of the storage configuration.
+ */
+ public abstract T newCopy();
+
+ /**
+ * Serializes the storage configuration.
+ * DO NOT change the signature, as required by {@link Serializable}.
+ *
+ * @param out stream to write.
+ * @throws IOException on I/O error.
+ */
+ public abstract void writeObject(ObjectOutputStream out) throws IOException;
+
+ /**
+ * Deserializes the storage configuration.
+ * DO NOT change the signature, as required by {@link Serializable}.
+ *
+ * @param in stream to read.
+ * @throws IOException on I/O error.
+ */
+ public abstract void readObject(ObjectInputStream in) throws IOException;
+
+ /**
+ * Sets the configuration key-value pair.
+ *
+ * @param key in String.
+ * @param value in String.
+ */
+ public abstract void set(String key, String value);
+
+ /**
+ * Gets the String value of a property key.
+ *
+ * @param key property key in String.
+ * @return the property value if present, or {@code Option.empty()}.
+ */
+ public abstract Option<String> getString(String key);
+
+ /**
+ * Gets the String value of a property key if present, or the default value
if not.
+ *
+ * @param key property key in String.
+ * @param defaultValue default value is the property does not exist.
+ * @return the property value if present, or the default value.
+ */
+ public final String getString(String key, String defaultValue) {
+ Option<String> value = getString(key);
+ return value.isPresent() ? value.get() : defaultValue;
+ }
+
+ /**
+ * Gets the boolean value of a property key if present, or the default value
if not.
+ *
+ * @param key property key in String.
+ * @param defaultValue default value is the property does not exist.
+ * @return the property value if present, or the default value.
+ */
+ public final boolean getBoolean(String key, boolean defaultValue) {
+ Option<String> value = getString(key);
+ return value.isPresent()
+ ? (!StringUtils.isNullOrEmpty(value.get()) ?
Boolean.parseBoolean(value.get()) : defaultValue)
+ : defaultValue;
+ }
+
+ /**
+ * Gets the long value of a property key if present, or the default value if
not.
+ *
+ * @param key property key in String.
+ * @param defaultValue default value is the property does not exist.
+ * @return the property value if present, or the default value.
+ */
+ public final long getLong(String key, long defaultValue) {
+ Option<String> value = getString(key);
+ return value.isPresent() ? Long.parseLong(value.get()) : defaultValue;
+ }
+
+ /**
+ * Gets the Enum value of a property key if present, or the default value if
not.
+ *
+ * @param key property key in String.
+ * @param defaultValue default value is the property does not exist.
+ * @param <T> Enum.
+ * @return the property value if present, or the default value.
+ */
+ public <T extends Enum<T>> T getEnum(String key, T defaultValue) {
+ Option<String> value = getString(key);
+ return value.isPresent()
+ ? Enum.valueOf(defaultValue.getDeclaringClass(), value.get())
+ : defaultValue;
+ }
+}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
new file mode 100644
index 00000000000..6828e3c766e
--- /dev/null
+++
b/hudi-io/src/test/java/org/apache/hudi/io/storage/BaseTestStorageConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Common test cases shared by every {@link StorageConfiguration} implementation. A concrete test
+ * class only has to build the underlying configuration and wrap it.
+ *
+ * @param <T> configuration type.
+ */
+public abstract class BaseTestStorageConfiguration<T> {
+  private static final Map<String, String> EMPTY_MAP = new HashMap<>();
+
+  private static final String KEY_STRING = "hudi.key.string";
+  private static final String KEY_BOOLEAN = "hudi.key.boolean";
+  private static final String KEY_LONG = "hudi.key.long";
+  private static final String KEY_ENUM = "hudi.key.enum";
+  private static final String KEY_NON_EXISTENT = "hudi.key.non_existent";
+
+  private static final String VALUE_STRING = "string_value";
+  private static final String VALUE_BOOLEAN = "true";
+  private static final String VALUE_LONG = "12309120";
+  private static final String VALUE_ENUM = TestEnum.ENUM2.toString();
+
+  /**
+   * @param conf underlying configuration to wrap.
+   * @return instance of the {@link StorageConfiguration} implementation under test.
+   */
+  protected abstract StorageConfiguration<T> getStorageConfiguration(T conf);
+
+  /**
+   * @param mapping configuration in key-value pairs.
+   * @return a new underlying configuration instance containing {@code mapping}.
+   */
+  protected abstract T getConf(Map<String, String> mapping);
+
+  @Test
+  public void testConstructorGetNewCopy() {
+    StorageConfiguration<T> storageConf = getStorageConfiguration(getConf(EMPTY_MAP));
+    T wrapped = storageConf.get();
+    // get() hands back the same wrapped instance on every call ...
+    assertSame(wrapped, storageConf.get());
+    // ... whereas newCopy() produces a distinct instance.
+    assertNotSame(wrapped, storageConf.newCopy());
+  }
+
+  @Test
+  public void testSet() {
+    StorageConfiguration<T> storageConf = getStorageConfiguration(getConf(EMPTY_MAP));
+    for (String key : new String[] {KEY_STRING, KEY_BOOLEAN}) {
+      assertFalse(storageConf.getString(key).isPresent());
+    }
+
+    storageConf.set(KEY_STRING, VALUE_STRING);
+    storageConf.set(KEY_BOOLEAN, VALUE_BOOLEAN);
+    assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING));
+    assertTrue(storageConf.getBoolean(KEY_BOOLEAN, false));
+  }
+
+  @Test
+  public void testGet() {
+    validateConfigs(getStorageConfiguration(getConf(createTestConfigs())));
+  }
+
+  /** Builds one entry for each value type exercised by {@link #validateConfigs}. */
+  private Map<String, String> createTestConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(KEY_STRING, VALUE_STRING);
+    configs.put(KEY_BOOLEAN, VALUE_BOOLEAN);
+    configs.put(KEY_LONG, VALUE_LONG);
+    configs.put(KEY_ENUM, VALUE_ENUM);
+    return configs;
+  }
+
+  /** Checks every typed accessor against both a present key and a missing one. */
+  private void validateConfigs(StorageConfiguration<?> storageConf) {
+    // String accessors.
+    assertEquals(Option.of(VALUE_STRING), storageConf.getString(KEY_STRING));
+    assertEquals(VALUE_STRING, storageConf.getString(KEY_STRING, ""));
+    assertFalse(storageConf.getString(KEY_NON_EXISTENT).isPresent());
+    assertEquals(VALUE_STRING, storageConf.getString(KEY_NON_EXISTENT, VALUE_STRING));
+
+    // Boolean accessors.
+    assertTrue(storageConf.getBoolean(KEY_BOOLEAN, false));
+    assertFalse(storageConf.getBoolean(KEY_NON_EXISTENT, false));
+
+    // Long accessors.
+    assertEquals(Long.parseLong(VALUE_LONG), storageConf.getLong(KEY_LONG, 0));
+    assertEquals(30L, storageConf.getLong(KEY_NON_EXISTENT, 30L));
+
+    // Enum accessors.
+    assertEquals(TestEnum.valueOf(VALUE_ENUM), storageConf.getEnum(KEY_ENUM, TestEnum.ENUM1));
+    assertEquals(TestEnum.ENUM1, storageConf.getEnum(KEY_NON_EXISTENT, TestEnum.ENUM1));
+  }
+
+  enum TestEnum {
+    ENUM1, ENUM2, ENUM3
+  }
+}