wuchong commented on a change in pull request #11822:
URL: https://github.com/apache/flink/pull/11822#discussion_r411878291



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
##########
@@ -93,6 +95,26 @@ public Elasticsearch host(String hostname, int port, String 
protocol) {
                return this;
        }
 
+       /**
+        * The Elasticsearch Cluster userName.
+        *
+        * @param userName Elasticsearch userName
+        */
+       public Elasticsearch userName(String userName) {

Review comment:
       lower case for N
   ```suggestion
        public Elasticsearch username(String username) {
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/Elasticsearch.java
##########
@@ -93,6 +95,26 @@ public Elasticsearch host(String hostname, int port, String 
protocol) {
                return this;
        }
 
+       /**
+        * The Elasticsearch Cluster userName.

Review comment:
       ditto

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/table/descriptors/ElasticsearchValidator.java
##########
@@ -76,12 +78,18 @@ public void validate(DescriptorProperties properties) {
                properties.validateValue(CONNECTOR_TYPE, 
CONNECTOR_TYPE_VALUE_ELASTICSEARCH, false);
                validateVersion(properties);
                validateHosts(properties);
+               validateAuth(properties);
                validateGeneralProperties(properties);
                validateFailureHandler(properties);
                validateBulkFlush(properties);
                validateConnectionProperties(properties);
        }
 
+       private void validateAuth(DescriptorProperties properties) {
+               properties.validateString(CONNECTOR_USERNAME, true);
+               properties.validateString(CONNECTOR_PASSWORD, true);

Review comment:
       We should validate that both username and password should exist if one 
of them is configured. 

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java
##########
@@ -189,6 +207,67 @@ protected ElasticsearchUpsertTableSinkBase copy(
        // Helper classes
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * This class implements {@link RestClientFactory}, used for es with 
authentication.
+        */
+       static class AuthRestClientFactory implements RestClientFactory {
+
+               private String userName;
+
+               private String password;
+
+               private Integer maxRetryTimeout;
+
+               private String pathPrefix;
+
+               private transient CredentialsProvider credentialsProvider;
+
+               public AuthRestClientFactory(@Nullable String userName, 
@Nullable String password,

Review comment:
       `username` and `password` are not nullable, because you will use 
`AuthRestClientFactory` iff username and password are set. 

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSink.java
##########
@@ -159,12 +165,24 @@ protected ElasticsearchUpsertTableSinkBase copy(
                Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_DELAY))
                        .ifPresent(v -> 
builder.setBulkFlushBackoffDelay(Long.valueOf(v)));
 
-               builder.setRestClientFactory(
-                       new DefaultRestClientFactory(
+               if 
(Optional.ofNullable(sinkOptions.get(CONNECTOR_USERNAME)).isPresent() &&
+                       
Optional.ofNullable(sinkOptions.get(CONNECTOR_PASSWORD)).isPresent()) {
+                       builder.setRestClientFactory(new AuthRestClientFactory(
+                               sinkOptions.get(CONNECTOR_USERNAME),
+                               sinkOptions.get(CONNECTOR_PASSWORD),
                                
Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
                                        .map(Integer::valueOf)
                                        .orElse(null),
-                               sinkOptions.get(REST_PATH_PREFIX)));
+                               sinkOptions.get(REST_PATH_PREFIX)
+                       ));
+               } else {
+                       builder.setRestClientFactory(
+                               new DefaultRestClientFactory(
+                                       
Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
+                                               .map(Integer::valueOf)
+                                               .orElse(null),
+                                       sinkOptions.get(REST_PATH_PREFIX)));
+               }

Review comment:
       Could you extract the common vairable before the if else condition? e.g. 
maxRetry, pathPrefix, username, password.

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSinkFactoryTest.java
##########
@@ -102,7 +102,9 @@ public void testBuilder() {
                expectedBuilder.setBulkFlushInterval(100);
                expectedBuilder.setBulkFlushMaxActions(1000);
                expectedBuilder.setBulkFlushMaxSizeMb(1);
-               expectedBuilder.setRestClientFactory(new 
Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));
+               expectedBuilder.setRestClientFactory(new 
Elasticsearch7UpsertTableSink.AuthRestClientFactory("elastic",
+                       "123456", "/myapp"));
+//             expectedBuilder.setRestClientFactory(new 
Elasticsearch7UpsertTableSink.DefaultRestClientFactory("/myapp"));

Review comment:
       remove useless code.

##########
File path: docs/dev/table/connect.md
##########
@@ -960,6 +960,10 @@ CREATE TABLE MyUserTable (
                                          -- or "custom" for failure handling 
with a
                                          -- ActionRequestFailureHandler 
subclass
 
+  -- optional: configure Elasticsearch cluster username and password 
+  'connector.username' = 'elastic',   -- optional: Elasticsearch username
+  'connector.password' = '123456',   -- optional: Elasticsearch password

Review comment:
       We can just use `myusername` or `mypassword` as an example. 
   ```suggestion
     'connector.username' = 'myusername',   -- optional: Elasticsearch username
     'connector.password' = 'mypassword',   -- optional: Elasticsearch password
   ```

##########
File path: 
flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7UpsertTableSink.java
##########
@@ -185,8 +191,18 @@ protected ElasticsearchUpsertTableSinkBase copy(
                Optional.ofNullable(sinkOptions.get(BULK_FLUSH_BACKOFF_DELAY))
                        .ifPresent(v -> 
builder.setBulkFlushBackoffDelay(Long.valueOf(v)));
 
-               builder.setRestClientFactory(
-                       new 
DefaultRestClientFactory(sinkOptions.get(REST_PATH_PREFIX)));
+               if 
(Optional.ofNullable(sinkOptions.get(CONNECTOR_USERNAME)).isPresent() &&
+                       
Optional.ofNullable(sinkOptions.get(CONNECTOR_PASSWORD)).isPresent()) {
+                       builder.setRestClientFactory(new AuthRestClientFactory(
+                               sinkOptions.get(CONNECTOR_USERNAME),
+                               sinkOptions.get(CONNECTOR_PASSWORD),
+                               sinkOptions.get(REST_PATH_PREFIX)
+                       ));
+               } else {
+                       builder.setRestClientFactory(
+                               new DefaultRestClientFactory(

Review comment:
       I just wondering why es7 doesn't support `REST_MAX_RETRY_TIMEOUT`, I 
guess it is a mistake of FLINK-13025. Could you add the retry support for es7? 
cc @twalthr @yanghua 

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
##########
@@ -319,6 +319,8 @@ private void validateKeyTypes(int[] keyFieldIndices) {
         * Keys for optional parameterization of the sink.
         */
        public enum SinkOption {
+               CONNECTOR_USERNAME,

Review comment:
       We do not need `CONNECTOR_` prefix for `SinkOption`, I think just 
`USERNAME` and `PASSWORD` is fine.

##########
File path: docs/dev/table/connect.md
##########
@@ -1040,6 +1048,10 @@ CREATE TABLE MyUserTable (
     .index("MyUsers")                  # required: Elasticsearch index
     .document_type("user")             # required: Elasticsearch document type
 
+    //optional: configure ElasticSearch cluster username and password 
+    .userName("elastic")    //optional: Elasticsearch cluster username

Review comment:
       Python should use `#` as comments. 

##########
File path: docs/dev/table/connect.md
##########
@@ -998,6 +1002,10 @@ CREATE TABLE MyUserTable (
     .index("MyUsers")                  // required: Elasticsearch index
     .documentType("user")              // required: Elasticsearch document type
 
+    //optional: configure Elasticsearch cluster username and password 
+    .userName("elastic")    //optional: Elasticsearch cluster username
+    .password("123456")    //optional: Elasticsearch cluster password

Review comment:
       Please add a space after `//` 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to