[ 
https://issues.apache.org/jira/browse/FLINK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20224.
---------------------------
    Fix Version/s:     (was: 1.11.3)
       Resolution: Duplicate

> add username&password to provide a credential for es rest client
> ----------------------------------------------------------------
>
>                 Key: FLINK-20224
>                 URL: https://issues.apache.org/jira/browse/FLINK-20224
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.11.0
>            Reporter: jiang7chengzitc
>            Priority: Minor
>
> hello,
> Flink ElasticSearch Connector use Java High Level REST Client to process 
> request for index, delete, get, update, etc.  but some ES clusters (version 6 
> and higher) require security credentials to connect, So it can be considered 
> to add username and password option to build security credentials, then  
> connect to this ES cluster.
> for example:
> {code:java}
> //代码占位符
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink
> @Override
> protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
>       List<Host> hosts,
>       ActionRequestFailureHandler failureHandler,
>       Map<SinkOption, String> sinkOptions,
>       ElasticsearchUpsertSinkFunction upsertSinkFunction) {
>   ......
>   builder.setRestClientFactory(
>    new DefaultRestClientFactory(
>       Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
>          .map(Integer::valueOf)
>          .orElse(null),
>       sinkOptions.get(REST_PATH_PREFIX),
>       sinkOptions.get(USERNAME),
>       sinkOptions.get(PASSWORD)));
>   ......
> }
> @VisibleForTesting
> static class DefaultRestClientFactory implements RestClientFactory {
>    private Integer maxRetryTimeout;
>    private String pathPrefix;
>    private String username;
>    private String password;
>    public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, 
> @Nullable String pathPrefix,@Nullable String username, @Nullable String 
> password) {
>       this.maxRetryTimeout = maxRetryTimeout;
>       this.pathPrefix = pathPrefix;
>       this.username = username;
>       this.password = password;
>    }
>    @Override
>    public void configureRestClientBuilder(RestClientBuilder 
> restClientBuilder) {
>       if (maxRetryTimeout != null) {
>          restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
>       }
>       if (pathPrefix != null) {
>          restClientBuilder.setPathPrefix(pathPrefix);
>       }
>       // build credentialsProvider
>       if (username != null && password != null) {
>          final CredentialsProvider credentialsProvider = new 
> BasicCredentialsProvider();
>          credentialsProvider.setCredentials(AuthScope.ANY,
>             new UsernamePasswordCredentials(username, password));
>          restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
>             
> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
>          );
>       }
>    }
>    @Override
>    public boolean equals(Object o) {
>       if (this == o) {
>          return true;
>       }
>       if (o == null || getClass() != o.getClass()) {
>          return false;
>       }
>       DefaultRestClientFactory that = (DefaultRestClientFactory) o;
>       return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
>          Objects.equals(pathPrefix, that.pathPrefix) &&
>          Objects.equals(username, that.username) &&
>          Objects.equals(password, that.password);
>    }
>    @Override
>    public int hashCode() {
>       return Objects.hash(
>          maxRetryTimeout,
>          pathPrefix,
>          username,
>          password);
>    }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to