wuchong commented on a change in pull request #12536:
URL: https://github.com/apache/flink/pull/12536#discussion_r438029982



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
##########
@@ -81,16 +80,22 @@ public String getDocumentType() {
                return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
        }
 
-       public Optional<Integer> getBulkFlushMaxActions() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       public int getBulkFlushMaxActions() {
+               int maxActions = 
config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxActions == 0 ? -1 : maxActions;
        }
 
-       public Optional<Integer> getBulkFlushMaxSize() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       public long getBulkFlushMaxByteSize() {
+               long maxSize = 
config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxSize == 0 ? -1 : maxSize;
        }
 
-       public Optional<Long> getBulkFlushInterval() {
-               return 
config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       public long getBulkFlushInterval() {
+               long interval = 
config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return interval == 0 ? -1 : interval;

Review comment:
       Unfortunately, Duration and MemorySize ConfigOptions doesn't allow 
negative values, e.g. `-1`. That's why I picked zero. I think zero is used as a 
disabled value in the many places, e.g. Kafka 
[batch.size](https://kafka.apache.org/documentation/#batch.size), and 
[watermark 
interval](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-auto-watermark-interval).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to