This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4bba6af0fa1 [HUDI-6462] Add Hudi client init callback interface (#9108)
4bba6af0fa1 is described below
commit 4bba6af0fa104ad8eef0ecd62e0aedf67bbe33a4
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Jul 3 22:18:37 2023 -0700
[HUDI-6462] Add Hudi client init callback interface (#9108)
This PR adds the interface for Hudi client init callback to run custom
logic at the time of initialization of a Hudi client:
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public interface HoodieClientInitCallback {
/**
* A callback method in which the user can implement custom logic.
* This method is called when a {@link BaseHoodieClient} is initialized.
*
* @param hoodieClient {@link BaseHoodieClient} instance.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
void call(BaseHoodieClient hoodieClient);
}
At the time of instantiation of the write or table service client, a user
may want to do additional processing, such as sending metrics, logs,
notification, or adding more properties to the write config. The implementation
of client init callback interface allows such logic to be plugged into Hudi.
A new config, hoodie.client.init.callback.classes, is added for plugging in
the callback implementation. The class list is comma-separated.
---
.../hudi/callback/HoodieClientInitCallback.java | 40 ++++
.../org/apache/hudi/client/BaseHoodieClient.java | 21 ++
.../org/apache/hudi/config/HoodieWriteConfig.java | 31 ++-
.../callback/TestHoodieClientInitCallback.java | 234 +++++++++++++++++++++
4 files changed, 320 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
new file mode 100644
index 00000000000..a86eded75e5
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.client.BaseHoodieClient;
+
+/**
+ * A callback interface to run custom logic at the time of initialization of
the Hudi client.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieClientInitCallback {
+ /**
+ * A callback method in which the user can implement custom logic.
+ * This method is called when a {@link BaseHoodieClient} is initialized.
+ *
+ * @param hoodieClient {@link BaseHoodieClient} instance.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ void call(BaseHoodieClient hoodieClient);
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index e01ffb20719..26b10c1c1bf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.HoodieClientInitCallback;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -30,8 +31,11 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -92,6 +97,7 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
this.txnManager = new TransactionManager(config, fs);
startEmbeddedServerView();
initWrapperFSMetrics();
+ runClientInitCallbacks();
}
/**
@@ -137,6 +143,21 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
}
}
+  /**
+   * Instantiates and runs every client init callback named in
+   * {@code config.getClientInitCallbackClassNames()}.
+   *
+   * <p>The configured value is a comma-separated list of class names. Whitespace around
+   * each name is trimmed and blank entries are skipped, so values such as
+   * {@code "a.B, c.D"} or a list containing a stray comma are accepted. Nothing is run
+   * when the config value is null or empty.
+   *
+   * <p>Any exception thrown while loading or running a callback is not caught here and
+   * therefore propagates out of the client constructor.
+   *
+   * @throws HoodieException if a configured class is not a {@link HoodieClientInitCallback}.
+   */
+  private void runClientInitCallbacks() {
+    String callbackClassNames = config.getClientInitCallbackClassNames();
+    if (StringUtils.isNullOrEmpty(callbackClassNames)) {
+      return;
+    }
+    Arrays.stream(callbackClassNames.split(","))
+        // Tolerate "a, b" style lists: a padded name would otherwise fail class loading.
+        .map(String::trim)
+        // Skip empty entries produced by leading or doubled commas.
+        .filter(callbackClass -> !callbackClass.isEmpty())
+        .forEach(callbackClass -> {
+          Object callback = ReflectionUtils.loadClass(callbackClass);
+          if (!(callback instanceof HoodieClientInitCallback)) {
+            throw new HoodieException(callbackClass + " is not a subclass of "
+                + HoodieClientInitCallback.class.getName());
+          }
+          ((HoodieClientInitCallback) callback).call(this);
+        });
+  }
+
public HoodieWriteConfig getConfig() {
return config;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bc964b3cfe8..93105491180 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -698,14 +698,24 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "a configured filter `ssl`, value for config
`ssl.trustore.location` would be masked.");
public static final ConfigProperty<Boolean> ROLLBACK_INSTANT_BACKUP_ENABLED
= ConfigProperty
- .key("hoodie.rollback.instant.backup.enabled")
- .defaultValue(false)
- .withDocumentation("Backup instants removed during rollback and
restore (useful for debugging)");
+ .key("hoodie.rollback.instant.backup.enabled")
+ .defaultValue(false)
+ .withDocumentation("Backup instants removed during rollback and restore
(useful for debugging)");
public static final ConfigProperty<String> ROLLBACK_INSTANT_BACKUP_DIRECTORY
= ConfigProperty
- .key("hoodie.rollback.instant.backup.dir")
- .defaultValue(".rollback_backup")
- .withDocumentation("Path where instants being rolled back are
copied. If not absolute path then a directory relative to .hoodie folder is
created.");
+ .key("hoodie.rollback.instant.backup.dir")
+ .defaultValue(".rollback_backup")
+ .withDocumentation("Path where instants being rolled back are copied. If
not absolute path then a directory relative to .hoodie folder is created.");
+
+  // Comma-separated list of HoodieClientInitCallback implementations; empty by default.
+  public static final ConfigProperty<String> CLIENT_INIT_CALLBACK_CLASS_NAMES = ConfigProperty
+      .key("hoodie.client.init.callback.classes")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("Fully-qualified class names of the Hudi client init callbacks to run "
+          + "at the initialization of the Hudi client. The class names are separated by `,`. "
+          // Trailing space added so this sentence does not run into the next one.
+          + "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`. "
+          + "By default, no Hudi client init callback is executed.");
private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;
@@ -2535,6 +2545,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(ROLLBACK_INSTANT_BACKUP_DIRECTORY);
}
+ public String getClientInitCallbackClassNames() {
+ return getString(CLIENT_INIT_CALLBACK_CLASS_NAMES);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3017,6 +3031,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withClientInitCallbackClassNames(String classNames) {
+ writeConfig.setValue(CLIENT_INIT_CALLBACK_CLASS_NAMES, classNames);
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
new file mode 100644
index 00000000000..1ede02413fb
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback;
+import org.apache.hudi.client.BaseHoodieClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.callback.TestHoodieClientInitCallback.AddConfigInitCallbackTestClass.CUSTOM_CONFIG_KEY1;
+import static
org.apache.hudi.callback.TestHoodieClientInitCallback.AddConfigInitCallbackTestClass.CUSTOM_CONFIG_VALUE1;
+import static
org.apache.hudi.callback.TestHoodieClientInitCallback.ChangeConfigInitCallbackTestClass.CUSTOM_CONFIG_KEY2;
+import static
org.apache.hudi.callback.TestHoodieClientInitCallback.ChangeConfigInitCallbackTestClass.CUSTOM_CONFIG_VALUE2;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link HoodieClientInitCallback}.
+ */
+public class TestHoodieClientInitCallback {
+ @TempDir
+ java.nio.file.Path tmpDir;
+
+ @Mock
+ static HoodieSparkEngineContext engineContext =
+ Mockito.mock(HoodieSparkEngineContext.class);
+
+ @BeforeAll
+ public static void setup() {
+ when(engineContext.getHadoopConf())
+ .thenReturn(new SerializableConfiguration(new Configuration()));
+ }
+
+ @Test
+ public void testNoClientInitCallback() {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(tmpDir.toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .build(false);
+ assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+
+ SparkRDDWriteClient<Object> writeClient = new
SparkRDDWriteClient<>(engineContext, config);
+
+ assertFalse(writeClient.getConfig().contains(CUSTOM_CONFIG_KEY1));
+
assertFalse(writeClient.getTableServiceClient().getConfig().contains(CUSTOM_CONFIG_KEY1));
+ }
+
+ @Test
+ public void testSingleClientInitCallback() {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(tmpDir.toString())
+ .withEmbeddedTimelineServerEnabled(false)
+
.withClientInitCallbackClassNames(ChangeConfigInitCallbackTestClass.class.getName())
+ .withProps(Collections.singletonMap(
+ WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
+ .build(false);
+ assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+ assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+ .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+
+ SparkRDDWriteClient<Object> writeClient = new
SparkRDDWriteClient<>(engineContext, config);
+
+ HoodieWriteConfig updatedConfig = writeClient.getConfig();
+ assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+ Schema actualSchema = new
Schema.Parser().parse(updatedConfig.getWriteSchema());
+ assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+ assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+
+ updatedConfig = writeClient.getTableServiceClient().getConfig();
+ assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+ actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+ assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+ assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+ }
+
+ @Test
+ public void testTwoClientInitCallbacks() {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(tmpDir.toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withClientInitCallbackClassNames(
+ ChangeConfigInitCallbackTestClass.class.getName() + ","
+ + AddConfigInitCallbackTestClass.class.getName())
+ .withProps(Collections.singletonMap(
+ WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
+ .build(false);
+ assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+ assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+ .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+
+ SparkRDDWriteClient<Object> writeClient = new
SparkRDDWriteClient<>(engineContext, config);
+
+ HoodieWriteConfig updatedConfig = writeClient.getConfig();
+ assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+ assertEquals(CUSTOM_CONFIG_VALUE1,
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
+ Schema actualSchema = new
Schema.Parser().parse(updatedConfig.getWriteSchema());
+ assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+ assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+
+ updatedConfig = writeClient.getTableServiceClient().getConfig();
+ assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+ assertEquals(CUSTOM_CONFIG_VALUE1,
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
+ actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+ assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+ assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+ }
+
+ @Test
+ public void testClientInitCallbackThrowingException() {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(tmpDir.toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withClientInitCallbackClassNames(
+ AddConfigInitCallbackTestClass.class.getName() + ","
+ + ThrowExceptionCallbackTestClass.class.getName())
+ .build(false);
+ HoodieIOException exception = assertThrows(
+ HoodieIOException.class,
+ () -> new SparkRDDWriteClient<>(engineContext, config),
+ "Expects the initialization to throw a HoodieIOException");
+ assertEquals(
+ "Throwing exception during client initialization.",
+ exception.getMessage());
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgsForNonCallbackClass")
+ public void testNonClientInitCallbackClassInConfig(String className, String
errorMsg) {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(tmpDir.toString())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withClientInitCallbackClassNames(className)
+ .build(false);
+ HoodieException exception = assertThrows(
+ HoodieException.class,
+ () -> new SparkRDDWriteClient<>(engineContext, config),
+ "Expects the initialization to throw a HoodieException");
+ assertEquals(errorMsg, exception.getMessage());
+ }
+
+ private static Stream<Arguments> testArgsForNonCallbackClass() {
+ return Arrays.stream(new String[][] {
+ {HoodieWriteCommitHttpCallback.class.getName(),
+ "Could not load class " +
HoodieWriteCommitHttpCallback.class.getName()},
+ {NonSortPartitionerWithRows.class.getName(),
+ NonSortPartitionerWithRows.class.getName() + " is not a subclass
of " + HoodieClientInitCallback.class.getName()}
+ }).map(Arguments::of);
+ }
+
+ /**
+ * A test {@link HoodieClientInitCallback} implementation to add
`user.defined.key1` config.
+ */
+ public static class AddConfigInitCallbackTestClass implements
HoodieClientInitCallback {
+ public static final String CUSTOM_CONFIG_KEY1 = "user.defined.key1";
+ public static final String CUSTOM_CONFIG_VALUE1 = "value1";
+
+ @Override
+ public void call(BaseHoodieClient hoodieClient) {
+ HoodieWriteConfig config = hoodieClient.getConfig();
+ config.setValue(CUSTOM_CONFIG_KEY1, CUSTOM_CONFIG_VALUE1);
+ }
+ }
+
+ /**
+ * A test {@link HoodieClientInitCallback} implementation to add the property
+ * `user.defined.key2=value2` to the write schema.
+ */
+ public static class ChangeConfigInitCallbackTestClass implements
HoodieClientInitCallback {
+ public static final String CUSTOM_CONFIG_KEY2 = "user.defined.key2";
+ public static final String CUSTOM_CONFIG_VALUE2 = "value2";
+
+ @Override
+ public void call(BaseHoodieClient hoodieClient) {
+ HoodieWriteConfig config = hoodieClient.getConfig();
+ Schema schema = new Schema.Parser().parse(config.getWriteSchema());
+ if (!schema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2)) {
+ schema.addProp(CUSTOM_CONFIG_KEY2, CUSTOM_CONFIG_VALUE2);
+ }
+ config.getProps().setProperty(WRITE_SCHEMA_OVERRIDE.key(),
schema.toString());
+ }
+ }
+
+ /**
+ * A test {@link HoodieClientInitCallback} implementation to throw an
exception.
+ */
+ public static class ThrowExceptionCallbackTestClass implements
HoodieClientInitCallback {
+ @Override
+ public void call(BaseHoodieClient hoodieClient) {
+ throw new HoodieIOException("Throwing exception during client
initialization.");
+ }
+ }
+}