nicusX commented on code in PR #1:
URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1466268497


##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java:
##########
@@ -0,0 +1,203 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/** Builder for the Prometheus Sink implementation. */
+public class PrometheusSinkBuilder
+        extends AsyncSinkBaseBuilder<
+                PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> {
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class);
+
+    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS =
+            1000; // Maximum number of request entries that will be buffered
+
+    private String prometheusRemoteWriteUrl;
+    private RetryConfiguration retryConfiguration;
+    private Integer socketTimeoutMs;
+    private PrometheusRequestSigner requestSigner = null;
+    private Integer maxBatchSizeInSamples;
+    private Integer maxRecordSizeInSamples;
+
+    @Override
+    public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+
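+        // Hard-coded to 1: Prometheus remote-write expects samples of each time series
+        // to be written in order, so write requests are not parallelized.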
+        int maxInFlightRequest = 1;
+
+        int actualMaxBatchSizeInSamples =
+                Optional.ofNullable(getMaxBatchSizeInSamples())
+                        .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
+        int actualMaxBufferedRequests =
+                Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+        long actualMaxTimeInBufferMS =
+                Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+        int actualMaxRecordSizeInSamples =
+                Optional.ofNullable(getMaxRecordSizeInSamples())
+                        // By default, set max record size to max batch size (in samples)
+                        .orElse(actualMaxBatchSizeInSamples);
+
+        int actualSocketTimeoutMs =
+                Optional.ofNullable(getSocketTimeoutMs())
+                        .orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS);
+
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
+        Preconditions.checkArgument(
+                actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive");
+        Preconditions.checkArgument(
+                actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+
+        LOG.info(
+                "Prometheus sink configuration:"
+                        + "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
+                        + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+                        + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+                        + "\n\t\tsocketTimeoutMs={}",
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                maxInFlightRequest,
+                actualMaxBufferedRequests,
+                retryConfiguration.getInitialRetryDelayMS(),
+                retryConfiguration.getMaxRetryDelayMS(),
+                retryConfiguration.getMaxRetryCount(),
+                actualSocketTimeoutMs);
+
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                maxInFlightRequest,
+                actualMaxBufferedRequests,
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+                        .setSocketTimeout(actualSocketTimeoutMs),
+                requestSigner);
+    }
+
+    private static void checkValidRemoteWriteUrl(String url) {
+        try {
+            new URL(url);
+        } catch (MalformedURLException mue) {
+            throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
+        }
+    }
+
+    public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) {
+        this.requestSigner = requestSigner;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration retryConfiguration) {
+        this.retryConfiguration = retryConfiguration;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
+        this.socketTimeoutMs = socketTimeoutMs;
+        return this;
+    }
+
+    private Integer getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }
+
+    private Integer getMaxRecordSizeInSamples() {
+        return maxRecordSizeInSamples;
+    }
+
+    public RetryConfiguration getRetryConfiguration() {
+        return retryConfiguration;
+    }
+
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+
+    // Disable setting maxBatchSize, maxBatchSizeInBytes, and maxRecordSizeInBytes directly
+
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
+        throw new UnsupportedOperationException("maxBatchSize is not supported by this sink");
+    }
+
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
+        throw new UnsupportedOperationException(
+                "maxBatchSizeInBytes is not supported by this sink");

Review Comment:
   Added comments to all disabled parameters, explaining what to use instead
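   For reference, a sketch of the shape those comments could take on the disabled setters (illustrative wording only, not necessarily the exact text added in the PR); the {@link} targets are the samples-based setters already defined in this builder:

       /** Not supported. Use {@link #setMaxBatchSizeInSamples(int)} instead. */
       @Override
       public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
           throw new UnsupportedOperationException("maxBatchSize is not supported by this sink");
       }

       /** Not supported. Batch size is configured in samples via {@link #setMaxBatchSizeInSamples(int)}. */
       @Override
       public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
           throw new UnsupportedOperationException("maxBatchSizeInBytes is not supported by this sink");
       }

       /** Not supported. Record size is configured in samples via {@link #setMaxRecordSizeInSamples(int)}. */
       @Override
       public PrometheusSinkBuilder setMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
           throw new UnsupportedOperationException("maxRecordSizeInBytes is not supported by this sink");
       }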


