GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102178621
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -67,10 +73,56 @@
        public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
        public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
        public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
    +   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
    +   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
    +   public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
    +
    +   public enum FlushBackoffType {
    +           CONSTANT,
    +           EXPONENTIAL
    +   }
    +
    +   public class BulkFlushBackoffPolicy implements Serializable {
    +
    +           private static final long serialVersionUID = -6022851996101826049L;
    +
    +           // the default values follow the Elasticsearch default settings for BulkProcessor
    +           private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
    +           private int maxRetryCount = 8;
    +           private long delayMillis = 50;
    +
    +           public FlushBackoffType getBackoffType() {
    +                   return backoffType;
    +           }
    +
    +           public int getMaxRetryCount() {
    +                   return maxRetryCount;
    +           }
    +
    +           public long getDelayMillis() {
    +                   return delayMillis;
    +           }
    +
    +           public void setBackoffType(FlushBackoffType backoffType) {
    +                   this.backoffType = checkNotNull(backoffType);
    +           }
    +
    +           public void setMaxRetryCount(int maxRetryCount) {
    +                   checkArgument(maxRetryCount > 0);
    +                   this.maxRetryCount = maxRetryCount;
    +           }
    +
    +           public void setDelayMillis(long delayMillis) {
    +                   checkArgument(delayMillis > 0);
    --- End diff --
    
    We should accept 0 here as well (i.e. check `delayMillis >= 0` instead of `delayMillis > 0`), in case users want to retry immediately (for whatever reason :) )
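
To make the suggestion concrete, here is a minimal stand-alone sketch of the relaxed check. It is not the actual Flink class: `BackoffDelaySketch`, the local `checkArgument` helper (a stand-in for the precondition utility the diff calls), and the error message are assumptions for illustration only. Only `setDelayMillis`, `getDelayMillis`, and the default of 50 come from the diff.

```java
import java.io.Serializable;

// Hypothetical stand-alone sketch; names other than the getter/setter are illustrative.
class BackoffDelaySketch {

    // Local stand-in for the checkArgument precondition helper used in the diff.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static class BulkFlushBackoffPolicy implements Serializable {

        private static final long serialVersionUID = 1L;

        // Default taken from the diff under review.
        private long delayMillis = 50;

        public long getDelayMillis() {
            return delayMillis;
        }

        public void setDelayMillis(long delayMillis) {
            // Accept 0 so that users can ask for an immediate retry;
            // only negative delays are rejected.
            checkArgument(delayMillis >= 0, "delayMillis must be non-negative");
            this.delayMillis = delayMillis;
        }
    }

    public static void main(String[] args) {
        BulkFlushBackoffPolicy policy = new BulkFlushBackoffPolicy();
        policy.setDelayMillis(0); // allowed with the relaxed check
        System.out.println("delayMillis = " + policy.getDelayMillis());
    }
}
```

With this check, a delay of 0 is stored as-is and a negative delay still fails fast with an `IllegalArgumentException`.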


