[
https://issues.apache.org/jira/browse/FLINK-35922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qiu updated FLINK-35922:
------------------------
Summary: Enhanced support for reading hadoop configuration from flink
configuration when constructing hiveCatalog (was: Add configuration options
related to hive)
> Enhanced support for reading hadoop configuration from flink configuration
> when constructing hiveCatalog
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35922
> URL: https://issues.apache.org/jira/browse/FLINK-35922
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Hive
> Affects Versions: 1.19.1
> Reporter: Qiu
> Priority: Major
>
> Currently, we submit Flink jobs to YARN with the run-application target and
> need to specify some Hive-related configuration. Because we store resources
> on a distributed filesystem similar to Ali OSS, we have to pass special
> configuration options and set them on the Hive configuration (HiveConf).
> To solve this, Flink could accept configuration options prefixed with
> "flink.hadoop." (such as -Dflink.hadoop.xxx=yyy), strip the prefix, and copy
> them into the Hive configuration.
> A simple implementation is as follows:
>
> {code:java}
> // module: flink-connectors/flink-connector-hive
> // class: org.apache.flink.table.catalog.hive.HiveCatalog
> public static HiveConf createHiveConf(
>         @Nullable String hiveConfDir, @Nullable String hadoopConfDir) {
>     ...
>     String flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
>     if (flinkConfDir != null) {
>         org.apache.flink.configuration.Configuration flinkConfiguration =
>                 GlobalConfiguration.loadConfiguration(flinkConfDir);
>         // Add all configuration keys with prefix 'flink.hadoop.' in
>         // the Flink conf to the Hadoop conf, with the prefix stripped.
>         for (String key : flinkConfiguration.keySet()) {
>             for (String prefix : FLINK_CONFIG_PREFIXES) {
>                 if (key.startsWith(prefix)) {
>                     String newKey = key.substring(prefix.length());
>                     String value = flinkConfiguration.getString(key, null);
>                     hadoopConf.set(newKey, value);
>                     LOG.debug(
>                             "Adding Flink config entry for {} as {}={} to Hadoop config",
>                             key, newKey, value);
>                 }
>             }
>         }
>     }
> }
> {code}
>
>
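The prefix handling proposed in the description can be tried in isolation. The following is only a minimal sketch, assuming plain Java maps in place of Flink's Configuration and Hadoop's Configuration so that it runs without either dependency. The class name FlinkHadoopPrefixDemo, the method extractHadoopEntries, and the sample key and value are hypothetical; the single-entry prefix array is an assumption that mirrors the FLINK_CONFIG_PREFIXES name used in the issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlinkHadoopPrefixDemo {

    // Assumed prefix list; the issue only names the "flink.hadoop." prefix.
    static final String[] FLINK_CONFIG_PREFIXES = {"flink.hadoop."};

    /**
     * Returns the entries whose key starts with one of the prefixes,
     * keyed by the remainder of the key after the prefix.
     * Keys that consist of the prefix alone are skipped.
     */
    static Map<String, String> extractHadoopEntries(Map<String, String> flinkConf) {
        Map<String, String> hadoopEntries = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            String key = entry.getKey();
            for (String prefix : FLINK_CONFIG_PREFIXES) {
                if (key.startsWith(prefix) && key.length() > prefix.length()) {
                    hadoopEntries.put(key.substring(prefix.length()), entry.getValue());
                }
            }
        }
        return hadoopEntries;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        // Hypothetical option, as it might be passed with -Dflink.hadoop.xxx=yyy.
        flinkConf.put("flink.hadoop.fs.oss.endpoint", "oss.example.invalid");
        // Unprefixed Flink option; it is not copied.
        flinkConf.put("parallelism.default", "4");
        System.out.println(extractHadoopEntries(flinkConf));
    }
}
```

In the actual change, each extracted pair would be set on the Hadoop configuration that backs the HiveConf rather than collected into a map. Skipping keys that equal the bare prefix is a choice made for this sketch only; it avoids producing an empty property name.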
--
This message was sent by Atlassian Jira
(v8.20.10#820010)