wuchong commented on a change in pull request #12536:
URL: https://github.com/apache/flink/pull/12536#discussion_r437170172



##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
##########
@@ -81,16 +80,22 @@ public String getDocumentType() {
                return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
        }
 
-       public Optional<Integer> getBulkFlushMaxActions() {
-               return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       public int getBulkFlushMaxActions() {
+               int maxActions = config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+               // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+               return maxActions == 0 ? -1 : maxActions;
        }
 
-       public Optional<Integer> getBulkFlushMaxSize() {
-               return config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       public long getBulkFlushMaxByteSize() {
+               long maxSize = config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+               // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+               return maxSize == 0 ? -1 : maxSize;
        }
 
-       public Optional<Long> getBulkFlushInterval() {
-               return config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       public long getBulkFlushInterval() {
+               long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+               // convert 0 to -1, because the Elasticsearch client uses -1 to disable this configuration.
+               return interval == 0 ? -1 : interval;

Review comment:
       Because the Elasticsearch client has default values for these settings (1000 actions, 5 MB), we have to pass `-1` to disable them; otherwise the client will fall back to its defaults.

##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
##########
@@ -128,9 +128,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
                                upsertFunction);
 
                        builder.setFailureHandler(config.getFailureHandler());
-                       config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-                       config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-                       config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+                       builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));

Review comment:
       I thought about this; however, I think we should validate that the value is in MB granularity. If we discard the remainder here, it will be harder to check.
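A hedged sketch of the granularity check argued for above: keeping the configured size in raw bytes lets a validator reject values that are not a whole number of mebibytes before the lossy `>> 20` shift. The class and method names are illustrative, not the connector's API:

```java
// Illustrative sketch: validate MB granularity on the raw byte value, then
// shift down to MB. 1 MiB = 1 << 20 bytes; a nonzero remainder means the
// user's value would be silently truncated by the >> 20 conversion.
public final class MbGranularity {
    private static final long ONE_MB = 1L << 20;

    public static int toMebiBytes(long maxSizeBytes) {
        if (maxSizeBytes % ONE_MB != 0) {
            throw new IllegalArgumentException(
                "bulk flush max size must be defined in MB granularity, got "
                    + maxSizeBytes + " bytes");
        }
        return (int) (maxSizeBytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(toMebiBytes(5L << 20)); // prints 5
    }
}
```

This is why the getter returns bytes rather than a pre-rounded MB value: once the remainder is gone, the validation above has nothing left to check.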

##########
File path: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
##########
@@ -128,9 +128,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
                                upsertFunction);
 
                        builder.setFailureHandler(config.getFailureHandler());
-                       config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-                       config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-                       config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+                       builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));

Review comment:
       But then we would have to move the `-1` handling out of it.
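For what it's worth, the `>> 20` conversion itself happens to preserve a `-1` sentinel, because Java's `>>` is an arithmetic (sign-extending) shift. A minimal illustration, with a hypothetical helper name:

```java
// Java's >> sign-extends, so -1L >> 20 is still -1L: the "disabled" sentinel
// survives the byte-to-MB conversion unchanged. A strict MB-granularity check,
// however, would still need to special-case -1, which is the point above.
public final class SentinelShift {
    public static int bytesToMb(long sizeBytes) {
        return (int) (sizeBytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(bytesToMb(-1L));      // prints -1
        System.out.println(bytesToMb(5L << 20)); // prints 5
    }
}
```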



