Jiabao-Sun commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1017723597


##########
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.sink.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BULK_FLUSH_INTERVAL;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BULK_FLUSH_MAX_ACTIONS;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configurations for MongoSink to control write operations. All the options 
list here could be
+ * configured by {@link MongoWriteOptionsBuilder}.
+ */
+@PublicEvolving
+public final class MongoWriteOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int bulkFlushMaxActions;
+    private final long bulkFlushIntervalMs;
+    private final int maxRetryTimes;
+    private final long retryIntervalMs;
+    private final DeliveryGuarantee deliveryGuarantee;
+    private final Integer parallelism;
+
+    private MongoWriteOptions(
+            int bulkFlushMaxActions,
+            long bulkFlushIntervalMs,
+            int maxRetryTimes,
+            long retryIntervalMs,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable Integer parallelism) {
+        this.bulkFlushMaxActions = bulkFlushMaxActions;
+        this.bulkFlushIntervalMs = bulkFlushIntervalMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.retryIntervalMs = retryIntervalMs;
+        this.deliveryGuarantee = deliveryGuarantee;
+        this.parallelism = parallelism;
+    }
+
+    public int getBulkFlushMaxActions() {
+        return bulkFlushMaxActions;
+    }
+
+    public long getBulkFlushIntervalMs() {
+        return bulkFlushIntervalMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public long getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
+
+    public DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
+
+    @Nullable
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MongoWriteOptions that = (MongoWriteOptions) o;
+        return bulkFlushMaxActions == that.bulkFlushMaxActions
+                && bulkFlushIntervalMs == that.bulkFlushIntervalMs
+                && maxRetryTimes == that.maxRetryTimes
+                && retryIntervalMs == that.retryIntervalMs
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bulkFlushMaxActions,
+                bulkFlushIntervalMs,
+                maxRetryTimes,
+                retryIntervalMs,
+                deliveryGuarantee,
+                parallelism);
+    }
+
+    public static MongoWriteOptionsBuilder builder() {
+        return new MongoWriteOptionsBuilder();
+    }
+
+    /** Builder for {@link MongoWriteOptions}. */
+    @PublicEvolving
+    public static class MongoWriteOptionsBuilder {
+        private int bulkFlushMaxActions = 
BULK_FLUSH_MAX_ACTIONS.defaultValue();
+        private long bulkFlushIntervalMs = 
BULK_FLUSH_INTERVAL.defaultValue().toMillis();
+        private int maxRetryTimes = SINK_MAX_RETRIES.defaultValue();
+        private long retryIntervalMs = 
SINK_RETRY_INTERVAL.defaultValue().toMillis();
+        private DeliveryGuarantee deliveryGuarantee = 
DeliveryGuarantee.AT_LEAST_ONCE;
+        private Integer parallelism;
+
+        private MongoWriteOptionsBuilder() {}
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. 
You can pass -1 to
+         * disable it. The default flush size is 1000.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per 
bulk request.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushMaxActions(int 
numMaxActions) {
+            checkArgument(
+                    numMaxActions == -1 || numMaxActions > 0,
+                    "Max number of buffered actions must be larger than 0.");
+            this.bulkFlushMaxActions = numMaxActions;
+            return this;
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. You can pass -1 to 
disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBulkFlushIntervalMs(long 
intervalMillis) {
+            checkArgument(
+                    intervalMillis == -1 || intervalMillis >= 0,
+                    "Interval (in milliseconds) between each flush must be 
larger than "
+                            + "or equal to 0.");
+            this.bulkFlushIntervalMs = intervalMillis;
+            return this;
+        }
+
+        /**
+         * Sets the max retry times if writing records failed.
+         *
+         * @param maxRetryTimes the max retry times.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setMaxRetryTimes(int maxRetryTimes) {
+            checkArgument(
+                    maxRetryTimes >= 0,
+                    "The sink max retry times must be larger than or equal to 
0.");
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /**
+         * Sets the retry interval if writing records to database failed.
+         *
+         * @param retryIntervalMs the retry time interval, in milliseconds.
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setRetryInterval(long retryIntervalMs) 
{
+            checkArgument(
+                    retryIntervalMs > 0,
+                    "The retry interval (in milliseconds) must be larger than 
0.");
+            this.retryIntervalMs = retryIntervalMs;
+            return this;
+        }
+
+        /**
+         * Sets the wanted {@link DeliveryGuarantee}. The default delivery 
guarantee is {@link
+         * DeliveryGuarantee#AT_LEAST_ONCE}
+         *
+         * @param deliveryGuarantee which describes the record emission 
behaviour
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setDeliveryGuarantee(DeliveryGuarantee 
deliveryGuarantee) {
+            checkArgument(
+                    deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
+                    "Mongo sink does not support the EXACTLY_ONCE guarantee.");
+            this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
+            return this;
+        }
+
+        /**
+         * Sets the parallelism of the Mongo sink operator. By default, the 
parallelism is
+         * determined by the framework using the same parallelism of the 
upstream chained operator.
+         */
+        public MongoWriteOptionsBuilder setParallelism(int parallelism) {

Review Comment:
   Got it.  
   I'll pass the parallelism explicitly to the TableSink constructor instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to