This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2b98d909886c34a8a64c520f459cdf9a1a3a9c6c
Author: Thinking Chen <[email protected]>
AuthorDate: Mon Jan 10 07:31:57 2022 +0800

    [HUDI-3112] Fix KafkaConnect cannot sync to Hive Problem (#4458)
---
 .../hudi/connect/utils/KafkaConnectUtils.java      | 31 ++++++++++++++++++++++
 .../hudi/connect/writers/KafkaConnectConfigs.java  | 16 +++++++++++
 .../writers/KafkaConnectTransactionServices.java   | 25 ++++++++++-------
 3 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cc37de2..6a38430 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.connect.ControlMessage;
 import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -57,6 +59,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -266,4 +269,32 @@ public class KafkaConnectUtils {
     ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
     return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
   }
+
+  /**
+   * Build Hive Sync Config
+   * Note: This method is a temporary solution.
+   * Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
+   */
+  public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
+    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
+    hiveSyncConfig.basePath = tableBasePath;
+    hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
+    hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
+    hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
+    hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
+    hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
+    hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
+    hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
+    hiveSyncConfig.partitionValueExtractorClass =
+        props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
+    hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
+    if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
+      hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
+    }
+    hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
+    hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
+    hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
+    hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
+    return hiveSyncConfig;
+  }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index 1200779..e4543c6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -164,6 +164,22 @@ public class KafkaConnectConfigs extends HoodieConfig {
     return getString(HADOOP_HOME);
   }
 
+  public static final String HIVE_USE_PRE_APACHE_INPUT_FORMAT = "hoodie.datasource.hive_sync.use_pre_apache_input_format";
+  public static final String HIVE_DATABASE = "hoodie.datasource.hive_sync.database";
+  public static final String HIVE_TABLE = "hoodie.datasource.hive_sync.table";
+  public static final String HIVE_USER = "hoodie.datasource.hive_sync.username";
+  public static final String HIVE_PASS = "hoodie.datasource.hive_sync.password";
+  public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
+  public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
+  public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
+  public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
+  public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
+  public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
+  public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
+  public static final String HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = "hoodie.datasource.hive_sync.skip_ro_suffix";
+  public static final String HIVE_SUPPORT_TIMESTAMP_TYPE = "hoodie.datasource.hive_sync.support_timestamp";
+  public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+
   public static class Builder {
 
     protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index cca738a..dae19cc 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.connect.writers;
 
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -32,12 +31,14 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
 import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -163,9 +164,9 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   private void syncMeta() {
-    Set<String> syncClientToolClasses = new HashSet<>(
-        Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
     if (connectConfigs.isMetaSyncEnabled()) {
+      Set<String> syncClientToolClasses = new HashSet<>(
+          Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
       for (String impl : syncClientToolClasses) {
         impl = impl.trim();
         switch (impl) {
@@ -185,16 +186,20 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   private void syncHive() {
-    HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
-        new TypedProperties(connectConfigs.getProps()),
-        tableBasePath,
-        "PARQUET");
+    HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
+    String url;
+    if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
+      url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
+    } else {
+      url = hiveSyncConfig.jdbcUrl;
+    }
+
     LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName
-        + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
+        + "). Hive URL :"
+        + url
         + ", basePath :" + tableBasePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
+    LOG.info("Hive Sync Conf => " + hiveSyncConfig);
     FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
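
For reviewers, a minimal, illustrative sketch (not part of the commit) of how the new KafkaConnectUtils.buildSyncConfig helper consumes the Hive sync keys added to KafkaConnectConfigs above. The class name, property values, and base path below are made up for the example, and the imports assume the Hudi 0.10.x packages used elsewhere in this patch.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.hive.HiveSyncConfig;

public class BuildSyncConfigExample {
  public static void main(String[] args) {
    // Keys come from the constants added to KafkaConnectConfigs in this patch;
    // the values are example settings for a hypothetical sink.
    TypedProperties props = new TypedProperties();
    props.setProperty(KafkaConnectConfigs.HIVE_DATABASE, "default");
    props.setProperty(KafkaConnectConfigs.HIVE_TABLE, "hudi_trips");
    props.setProperty(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, "date");
    props.setProperty(KafkaConnectConfigs.HIVE_URL, "jdbc:hive2://localhost:10000");
    // With sync mode "hms", the patched syncHive() resolves the target URL from
    // hive.metastore.uris in the Hadoop configuration instead of the JDBC URL.
    props.setProperty(KafkaConnectConfigs.HIVE_SYNC_MODE, "hms");

    // Build the HiveSyncConfig the same way KafkaConnectTransactionServices#syncHive now does.
    HiveSyncConfig hiveSyncConfig =
        KafkaConnectUtils.buildSyncConfig(props, "file:///tmp/hoodie/hudi_trips");
    System.out.println(hiveSyncConfig);
  }
}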
