hailin0 commented on code in PR #3997:
URL:
https://github.com/apache/incubator-seatunnel/pull/3997#discussion_r1090612249
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java:
##########
@@ -62,52 +69,115 @@ public class EsRestClient {
private final RestClient restClient;
- private final ObjectMapper mapper = new ObjectMapper();
-
private EsRestClient(RestClient restClient) {
this.restClient = restClient;
}
public static EsRestClient createInstance(Config pluginConfig) {
List<String> hosts =
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
- String username = null;
- String password = null;
+ Optional<String> username = Optional.empty();
+ Optional<String> password = Optional.empty();
if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
- username =
pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key());
+ username =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key()));
if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
- password =
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key());
+ password =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key()));
+ }
+ }
+ Optional<String> keystorePath = Optional.empty();
+ Optional<String> keystorePassword = Optional.empty();
+ Optional<String> truststorePath = Optional.empty();
+ Optional<String> truststorePassword = Optional.empty();
+ boolean tlsVerifyCertificate =
EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
+ tlsVerifyCertificate =
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
+ }
+ if (tlsVerifyCertificate) {
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
+ keystorePath =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
}
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
+ keystorePassword =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
+ }
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
+ truststorePath =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
+ }
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key())) {
+ truststorePassword =
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
+ }
+ }
+ boolean tlsVerifyHostnames =
EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
+ tlsVerifyHostnames =
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
}
- return createInstance(hosts, username, password);
+ return createInstance(hosts, username, password, tlsVerifyCertificate,
tlsVerifyHostnames,
+ keystorePath, keystorePassword, truststorePath,
truststorePassword);
}
- public static EsRestClient createInstance(List<String> hosts, String
username, String password) {
- RestClientBuilder restClientBuilder = getRestClientBuilder(hosts,
username, password);
+ public static EsRestClient createInstance(List<String> hosts,
+ Optional<String> username,
+ Optional<String> password,
+ boolean tlsVerifyCertificate,
+ boolean tlsVerifyHostnames,
+ Optional<String> keystorePath,
+ Optional<String>
keystorePassword,
+ Optional<String> truststorePath,
+ Optional<String>
truststorePassword) {
+ RestClientBuilder restClientBuilder = getRestClientBuilder(
+ hosts, username, password, tlsVerifyCertificate,
tlsVerifyHostnames,
+ keystorePath, keystorePassword, truststorePath,
truststorePassword);
return new EsRestClient(restClientBuilder.build());
}
- private static RestClientBuilder getRestClientBuilder(List<String> hosts,
String username, String password) {
+ private static RestClientBuilder getRestClientBuilder(List<String> hosts,
+ Optional<String>
username,
+ Optional<String>
password,
+ boolean
tlsVerifyCertificate,
+ boolean
tlsVerifyHostnames,
+ Optional<String>
keystorePath,
+ Optional<String>
keystorePassword,
+ Optional<String>
truststorePath,
+ Optional<String>
truststorePassword) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
- String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
- httpHosts[i] = new HttpHost(hostInfo[0],
Integer.parseInt(hostInfo[1]));
+ httpHosts[i] = HttpHost.create(hosts.get(i));
}
- RestClientBuilder builder = RestClient.builder(httpHosts)
+ RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT));
- if (StringUtils.isNotEmpty(username)) {
- CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY, new
UsernamePasswordCredentials(username, password));
- builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- }
- return builder;
+ restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+ if (username.isPresent()) {
+ CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(username.get(),
password.get()));
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+
+ try {
+ if (tlsVerifyCertificate) {
+ Optional<SSLContext> sslContext =
SSLUtils.buildSSLContext(keystorePath,
+ keystorePassword, truststorePath, truststorePassword);
+ sslContext.ifPresent(e ->
httpClientBuilder.setSSLContext(e));
+ } else {
+ SSLContext sslContext = SSLContexts.custom()
+ .loadTrustMaterial(new TrustAllStrategy()).build();
+ httpClientBuilder.setSSLContext(sslContext);
+ }
+ if (!tlsVerifyHostnames) {
+ httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return httpClientBuilder;
+ });
+ return restClientBuilder;
}
public BulkResponse bulk(String requestBody) {
- Request request = new Request("POST", "_bulk");
+ Request request = new Request("POST", "/_bulk");
Review Comment:
fix https://bugs.chromium.org/p/gerrit/issues/detail?id=9761
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]