dawidwys commented on a change in pull request #12536:
URL: https://github.com/apache/flink/pull/12536#discussion_r437157675



##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
##########
@@ -128,9 +128,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context 
context) {
                                upsertFunction);
 
                        builder.setFailureHandler(config.getFailureHandler());
-                       
config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-                       
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-                       
config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+                       builder.setBulkFlushMaxSizeMb((int) 
(config.getBulkFlushMaxByteSize() >> 20));

Review comment:
       I wanted to have any formatting/transformation in one place in the 
`ElasticsearchOptions`. Can we return Mb from `config.getBulkFlushMaxByteSize`?

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
##########
@@ -81,16 +80,22 @@ public String getDocumentType() {
                return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
        }
 
-       public Optional<Integer> getBulkFlushMaxActions() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       public int getBulkFlushMaxActions() {
+               int maxActions = 
config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxActions == 0 ? -1 : maxActions;
        }
 
-       public Optional<Integer> getBulkFlushMaxSize() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       public long getBulkFlushMaxByteSize() {
+               long maxSize = 
config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxSize == 0 ? -1 : maxSize;
        }
 
-       public Optional<Long> getBulkFlushInterval() {
-               return 
config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       public long getBulkFlushInterval() {
+               long interval = 
config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return interval == 0 ? -1 : interval;

Review comment:
       Why not return empty in case it is set to 0? I prefer explicitness of 
`Optional` instead of magic values. Plus we would not need to change the 
Elasticsearch connector code for that.

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
##########
@@ -128,9 +128,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context 
context) {
                                upsertFunction);
 
                        builder.setFailureHandler(config.getFailureHandler());
-                       
config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-                       
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-                       
config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+                       builder.setBulkFlushMaxSizeMb((int) 
(config.getBulkFlushMaxByteSize() >> 20));

Review comment:
       Then return the `MemorySize` instead? Without copying the shifting logic 
which is implemented inside of the `MemorySize`.

##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
##########
@@ -81,16 +80,22 @@ public String getDocumentType() {
                return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
        }
 
-       public Optional<Integer> getBulkFlushMaxActions() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       public int getBulkFlushMaxActions() {
+               int maxActions = 
config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxActions == 0 ? -1 : maxActions;
        }
 
-       public Optional<Integer> getBulkFlushMaxSize() {
-               return 
config.getOptional(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       public long getBulkFlushMaxByteSize() {
+               long maxSize = 
config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return maxSize == 0 ? -1 : maxSize;
        }
 
-       public Optional<Long> getBulkFlushInterval() {
-               return 
config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       public long getBulkFlushInterval() {
+               long interval = 
config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+               // convert 0 to -1, because Elasticsearch client use -1 to 
disable this configuration.
+               return interval == 0 ? -1 : interval;

Review comment:
       got it.

##########
File path: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
##########
@@ -128,9 +128,9 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context 
context) {
                                upsertFunction);
 
                        builder.setFailureHandler(config.getFailureHandler());
-                       
config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
-                       
config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
-                       
config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+                       builder.setBulkFlushMaxSizeMb((int) 
(config.getBulkFlushMaxByteSize() >> 20));

Review comment:
       right, let's leave it as it is then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to