[
https://issues.apache.org/jira/browse/FLINK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu closed FLINK-20224.
---------------------------
Fix Version/s: (was: 1.11.3)
Resolution: Duplicate
> add username&password to provide a credential for es rest client
> ----------------------------------------------------------------
>
> Key: FLINK-20224
> URL: https://issues.apache.org/jira/browse/FLINK-20224
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / ElasticSearch
> Affects Versions: 1.11.0
> Reporter: jiang7chengzitc
> Priority: Minor
>
> hello,
> Flink ElasticSearch Connector use Java High Level REST Client to process
> request for index, delete, get, update, etc. but some ES clusters (version 6
> and higher) require security credentials to connect, So it can be considered
> to add username and password option to build security credentials, then
> connect to this ES cluster.
> for example:
> {code:java}
> //代码占位符
> org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink
> @Override
> protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
> List<Host> hosts,
> ActionRequestFailureHandler failureHandler,
> Map<SinkOption, String> sinkOptions,
> ElasticsearchUpsertSinkFunction upsertSinkFunction) {
> ......
> builder.setRestClientFactory(
> new DefaultRestClientFactory(
> Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
> .map(Integer::valueOf)
> .orElse(null),
> sinkOptions.get(REST_PATH_PREFIX),
> sinkOptions.get(USERNAME),
> sinkOptions.get(PASSWORD)));
> ......
> }
> @VisibleForTesting
> static class DefaultRestClientFactory implements RestClientFactory {
> private Integer maxRetryTimeout;
> private String pathPrefix;
> private String username;
> private String password;
> public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout,
> @Nullable String pathPrefix,@Nullable String username, @Nullable String
> password) {
> this.maxRetryTimeout = maxRetryTimeout;
> this.pathPrefix = pathPrefix;
> this.username = username;
> this.password = password;
> }
> @Override
> public void configureRestClientBuilder(RestClientBuilder
> restClientBuilder) {
> if (maxRetryTimeout != null) {
> restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
> }
> if (pathPrefix != null) {
> restClientBuilder.setPathPrefix(pathPrefix);
> }
> // build credentialsProvider
> if (username != null && password != null) {
> final CredentialsProvider credentialsProvider = new
> BasicCredentialsProvider();
> credentialsProvider.setCredentials(AuthScope.ANY,
> new UsernamePasswordCredentials(username, password));
> restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
>
> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
> );
> }
> }
> @Override
> public boolean equals(Object o) {
> if (this == o) {
> return true;
> }
> if (o == null || getClass() != o.getClass()) {
> return false;
> }
> DefaultRestClientFactory that = (DefaultRestClientFactory) o;
> return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
> Objects.equals(pathPrefix, that.pathPrefix) &&
> Objects.equals(username, that.username) &&
> Objects.equals(password, that.password);
> }
> @Override
> public int hashCode() {
> return Objects.hash(
> maxRetryTimeout,
> pathPrefix,
> username,
> password);
> }
> }{code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)