[ 
https://issues.apache.org/jira/browse/FLINK-35922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qiu updated FLINK-35922:
------------------------
    Summary: Enhanced support for reading hadoop configuration from flink 
configuration when constructing hiveCatalog  (was: Add configuration options 
related to hive)

> Enhanced support for reading hadoop configuration from flink configuration 
> when constructing hiveCatalog
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35922
>                 URL: https://issues.apache.org/jira/browse/FLINK-35922
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Hive
>    Affects Versions: 1.19.1
>            Reporter: Qiu
>            Priority: Major
>
> Current, we submit flink job to yarn with run-application target and need to 
> specify some configuration related to hive, because we use distributed 
> filesystem similar to Ali oss to storage resources, in this case, we will 
> pass special configuration option and set them to hiveConfiguration.
> In order to solve such problems, we can provide a configuration option 
> prefixed with "flink.hadoop."(such as -Dflink.hadoop.xxx=yyy), and then take 
> it into HiveConfiguration.
> A simple implementation code is as follows:
>  
> {code:java}
> //代码占位符
> module: flink-connectors/flink-connector-hive
> class: org.apache.flink.table.catalog.hive.HiveCatalog  
> //代码占位符 
> public static HiveConf createHiveConf(@Nullable String hiveConfDir, @Nullable 
> String hadoopConfDir) {
>         ...
>         String flinkConfDir = 
> System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
>         if (flinkConfDir != null) {
>             org.apache.flink.configuration.Configuration flinkConfiguration = 
> GlobalConfiguration.loadConfiguration(flinkConfDir);
>             // add all configuration keys with prefix 'flink.hadoop.' in 
> Flink conf to Hadoop conf
>             for (String key : flinkConfiguration.keySet()) {
>                 for (String prefix : FLINK_CONFIG_PREFIXES) {
>                     if (key.startsWith(prefix)) {
>                         String newKey = key.substring(prefix.length());
>                         String value = flinkConfiguration.getString(key, 
> null);
>                         hadoopConf.set(newKey, value);
>                         LOG.debug("Adding Flink config entry for {} as {}={} 
> to Hadoop config", key, newKey, value);
>                     }
>                 }
>             }
>         }
>     }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to