This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0d8ca8d  [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
0d8ca8d is described below
commit 0d8ca8da4e0f6651bc1f06dba5e7e37881225fdc
Author: Thinking Chen <[email protected]>
AuthorDate: Sun Jan 9 15:10:17 2022 +0800
    [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
---
.../hudi/connect/utils/KafkaConnectUtils.java | 68 +++++++++++++++++++++
.../hudi/connect/writers/KafkaConnectConfigs.java | 29 +++++++++
.../apache/hudi/connect/TestHdfsConfiguration.java | 69 ++++++++++++++++++++++
.../src/test/resources/hadoop_conf/core-site.xml | 33 +++++++++++
.../src/test/resources/hadoop_conf/hdfs-site.xml | 30 ++++++++++
.../resources/hadoop_home/etc/hadoop/core-site.xml | 33 +++++++++++
.../resources/hadoop_home/etc/hadoop/hdfs-site.xml | 30 ++++++++++
7 files changed, 292 insertions(+)
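In short, the sink now resolves Hadoop XML config files in this order: the new "hadoop.conf.dir" / "hadoop.home" connector properties first, then the HADOOP_CONF_DIR / HADOOP_HOME environment variables captured once at class-load time. A minimal sketch of the resulting API, assuming an illustrative /etc/hadoop/conf directory that is not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.connect.utils.KafkaConnectUtils;
    import org.apache.hudi.connect.writers.KafkaConnectConfigs;

    public class HadoopConfExample {
      public static void main(String[] args) {
        KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
            .withHadoopConfDir("/etc/hadoop/conf")  // illustrative path; sets "hadoop.conf.dir"
            .build();
        // getDefaultHadoopConf adds every *.xml found under that directory as a resource
        Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(configs);
        System.out.println(hadoopConf.get("fs.defaultFS"));
      }
    }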
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cf60b9e..cc37de2 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -49,9 +49,14 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -65,6 +70,52 @@ public class KafkaConnectUtils {
private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
private static final String HOODIE_CONF_PREFIX = "hoodie.";
+ public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+ public static final String HADOOP_HOME = "HADOOP_HOME";
+ private static final List<Path> DEFAULT_HADOOP_CONF_FILES;
+
+ static {
+ DEFAULT_HADOOP_CONF_FILES = new ArrayList<>();
+ try {
+ String hadoopConfigPath = System.getenv(HADOOP_CONF_DIR);
+ String hadoopHomePath = System.getenv(HADOOP_HOME);
+ DEFAULT_HADOOP_CONF_FILES.addAll(getHadoopConfigFiles(hadoopConfigPath, hadoopHomePath));
+ if (!DEFAULT_HADOOP_CONF_FILES.isEmpty()) {
+ LOG.info(String.format("Found Hadoop default config files %s", DEFAULT_HADOOP_CONF_FILES));
+ }
+ } catch (IOException e) {
+ LOG.error("An error occurred while getting the default Hadoop configuration. "
+ + "Please use hadoop.conf.dir or hadoop.home to configure Hadoop environment variables", e);
+ }
+ }
+
+ /**
+ * Get Hadoop config files from HADOOP_CONF_DIR or HADOOP_HOME.
+ */
+ public static List<Path> getHadoopConfigFiles(String hadoopConfigPath, String hadoopHomePath)
+ throws IOException {
+ List<Path> hadoopConfigFiles = new ArrayList<>();
+ if (!StringUtils.isNullOrEmpty(hadoopConfigPath)) {
+ hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopConfigPath)));
+ }
+ if (hadoopConfigFiles.isEmpty() && !StringUtils.isNullOrEmpty(hadoopHomePath)) {
+ hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopHomePath, "etc", "hadoop")));
+ }
+ return hadoopConfigFiles;
+ }
+
+ /**
+ * Walk the file tree under the given base path to collect XML files.
+ */
+ private static List<Path> walkTreeForXml(Path basePath) throws IOException {
+ if (Files.notExists(basePath)) {
+ return new ArrayList<>();
+ }
+ return Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)
+ .filter(path -> path.toFile().isFile())
+ .filter(path -> path.toString().endsWith(".xml"))
+ .collect(Collectors.toList());
+ }
public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
Properties props = new Properties();
@@ -89,6 +140,23 @@ public class KafkaConnectUtils {
*/
public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) {
Configuration hadoopConf = new Configuration();
+
+ // add hadoop config files
+ if (!StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfDir())
+ || !StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfHome())) {
+ try {
+ List<Path> configFiles = getHadoopConfigFiles(connectConfigs.getHadoopConfDir(),
+ connectConfigs.getHadoopConfHome());
+ configFiles.forEach(f ->
+ hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+ } catch (Exception e) {
+ throw new HoodieException("Failed to read hadoop configuration!", e);
+ }
+ } else {
+ DEFAULT_HADOOP_CONF_FILES.forEach(f ->
org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+ }
+
connectConfigs.getProps().keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to hadoop/hive configs
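When neither property is set, the static initializer above falls back to the environment. A sketch (not part of the commit) that replicates that lookup by hand:

    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.List;
    import org.apache.hudi.connect.utils.KafkaConnectUtils;

    static List<Path> defaultConfFiles() throws IOException {
      // HADOOP_CONF_DIR is searched first; HADOOP_HOME/etc/hadoop is scanned
      // only if that search yields no *.xml files.
      return KafkaConnectUtils.getHadoopConfigFiles(
          System.getenv(KafkaConnectUtils.HADOOP_CONF_DIR),
          System.getenv(KafkaConnectUtils.HADOOP_HOME));
    }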
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index ec03451..1200779 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -93,6 +93,17 @@ public class KafkaConnectConfigs extends HoodieConfig {
.defaultValue(true)
.withDocumentation("Commit even when some records failed to be written");
+ // Reference https://docs.confluent.io/kafka-connect-hdfs/current/configuration_options.html#hdfs
+ public static final ConfigProperty<String> HADOOP_CONF_DIR = ConfigProperty
+ .key("hadoop.conf.dir")
+ .noDefaultValue()
+ .withDocumentation("The Hadoop configuration directory.");
+
+ public static final ConfigProperty<String> HADOOP_HOME = ConfigProperty
+ .key("hadoop.home")
+ .noDefaultValue()
+ .withDocumentation("The Hadoop home directory.");
+
protected KafkaConnectConfigs() {
super();
}
@@ -145,6 +156,14 @@ public class KafkaConnectConfigs extends HoodieConfig {
return getBoolean(ALLOW_COMMIT_ON_ERRORS);
}
+ public String getHadoopConfDir() {
+ return getString(HADOOP_CONF_DIR);
+ }
+
+ public String getHadoopConfHome() {
+ return getString(HADOOP_HOME);
+ }
+
public static class Builder {
protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
@@ -185,6 +204,16 @@ public class KafkaConnectConfigs extends HoodieConfig {
return this;
}
+ public Builder withHadoopConfDir(String hadoopConfDir) {
+ connectConfigs.setValue(HADOOP_CONF_DIR, String.valueOf(hadoopConfDir));
+ return this;
+ }
+
+ public Builder withHadoopHome(String hadoopHome) {
+ connectConfigs.setValue(HADOOP_HOME, String.valueOf(hadoopHome));
+ return this;
+ }
+
protected void setDefaults() {
// Check for mandatory properties
connectConfigs.setDefaults(KafkaConnectConfigs.class.getName());
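Both options can also be set through the Builder. Note the precedence implemented in getHadoopConfigFiles above: hadoop.conf.dir wins, and hadoop.home is consulted only when the conf dir yields no XML files. A sketch with illustrative paths:

    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withHadoopConfDir("/etc/hadoop/conf")  // illustrative; searched first
        .withHadoopHome("/opt/hadoop")          // illustrative; fallback, scanned at /opt/hadoop/etc/hadoop
        .build();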
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
new file mode 100644
index 0000000..dca8f57
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+
+public class TestHdfsConfiguration {
+
+ private boolean checkFiles(List<Path> paths) {
+ paths.removeIf(p -> {
+ String fileName = p.toFile().getName();
+ return fileName.equals("core-site.xml") || fileName.equals("hdfs-site.xml");
+ });
+ return paths.isEmpty();
+ }
+
+ @Test
+ public void testHadoopConfigEnvs() throws Exception {
+ List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+ "src/test/resources/hadoop_conf", "");
+ assertEquals(paths.size(), 2);
+ assertTrue(checkFiles(paths));
+ }
+
+ @Test
+ public void testHadoopHomeEnvs() throws Exception {
+ List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+ "","src/test/resources/hadoop_home");
+ assertEquals(paths.size(), 2);
+ assertTrue(checkFiles(paths));
+ }
+
+ @Test
+ public void testKafkaConfig() throws Exception {
+ KafkaConnectConfigs connectConfigs = KafkaConnectConfigs.newBuilder()
+ .withHadoopHome("src/test/resources/hadoop_home")
+ .build();
+ List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+ connectConfigs.getHadoopConfDir(),
+ connectConfigs.getHadoopConfHome()
+ );
+ assertEquals(paths.size(), 2);
+ assertTrue(checkFiles(paths));
+ }
+}
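The three tests above should be runnable in isolation, assuming the module's standard Maven surefire setup, e.g.:

    mvn test -pl hudi-kafka-connect -Dtest=TestHdfsConfiguration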
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>fs.defaultFS</name>
+ <value>hdfs://test-hudi-path:9000</value>
+ <description>The name of the default file system. A URI whose
+ scheme and authority determine the FileSystem implementation. The
+ uri's scheme determines the config property (fs.SCHEME.impl) naming
+ the FileSystem implementation class. The uri's authority is used to
+ determine the host, port, etc. for a filesystem.</description>
+ </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <property>
+ <name>dfs.namenode.http-address</name>
+ <value>http://test-hudi-path:50070</value>
+ <description>
+ The address and the base port where the dfs namenode web ui will listen on.
+ </description>
+ </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>fs.defaultFS</name>
+ <value>hdfs://test-hudi-path:9000</value>
+ <description>The name of the default file system. A URI whose
+ scheme and authority determine the FileSystem implementation. The
+ uri's scheme determines the config property (fs.SCHEME.impl) naming
+ the FileSystem implementation class. The uri's authority is used to
+ determine the host, port, etc. for a filesystem.</description>
+ </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <property>
+ <name>dfs.namenode.http-address</name>
+ <value>http://test-hudi-path:50070</value>
+ <description>
+ The address and the base port where the dfs namenode web ui will listen on.
+ </description>
+ </property>
+
+</configuration>
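Tying it together, an end-to-end sketch against the test resources above (paths assumed relative to the hudi-kafka-connect module; imports as in the first sketch):

    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withHadoopConfDir("src/test/resources/hadoop_conf")
        .build();
    Configuration conf = KafkaConnectUtils.getDefaultHadoopConf(configs);
    // core-site.xml in that directory sets fs.defaultFS
    System.out.println(conf.get("fs.defaultFS"));  // hdfs://test-hudi-path:9000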