danny0405 commented on a change in pull request #12536:
URL: https://github.com/apache/flink/pull/12536#discussion_r438019632



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
##########
@@ -81,16 +80,22 @@ public String getDocumentType() {
                return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
        }
 
-       public Optional<Integer> getBulkFlushMaxActions() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       public int getBulkFlushMaxActions() {
+               int maxActions = 
config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxActions == 0 ? -1 : maxActions;
        }
 
-       public Optional<Integer> getBulkFlushMaxSize() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       public long getBulkFlushMaxByteSize() {
+               long maxSize = 
config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxSize == 0 ? -1 : maxSize;
        }
 
-       public Optional<Long> getBulkFlushInterval() {
-               return 
config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       public long getBulkFlushInterval() {
+               long interval = 
config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return interval == 0 ? -1 : interval;

Review comment:
       The bridge logic seems weird, why not just let user set `-1` which is 
synced with ES way.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to