hlteoh37 commented on code in PR #1:
URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1486041042


##########
example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/SimpleInstanceMetricsTimeSeriesGenerator.java:
##########
@@ -0,0 +1,93 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.examples;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+
+import org.apache.commons.lang3.RandomUtils;
+
+import java.io.Serializable;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * Very simple dummy TimeSeries generator, generating random `CPU` and `Memory` samples for a given
+ * number of instances.
+ *
+ * <p>Sample timestamp is always the current system time. The value is random, between 0 and 1.
+ */
+public class SimpleInstanceMetricsTimeSeriesGenerator implements Serializable {
+
+    private final int numberOfDummyInstances;
+    private final int minNrOfSamples;
+    private final int maxNrOfSamples;
+
+    public SimpleInstanceMetricsTimeSeriesGenerator(
+            int minNrOfSamples, int maxNrOfSamples, int numberOfDummyInstances) {
+        this.numberOfDummyInstances = numberOfDummyInstances;
+        this.minNrOfSamples = minNrOfSamples;
+        this.maxNrOfSamples = maxNrOfSamples;
+    }
+
+    private String dummyInstanceId(int number) {
+        return "I" + String.format("%010d", number);

Review Comment:
   nit: instance ID is lower case `i`
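   Something like:
   ```suggestion
           return "i" + String.format("%010d", number);
   ```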



##########
prometheus-connector/README.md:
##########
@@ -0,0 +1,189 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/).
+
+### Guarantees and input restrictions
+
+Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements
+of Prometheus Remote-Write, the sink can only guarantee that input data are written to Prometheus if they are in order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, the write request is rejected by Prometheus and the data is discarded by the sink.
+The connector logs a warning and counts the rejected data in custom metrics.
+
+The sink receives time-series as input, each containing one or more samples.
+To optimise the write throughput, input time-series are batched, in the order they are received, and written with a single write-request.
+
+If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all its time-series are discarded.
+The reason is that the Remote-Write specification [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) rejected write requests (4xx responses),
+and the Prometheus response does not contain enough information to efficiently retry the write partially, discarding only the offending data.
+
+### Responsibilities of the application
+
+It is the responsibility of the application to send data to the sink in the correct order and format.
+
+1. Input time-series must be well-formed, e.g. only valid and non-duplicated labels,
+samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) in Prometheus Remote-Write specs).
+2. Input time-series with identical labels must be sent to the sink in timestamp order.
+3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels go to the same sink subtask. A `KeySelector` is provided to partition input correctly (see [Partitioning](#partitioning), below).
+
+
+#### Sink input objects
+
+To help send well-formed data to the sink, the connector expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) POJOs as input.
+
+Each `PrometheusTimeSeries` instance maps 1-to-1 to a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains:
+* exactly one `metricName`, mapped to the special `__name__` label
+* optionally, any number of additional labels { k: String, v: String }
+* one or more `Samples` { value: double, timestamp: long } - must be in timestamp order
+
+`PrometheusTimeSeries` provides a builder interface.
+
+```java
+
+// List<Tuple2<Double, Long>> samples = ...
+
+PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
+    .withMetricName("CPU") // mapped to `__name__` label
+    .addLabel("InstanceID", instanceId)
+    .addLabel("AccountID", accountId);
+
+for (Tuple2<Double, Long> sample : samples) {
+    tsBuilder.addSample(sample.f0, sample.f1);
+}
+
+PrometheusTimeSeries ts = tsBuilder.build();
+```
+
+
+**Important**: for efficiency, the builder does not reorder the samples. It is the responsibility of the application to **add samples in timestamp order**.
+
+### Batching, blocking writes and retry
+
+The sink batches multiple time-series into a single write-request, retaining the order.
+
+Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of 5 seconds
+(both configurable). The number of time-series doesn't matter.
+
+As per [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
+the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses an exponential backoff.
+
+The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry
+delay (default 5 sec). It continues retrying until the max number of retries is reached (by default, it retries forever).
+
+On a non-retriable error response (4xx other than 429, or a non-retriable exception), or on reaching the retry limit,
+**the entire write-request**, containing the batch of time-series, **is dropped**.
+
+Every dropped request is logged at WARN level, including the reason provided by the remote-write endpoint.

Review Comment:
   We can mention the configuration here too
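   For instance, we could show the relevant builder settings here. A sketch mirroring the example job (defaults as documented above, per `PrometheusSinkBuilder`):
   ```java
   // Sketch for the README: batch size, buffering time and retry policy are all configurable
   PrometheusSink.builder()
           .setMaxBatchSizeInSamples(500)     // max samples per write-request
           .setMaxTimeInBufferMS(5000)        // max buffering time, ms
           .setRetryConfiguration(
                   RetryConfiguration.builder()
                           .setInitialRetryDelayMS(30L)
                           .setMaxRetryDelayMS(5000L)
                           .setMaxRetryCount(100)
                           .build());
   ```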



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/RequestEntrySizeUtils.java:
##########
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import java.util.Collection;
+
+/** Collection of methods to calculate the sink RequestEntry "size". */
+public class RequestEntrySizeUtils {
+
+    /**
+     * Size of a request entry (a {@link Types.TimeSeries time-series}) for the purpose of batching.
+     * Count the number of {@link Types.Sample samples}.
+     *
+     * @param requestEntry a time-series
+     * @return number of Samples in the TimeSeries
+     */
+    public static long requestSizeForBatching(Types.TimeSeries requestEntry) {
+        return requestEntry.getSamplesCount();
+    }
+
+    /**
+     * Serialized size of a request entry {@link Types.TimeSeries TimeSeries}: the number of bytes
+     * of the protobuf-serialized representation of the TimeSeries.
+     *
+     * @param requestEntry a time-series
+     * @return number of bytes
+     */
+    public static long requestSerializedSize(Types.TimeSeries requestEntry) {
+        return requestEntry.getSerializedSize();
+    }
+
+    /**
+     * Count the number of {@link Types.Sample samples} in a collection of {@link Types.TimeSeries
+     * time-series} (a batch).
+     *
+     * @param requestEntries collection of time-series
+     * @return number of samples
+     */
+    public static long countSamples(Collection<Types.TimeSeries> requestEntries) {
+        return requestEntries.stream()
+                .mapToLong(RequestEntrySizeUtils::requestSizeForBatching)
+                .sum();
+    }

Review Comment:
   Can we add unit test coverage for this?
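   Something along these lines would work, I think (JUnit 5, as used elsewhere in the module; the protobuf builder calls are assumed from the generated `Types` classes):
   ```java
   // Hypothetical test for RequestEntrySizeUtils.countSamples():
   // the total must be the sum of samples across all time-series in the batch
   @Test
   void countSamplesShouldSumSamplesAcrossAllTimeSeries() {
       Types.TimeSeries ts1 =
               Types.TimeSeries.newBuilder()
                       .addSamples(Types.Sample.newBuilder().setValue(1.0).setTimestamp(1L))
                       .addSamples(Types.Sample.newBuilder().setValue(2.0).setTimestamp(2L))
                       .build();
       Types.TimeSeries ts2 =
               Types.TimeSeries.newBuilder()
                       .addSamples(Types.Sample.newBuilder().setValue(3.0).setTimestamp(3L))
                       .build();

       assertEquals(3, RequestEntrySizeUtils.countSamples(java.util.Arrays.asList(ts1, ts2)));
   }
   ```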



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java:
##########
@@ -0,0 +1,246 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/** Builder for Sink implementation. */
+public class PrometheusSinkBuilder
+        extends AsyncSinkBaseBuilder<
+                PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> {
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class);
+
+    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS =
+            1000; // Max nr of requestEntry that will be buffered
+
+    private String prometheusRemoteWriteUrl;
+    private RetryConfiguration retryConfiguration;
+    private Integer socketTimeoutMs;
+    private PrometheusRequestSigner requestSigner = null;
+    private Integer maxBatchSizeInSamples;
+    private Integer maxRecordSizeInSamples;
+    private String httpUserAgent = null;
+    private SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig = null;
+
+    @Override
+    public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+
+        int maxInFlightRequest = 1;

Review Comment:
   nit: this could be a static constant. (I understand this is a copy of the real value in the sink itself)
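   e.g.
   ```java
   // hypothetical name; mirrors the value hard-coded in the sink writer
   private static final int MAX_IN_FLIGHT_REQUESTS = 1;
   ```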



##########
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerForAuthorizationHeader.java:
##########
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import com.amazonaws.util.BinaryUtils;
+
+import java.net.URL;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Sample AWS4 signer demonstrating how to sign requests to Amazon S3 using an 'Authorization'
+ * header.
+ */
+public class AWS4SignerForAuthorizationHeader extends AWS4SignerBase {

Review Comment:
   Can we add unit tests?
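   Even a simple test asserting that the generated `Authorization` header has the expected `AWS4-HMAC-SHA256 Credential=..., SignedHeaders=..., Signature=...` structure for a fixed input would help catch regressions.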



##########
prometheus-connector/README.md:
##########
@@ -0,0 +1,189 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/).
+
+### Guarantees and input restrictions
+
+Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements
+of Prometheus Remote-Write, the sink can only guarantee that input data are written to Prometheus if they are in order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, the write request is rejected by Prometheus and the data is discarded by the sink.

Review Comment:
   We can mention our sink configurations
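   e.g. point to `SinkWriterErrorHandlingBehaviorConfiguration` / `OnErrorBehavior`, which control whether the sink fails the job or logs, counts and continues when data is rejected.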



##########
example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSink;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
+import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner;
+import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/** Test application testing the Prometheus sink connector. */
+public class DataStreamJob {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class);
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env;
+
+        // Conditionally use a local execution environment with the Web UI enabled
+        if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) {
+            Configuration conf = new Configuration();
+            conf.set(
+                    ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(),
+                    true);
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        } else {
+            env = StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+
+        env.setParallelism(2);
+
+        ParameterTool applicationParameters = ParameterTool.fromArgs(args);
+
+        // Prometheus remote-write URL
+        String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl");
+        LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl);
+
+        // Optionally configure Amazon Managed Prometheus request signer
+        PrometheusRequestSigner requestSigner = null;
+        String ampAWSRegion = applicationParameters.get("awsRegion");
+        if (ampAWSRegion != null) {
+            requestSigner =
+                    new AmazonManagedPrometheusWriteRequestSigner(
+                            prometheusRemoteWriteUrl, ampAWSRegion);
+            LOGGER.info(
+                    "Enable Amazon Managed Prometheus request-signer, region: 
{}", ampAWSRegion);
+        }
+
+        // Configure data generator
+        int generatorMinSamplesPerTimeSeries = 1;
+        int generatorMaxSamplesPerTimeSeries = 10;
+        int generatorNumberOfDummyInstances = 5;
+        long generatorPauseBetweenTimeSeriesMs = 100;
+        LOGGER.info(
+                "Data Generator configuration:"
+                        + "\n\t\tMin samples per time series:{}\n\t\tMax 
samples per time series:{}\n\t\tPause between time series:{} ms"
+                        + "\n\t\tNumber of dummy instances:{}",
+                generatorMinSamplesPerTimeSeries,
+                generatorMaxSamplesPerTimeSeries,
+                generatorPauseBetweenTimeSeriesMs,
+                generatorNumberOfDummyInstances);
+
+        Supplier<PrometheusTimeSeries> eventGenerator =
+                new SimpleInstanceMetricsTimeSeriesGenerator(
+                                generatorMinSamplesPerTimeSeries,
+                                generatorMaxSamplesPerTimeSeries,
+                                generatorNumberOfDummyInstances)
+                        .generator();
+
+        SourceFunction<PrometheusTimeSeries> source =
+                new FixedDelayDataGenertorSource<>(

Review Comment:
   Typo - `Generator`



##########
example-datastream-job/README.md:
##########
@@ -0,0 +1,22 @@
+## Example job using Prometheus Sink connector with DataStream API
+
+Sample application demonstrating the usage of Prometheus Sink Connector with DataStream API.
+
+The example demonstrates how to write to a generic, unauthenticated Prometheus remote-write URL, and optionally how to use the Amazon Managed Prometheus request signer.
+
+It generates random dummy Memory and CPU metrics from a number of instances, and writes them to Prometheus.
+
+### Configuration
+
+The application expects these parameters, via command line:
+
+* `--prometheusRemoteWriteUrl <URL>`: the Prometheus remote-write URL to target
+* `--awsRegion <region>`: (optional) if specified, it configures the Amazon Managed Prometheus request signer for a workspace in this Region
+* `--webUI`: (optional, for local development only) enables the Flink Web UI, with flame graphs
+
+### Data generation
+
+The application generates random time series, containing `CPU` and `Memory` samples from 5 dummy "instances". A new time series is generated about every 100ms. Each time series contains 1 to 10 samples.
+
+These parameters are configurable from the code

Review Comment:
   ```suggestion
   These parameters are configurable from the code.
   ```



##########
example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSink;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
+import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner;
+import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/** Test application testing the Prometheus sink connector. */
+public class DataStreamJob {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class);
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env;
+
+        // Conditionally use a local execution environment with the Web UI enabled
+        if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) {
+            Configuration conf = new Configuration();
+            conf.set(
+                    ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(),
+                    true);
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        } else {
+            env = StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+
+        env.setParallelism(2);

Review Comment:
   Can we explain why this is set to 2?



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java:
##########
@@ -0,0 +1,192 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/**
+ * Callback handling the outcome of the http async request.
+ *
+ * <p>This class implements the error handling behaviour, based on the configuration in {@link
+ * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an
+ * exception and cause the job to fail, or log the condition to WARN, increment the counters and
+ * continue with the next request.
+ *
+ * <p>In any case, every write-request either entirely succeeds or fails. Partial failures are not
+ * handled.
+ *
+ * <p>Under no condition is a write-request re-queued for the AsyncSink to reprocess: this would
+ * cause out-of-order writes that would be rejected by Prometheus.
+ *
+ * <p>Note that the http async client retries, based on the configured retry policy. The callback is
+ * called with an outcome of *completed* either when the request has succeeded or the max retry
+ * limit has been exceeded. It is the responsibility of the callback to distinguish between these
+ * conditions.
+ */
+class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class);
+
+    private final int timeSeriesCount;
+    private final long sampleCount;
+    private final Consumer<List<Types.TimeSeries>> reQueuedResult;
+    private final SinkMetrics metrics;
+    private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig;
+
+    public HttpResponseCallback(
+            int timeSeriesCount,
+            long sampleCount,
+            SinkMetrics metrics,

Review Comment:
   nit: This can be abstracted away to another `metricsCallback`
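   For instance, something roughly like this (hypothetical interface, names illustrative):
   ```java
   // Hypothetical: group the counter updates behind one callback so this class
   // doesn't depend on SinkMetrics directly
   interface ResponseMetricsCallback {
       void onWriteRequestCompleted(int timeSeriesCount, long sampleCount);
       void onSamplesDropped(long sampleCount);
   }
   ```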



##########
example-datastream-job/README.md:
##########
@@ -0,0 +1,22 @@
+## Example job using Prometheus Sink connector with DataStream API
+
+Sample application demonstrating the usage of Prometheus Sink Connector with DataStream API.
+
+The example demonstrates how to write to a generic, unauthenticated Prometheus remote-write URL, and optionally how to use the Amazon Managed Prometheus request signer.
+
+It generates random dummy Memory and CPU metrics from a number of instances, and writes them to Prometheus.
+
+### Configuration
+
+The application expects these parameters, via command line:
+
+* `--prometheusRemoteWriteUrl <URL>`: the Prometheus remote-write URL to target
+* `--awsRegion <region>`: (optional) if specified, it configures the Amazon Managed Prometheus request signer for a workspace in this Region
+* `--webUI`: (optional, for local development only) enables the Flink Web UI, with flame graphs

Review Comment:
   How do we run this? Should we include an example run command?
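   e.g. something along the lines of `flink run example-datastream-job/target/<jar-name>.jar --prometheusRemoteWriteUrl <url> [--awsRegion <region>]` (exact jar name to be confirmed), or running `DataStreamJob` directly from the IDE with `--webUI`.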



##########
example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSink;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
+import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner;
+import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/** Test application testing the Prometheus sink connector. */
+public class DataStreamJob {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class);
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env;
+
+        // Conditionally use a local execution environment with the Web UI enabled
+        if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) {
+            Configuration conf = new Configuration();
+            conf.set(
+                    ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(),
+                    true);
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        } else {
+            env = StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+
+        env.setParallelism(2);
+
+        ParameterTool applicationParameters = ParameterTool.fromArgs(args);
+
+        // Prometheus remote-write URL
+        String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl");
+        LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl);
+
+        // Optionally configure Amazon Managed Prometheus request signer
+        PrometheusRequestSigner requestSigner = null;
+        String ampAWSRegion = applicationParameters.get("awsRegion");
+        if (ampAWSRegion != null) {
+            requestSigner =
+                    new AmazonManagedPrometheusWriteRequestSigner(
+                            prometheusRemoteWriteUrl, ampAWSRegion);
+            LOGGER.info(
+                    "Enable Amazon Managed Prometheus request-signer, region: 
{}", ampAWSRegion);
+        }
+
+        // Configure data generator
+        int generatorMinSamplesPerTimeSeries = 1;
+        int generatorMaxSamplesPerTimeSeries = 10;
+        int generatorNumberOfDummyInstances = 5;

Review Comment:
   Hmm, this is hard to understand out of context. Can we add an overall description at the top of this class, or rename this to "NumberOfKeys/NumberOfUniqueMetrics" or something?



##########
example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.connector.prometheus.sink.PrometheusSink;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import 
org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
+import 
org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+/** Test application testing the Prometheus sink connector. */
+public class DataStreamJob {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DataStreamJob.class);
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env;
+
+        // Conditionally return a local execution environment with
+        if (args.length > 0 && 
Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) {
+            Configuration conf = new Configuration();
+            conf.set(
+                    
ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(),
+                    true);
+            env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        } else {
+            env = StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+
+        env.setParallelism(2);
+
+        ParameterTool applicationParameters = ParameterTool.fromArgs(args);
+
+        // Prometheus remote-write URL
+        String prometheusRemoteWriteUrl = 
applicationParameters.get("prometheusRemoteWriteUrl");
+        LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl);
+
+        // Optionally configure Amazon Managed Prometheus request signer
+        PrometheusRequestSigner requestSigner = null;
+        String ampAWSRegion = applicationParameters.get("awsRegion");
+        if (ampAWSRegion != null) {
+            requestSigner =
+                    new AmazonManagedPrometheusWriteRequestSigner(
+                            prometheusRemoteWriteUrl, ampAWSRegion);
+            LOGGER.info(
+                    "Enable Amazon Managed Prometheus request-signer, region: 
{}", ampAWSRegion);
+        }
+
+        // Configure data generator
+        int generatorMinSamplesPerTimeSeries = 1;
+        int generatorMaxSamplesPerTimeSeries = 10;
+        int generatorNumberOfDummyInstances = 5;
+        long generatorPauseBetweenTimeSeriesMs = 100;
+        LOGGER.info(
+                "Data Generator configuration:"
+                        + "\n\t\tMin samples per time series:{}\n\t\tMax 
samples per time series:{}\n\t\tPause between time series:{} ms"
+                        + "\n\t\tNumber of dummy instances:{}",
+                generatorMinSamplesPerTimeSeries,
+                generatorMaxSamplesPerTimeSeries,
+                generatorPauseBetweenTimeSeriesMs,
+                generatorNumberOfDummyInstances);
+
+        Supplier<PrometheusTimeSeries> eventGenerator =
+                new SimpleInstanceMetricsTimeSeriesGenerator(
+                                generatorMinSamplesPerTimeSeries,
+                                generatorMaxSamplesPerTimeSeries,
+                                generatorNumberOfDummyInstances)
+                        .generator();
+
+        SourceFunction<PrometheusTimeSeries> source =
+                new FixedDelayDataGenertorSource<>(
+                        PrometheusTimeSeries.class,
+                        eventGenerator,
+                        generatorPauseBetweenTimeSeriesMs);
+
+        DataStream<PrometheusTimeSeries> prometheusTimeSeries = env.addSource(source);
+
+        // Build the sink showing all supported configuration parameters.
+        // It is not mandatory to specify all configurations, as they will fall back to the
+        // default value
+        AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> sink =
+                PrometheusSink.builder()
+                        .setMaxBatchSizeInSamples(500)
+                        .setMaxRecordSizeInSamples(500)
+                        .setMaxTimeInBufferMS(5000)
+                        .setRetryConfiguration(
+                                RetryConfiguration.builder()
+                                        .setInitialRetryDelayMS(30L)
+                                        .setMaxRetryDelayMS(5000L)
+                                        .setMaxRetryCount(100)
+                                        .build())
+                        .setSocketTimeoutMs(5000)

Review Comment:
   nit: Should we leave a comment after each setting to show which are optional?
   e.g.
   ```
      .setMaxBatchSizeInSamples(500) // Optional, defaults to 500
   ```



##########
example-datastream-job/src/main/resources/log4j2.properties:
##########
@@ -0,0 +1,29 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+#logger.tracing.name = org.apache.flink.connector.prometheus.sink
+#logger.tracing.level = trace
+#logger.tracing.additivity = false
+#logger.tracing.appenderRef.console.ref = ConsoleAppender

Review Comment:
   Can we remove these, or explain when to uncomment for testing?
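   If we keep them, a one-line comment like `# Uncomment to enable trace logging of the sink during local testing` above the block would make the intent clear.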



##########
prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PrometheusStateSerializerTest {
+
+    private static final ElementConverter<PrometheusTimeSeries, Types.TimeSeries>
+            ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter();
+
+    private static PrometheusTimeSeries getTestTimeSeries(int i) {
+        return PrometheusTimeSeries.builder()
+                .withMetricName("metric-name")
+                .addLabel("dimension-a", "value-" + i)
+                .addSample(i + 42.0, i + 1L)
+                .addSample(i + 3.14, i + 2L)
+                .build();
+    }
+
+    // This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract
+    // the requestEntry "size" (i.e. the number of Samples). This is the "size" used in
+    // RequestEntryWrapper; see
+    // https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60

Review Comment:
   Can we consider making this a javadoc + link to code?
   For example:
   ```
       /**
        * This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract
        * the requestEntry "size"
        * (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper
        * see {@link org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer#serialize(BufferedRequestState)}
        */
   ```



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java:
##########
@@ -0,0 +1,192 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/**
+ * Callback handling the outcome of the http async request.
+ *
+ * <p>This class implements the error handling behaviour, based on the configuration in {@link
+ * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an
+ * exception and cause the job to fail, or log the condition to WARN, increment the counters and
+ * continue with the next request.
+ *
+ * <p>In any case, every write-request either entirely succeeds or fails. Partial failures are not
+ * handled.
+ *
+ * <p>Under no condition is a write-request re-queued for the AsyncSink to reprocess: this would
+ * cause out-of-order writes that would be rejected by Prometheus.
+ *
+ * <p>Note that the http async client retries, based on the configured retry policy. The callback is
+ * called with an outcome of *completed* either when the request has succeeded or the max retry
+ * limit has been exceeded. It is the responsibility of the callback to distinguish between these
+ * conditions.
+ */
+class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class);
+
+    private final int timeSeriesCount;
+    private final long sampleCount;
+    private final Consumer<List<Types.TimeSeries>> reQueuedResult;
+    private final SinkMetrics metrics;
+    private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig;
+
+    public HttpResponseCallback(
+            int timeSeriesCount,
+            long sampleCount,
+            SinkMetrics metrics,
+            SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig,
+            Consumer<List<Types.TimeSeries>> reQueuedResult) {
+        this.timeSeriesCount = timeSeriesCount;
+        this.sampleCount = sampleCount;
+        this.reQueuedResult = reQueuedResult;
+        this.metrics = metrics;
+        this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+    }
+
+    /**
+     * The completed outcome is invoked every time the http client successfully receives a valid
+     * http response, regardless of the status code.
+     *
+     * <p>This method classifies the responses and implements the behaviour expected by the
+     * Remote-Write specifications. In case of error, the behaviour is determined by the error
+     * handling configuration.
+     */
+    @Override
+    public void completed(SimpleHttpResponse response) {

Review Comment:
   Do we want to consider expanding out nested `if-else` into linear with `return` instead?
   
   See example here: https://stackoverflow.com/questions/56356190/nested-if-conditions-vs-multiple-separated-if-conditions-with-return-statement-i
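   i.e. roughly this shape (condition names are illustrative, not the actual helpers):
   ```java
   // Hypothetical early-return structure for completed()
   if (isSuccess(response)) {
       // update success counters, complete the request and return early
       reQueuedResult.accept(Collections.emptyList());
       return;
   }
   if (isNonRetriable(response)) {
       // apply the configured OnErrorBehavior, then
       return;
   }
   // ...remaining cases handled linearly
   ```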



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java:
##########
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializes/deserializes the sink request-entry, the protobuf-generated {@link Types.TimeSeries},
+ * using protobuf.
+ * using protobuf.
+ */
+public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer<Types.TimeSeries> {
+    private static final int VERSION = 1;
+
+    // Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER
+    private static final long DATA_IDENTIFIER = -1;
+
+    @Override
+    protected void serializeRequestToStream(Types.TimeSeries request, DataOutputStream out)
+            throws IOException {
+        byte[] serializedRequest = request.toByteArray();
+        out.write(serializedRequest);
+    }
+
+    @Override
+    protected Types.TimeSeries deserializeRequestFromStream(long requestSize, DataInputStream in)
+            throws IOException {
+        // The size written into the serialized state is the size of the protobuf-serialized
+        // time-series
+        byte[] requestData = new byte[(int) requestSize];
+        in.read(requestData);
+        return Types.TimeSeries.parseFrom(requestData);
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    /**
+     * Overrides the original implementation that assumes the serialized size is the value returned
+     * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)}.
+     *
+     * <p>Most of the code is copied from the original implementation of
+     * AsyncSinkWriterStateSerializer.serialize().
+     *
+     * <p>The state is serialized in form of
+     * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2....], where REQUESTn is the
+     * Protobuf-serialized representation of a {@link Types.TimeSeries TimeSeries}. In this
+     * implementation SIZEn is the size of the Protobuf serialization, in bytes, which does not match
+     * the "size" of a {@link RequestEntryWrapper}.
+     *
+     * @param bufferedRequestState The buffered request state to be serialized
+     * @return serialized buffered request state
+     * @throws IOException if serialization fails
+     */
+    @Override
+    public byte[] serialize(BufferedRequestState<Types.TimeSeries> bufferedRequestState)
+            throws IOException {
+        Collection<RequestEntryWrapper<Types.TimeSeries>> bufferState =
+                bufferedRequestState.getBufferedRequestEntries();
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final DataOutputStream out = new DataOutputStream(baos)) {
+
+            out.writeLong(DATA_IDENTIFIER); // DATA_IDENTIFIER
+            out.writeInt(bufferState.size()); // NUM_OF_ELEMENTS
+
+            for (RequestEntryWrapper<Types.TimeSeries> wrapper : bufferState) {
+                // In the serialized state we write the size of the serialized representation,
+                // rather than the size held in RequestEntryWrapper, that is the output of
+                // AsyncSinkWriter.getSizeInBytes()
+                long requestEntrySize =
+                        RequestEntrySizeUtils.requestSerializedSize(wrapper.getRequestEntry());
+                out.writeLong(requestEntrySize); // SIZEn
+                serializeRequestToStream(wrapper.getRequestEntry(), out); // REQUESTn

Review Comment:
   remove the `n`?



##########
pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-prometheus-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <name>Flink Prometheus</name>
+    <modules>
+        <module>prometheus-connector</module>
+        <module>amp-request-signer</module>
+        <module>example-datastream-job</module>
+    </modules>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <target.java.version>11</target.java.version>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <flink.version>1.17.0</flink.version>
+        <protobuf.version>3.22.2</protobuf.version>
+        <apache.httpclient.version>5.2.1</apache.httpclient.version>
+        <aws.sdkv1.version>1.12.570</aws.sdkv1.version>
+        <log4j.version>2.17.1</log4j.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>${aws.sdkv1.version}</version>

Review Comment:
   Why are we using AWS SDK v1? Can we use v2?



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java:
##########
@@ -0,0 +1,6661 @@
+/*

Review Comment:
   Should we consider adding API snapshot tests for this class?



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+@Public
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
+    private final String prometheusRemoteWriteUrl;
+    private final PrometheusAsyncHttpClientBuilder clientBuilder;
+    private final PrometheusRequestSigner requestSigner;
+    private final int maxBatchSizeInSamples;
+    private final String httpUserAgent;
+    private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig;
+
+    protected PrometheusSink(

Review Comment:
   Can we add a configuration to specify a metrics group prefix? 
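   For illustration only — a minimal sketch of how a prefix could be threaded into the sink's counters, assuming a `metricGroupPrefix` option exposed on the builder (all names here are hypothetical, not part of this PR):
   
   ```java
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.metrics.MetricGroup;
   
   public class SinkMetrics {
       private final Counter samplesOut;
   
       public SinkMetrics(MetricGroup parentGroup, String metricGroupPrefix) {
           // Nest all connector counters under the user-supplied prefix
           MetricGroup group = parentGroup.addGroup(metricGroupPrefix);
           this.samplesOut = group.counter("numSamplesOut");
       }
   
       public void incSamplesOut(long count) {
           samplesOut.inc(count);
       }
   }
   ```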



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRequestSigner.java:
##########
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Interface for a request signer, specific to the Prometheus implementation.
+ *
+ * <p>A request signer implementation can generate additional Http request headers, based on the
+ * existing headers and the request body.
+ */
+@Public
+public interface PrometheusRequestSigner extends Serializable {
+
+    /**
+     * Add to the existing http request headers any additional header required by the specific
+     * Prometheus implementation for signing.
+     *
+     * @param requestHeaders original Http request headers. For efficiency, the implementation is
+     *     expected to modify the Map in place. The Map is expected to be mutable.
+     * @param requestBody request body, already compressed.
+     */
+    void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody);

Review Comment:
   Let's add an explanation here as to why we mutate request headers instead of creating a copy: for performance reasons.
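   Something along these lines, as a sketch of an implementing signer that adds a header in place (the token-based signer and the header value are purely illustrative, not part of this PR):
   
   ```java
   import java.util.Map;
   
   public class StaticTokenRequestSigner implements PrometheusRequestSigner {
       private final String token;
   
       public StaticTokenRequestSigner(String token) {
           this.token = token;
       }
   
       @Override
       public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody) {
           // Mutating the map in place avoids allocating and copying a new Map
           // on every write-request, which matters on the sink's hot path
           requestHeaders.put("Authorization", "Bearer " + token);
       }
   }
   ```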



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java:
##########
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples.
+ *
+ * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'.
+ * The other labels, as key/value, are appended after the '__name__' label.
+ */
+@Public
+public class PrometheusTimeSeries implements Serializable {
+    /** A single Label. */
+    public static class Label implements Serializable {
+        private final String name;
+        private final String value;
+
+        public Label(String name, String value) {

Review Comment:
   Can we add some `null`/`empty` validations for these data classes? (+ unit tests too!)
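   One possible shape for the `Label` constructor, as a sketch, using flink-core's `Preconditions` and commons-lang3's `StringUtils` (both assumed to already be on the classpath here):
   
   ```java
   import org.apache.commons.lang3.StringUtils;
   import org.apache.flink.util.Preconditions;
   
   public Label(String name, String value) {
       // Reject null or empty label names/values early, instead of letting
       // Prometheus reject the whole write-request later
       Preconditions.checkArgument(
               StringUtils.isNotEmpty(name), "label name must not be null or empty");
       Preconditions.checkArgument(
               StringUtils.isNotEmpty(value), "label value must not be null or empty");
       this.name = name;
       this.value = value;
   }
   ```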



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java:
##########
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples.
+ *
+ * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'.
+ * The other labels, as key/value, are appended after the '__name__' label.
+ */
+@Public
+public class PrometheusTimeSeries implements Serializable {
+    /** A single Label. */
+    public static class Label implements Serializable {
+        private final String name;
+        private final String value;
+
+        public Label(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Label label = (Label) o;
+            return new EqualsBuilder()
+                    .append(name, label.name)
+                    .append(value, label.value)
+                    .isEquals();
+        }

Review Comment:
   Can we add unit test coverage for this?
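   For example, a minimal JUnit 5 test along these lines (class and method names are illustrative):
   
   ```java
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertNotEquals;
   
   class LabelTest {
       @Test
       void labelsWithSameNameAndValueShouldBeEqual() {
           PrometheusTimeSeries.Label a = new PrometheusTimeSeries.Label("dimension-a", "value-1");
           PrometheusTimeSeries.Label b = new PrometheusTimeSeries.Label("dimension-a", "value-1");
   
           assertEquals(a, b);
           assertEquals(a.hashCode(), b.hashCode());
       }
   
       @Test
       void labelsWithDifferentValuesShouldNotBeEqual() {
           PrometheusTimeSeries.Label a = new PrometheusTimeSeries.Label("dimension-a", "value-1");
           PrometheusTimeSeries.Label b = new PrometheusTimeSeries.Label("dimension-a", "value-2");
   
           assertNotEquals(a, b);
       }
   }
   ```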



##########
prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PrometheusStateSerializerTest {
+
+    private static final ElementConverter<PrometheusTimeSeries, Types.TimeSeries>
+            ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter();
+
+    private static PrometheusTimeSeries getTestTimeSeries(int i) {
+        return PrometheusTimeSeries.builder()
+                .withMetricName("metric-name")
+                .addLabel("dimension-a", "value-" + i)
+                .addSample(i + 42.0, i + 1L)
+                .addSample(i + 3.14, i + 2L)
+                .build();
+    }
+
+    // This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to
+    // extract the requestEntry "size" (i.e. the number of Samples). This is the "size" used in
+    // RequestEntryWrapper, see
+    // https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60

Review Comment:
   (we can replace the qualified name with an import for the class)



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java:
##########
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples.
+ *
+ * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'.
+ * The other labels, as key/value, are appended after the '__name__' label.
+ */
+@Public
+public class PrometheusTimeSeries implements Serializable {
+    /** A single Label. */
+    public static class Label implements Serializable {
+        private final String name;
+        private final String value;
+
+        public Label(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Label label = (Label) o;
+            return new EqualsBuilder()
+                    .append(name, label.name)
+                    .append(value, label.value)
+                    .isEquals();
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode();
+        }
+    }
+
+    /** A single Sample. */
+    public static class Sample implements Serializable {
+        private final double value;
+        private final long timestamp;
+
+        public Sample(double value, long timestamp) {
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+
+        public double getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    private final Label[] labels;
+    private final Sample[] samples;
+    private final String metricName;
+
+    public PrometheusTimeSeries(String metricName, Label[] labels, Sample[] samples) {
+        this.metricName = metricName;
+        this.labels = labels;
+        this.samples = samples;
+    }
+
+    public Label[] getLabels() {
+        return labels;
+    }
+
+    public Sample[] getSamples() {
+        return samples;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static Builder builderFrom(PrometheusTimeSeries other) {
+        return new Builder(
+                Arrays.asList(other.labels), Arrays.asList(other.samples), other.metricName);
+    }
+
+    /** Builder for sink input pojo instance. */
+    public static final class Builder {
+        private List<Label> labels = new ArrayList<>();
+        private List<Sample> samples = new ArrayList<>();
+        private String metricName;
+
+        private Builder(List<Label> labels, List<Sample> samples, String metricName) {
+            this.labels = labels;
+            this.samples = samples;
+            this.metricName = metricName;
+        }

Review Comment:
   Can we add unit test coverage for this? 
   <img width="717" alt="image" src="https://github.com/apache/flink-connector-prometheus/assets/35062175/e03c8c41-bf9b-4339-ab0f-34ac4b62b241">
   



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java:
##########
@@ -0,0 +1,6661 @@
+/*

Review Comment:
   Same for `Types.java` and `GoGoProtos.java`



##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -0,0 +1,117 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {

Review Comment:
   Sorry!! I think it's better to use `@PublicEvolving` or `@Experimental`; either is ok with me. I prefer `@Experimental` because it allows easier breaking changes on our side.
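   i.e., a sketch of what that would look like on the class (body elided):
   
   ```java
   import org.apache.flink.annotation.Experimental;
   
   @Experimental
   public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
       // ...
   }
   ```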


