hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1486041042
########## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/SimpleInstanceMetricsTimeSeriesGenerator.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; + +import org.apache.commons.lang3.RandomUtils; + +import java.io.Serializable; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +/** + * Very simple dummy TimeSeries generator, generating random `CPU` and `Memory` samples for a given + * number of instances. + * + * <p>Sample timestamp is always the current system time. The value is random, between 0 and 1 + */ +public class SimpleInstanceMetricsTimeSeriesGenerator implements Serializable { + + private final int numberOfDummyInstances; + private final int minNrOfSamples; + private final int maxNrOfSamples; + + public SimpleInstanceMetricsTimeSeriesGenerator( + int minNrOfSamples, int maxNrOfSamples, int numberOfDummyInstances) { + this.numberOfDummyInstances = numberOfDummyInstances; + this.minNrOfSamples = minNrOfSamples; + this.maxNrOfSamples = maxNrOfSamples; + } + + private String dummyInstanceId(int number) { + return "I" + String.format("%010d", number); Review Comment: nit: instance ID is lower case `i` ########## prometheus-connector/README.md: ########## @@ -0,0 +1,189 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, the write request is rejected by Prometheus and data is discarded by the sink. +The connector will log a warning and count rejected data in custom metrics, but the data is discarded. + +The sink receives as input time-series, each containing one or more samples. +To optimise the write throughput, input time-series are batched, in the order they are received, and written with a single write-request. + +If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series are discarded. 
+The reason is that the Remote-Write specifications [explicitly forbid retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) rejected write requests (4xx responses), +and the Prometheus response does not contain enough information to efficiently retry the write partially, discarding only the offending data. + +### Responsibilities of the application + +It is the responsibility of the application to send the data to the sink in the correct order and format. + +1. Input time-series must be well-formed, e.g. contain only valid and non-duplicated labels, with +samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) in the Prometheus Remote-Write specs). +2. Input time-series with identical labels must be sent to the sink in timestamp order. +3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels go to the same sink subtask. A `KeySelector` is provided to partition input correctly (see [Partitioning](#partitioning), below). + + +#### Sink input objects + +To help send well-formed data to the sink, the connector expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) POJOs as input. + +Each `PrometheusTimeSeries` instance maps 1-to-1 to a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains: +* exactly one `metricName`, mapped to the special `__name__` label +* optionally, any number of additional labels { k: String, v: String } +* one or more `Samples` { value: double, timestamp: long } - must be in timestamp order + +`PrometheusTimeSeries` provides a builder interface. + +```java + +// List<Tuple2<Double, Long>> samples = ... + +PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() + .withMetricName("CPU") // mapped to `__name__` label + .addLabel("InstanceID", instanceId) + .addLabel("AccountID", accountId); + +for(Tuple2<Double, Long> sample : samples) { + tsBuilder.addSample(sample.f0, sample.f1); +} + +PrometheusTimeSeries ts = tsBuilder.build(); +``` + + +**Important**: for efficiency, the builder does **not** reorder the samples. It is the responsibility of the application to **add samples in timestamp order**. + +### Batching, blocking writes and retry + +The sink batches multiple time-series into a single write-request, retaining the order. + +Batching is based on the number of samples. Each write-request contains up to 500 samples, with a max buffering time of 5 seconds +(both configurable). The number of time-series doesn't matter. + +As per the [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), +the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses an exponential backoff. + +The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry +delay (default 5 sec). It continues retrying until the max number of retries is reached (by default, it retries forever). + +On a non-retriable error response (4xx other than 429, or a non-retriable exception), or on reaching the retry limit, +**the entire write-request**, containing the batch of time-series, **is dropped**. + +Every dropped request is logged at WARN level, including the reason provided by the remote-write endpoint.
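A configuration sketch for the batching and retry behaviour described above, assembled from the builder calls quoted later in this thread (the remote-write URL setter name is an assumption, inferred from the builder's `prometheusRemoteWriteUrl` field):

```java
// Sketch only: tuning the batching and retry defaults described above.
// All setter names except setPrometheusRemoteWriteUrl appear verbatim in the
// example job quoted later in this thread.
AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> sink =
        PrometheusSink.builder()
                .setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) // assumed setter name
                .setMaxBatchSizeInSamples(500) // up to 500 samples per write-request (default)
                .setMaxTimeInBufferMS(5000) // flush after at most 5 seconds (default)
                .setRetryConfiguration(
                        RetryConfiguration.builder()
                                .setInitialRetryDelayMS(30L) // first backoff delay
                                .setMaxRetryDelayMS(5000L) // backoff cap
                                .setMaxRetryCount(100) // then the whole write-request is dropped
                                .build())
                .build();
```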
Review Comment: We can mention the configuration here too ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/RequestEntrySizeUtils.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.util.Collection; + +/** Collection of methods to calculate the sink RequestEntry "size". */ +public class RequestEntrySizeUtils { + + /** + * Size of a request entry (a {@link Types.TimeSeries time-series}) for the purpose of batching. + * Count the number of {@link Types.Sample samples} + * + * @param requestEntry a time-series + * @return number of Samples in the TimeSeries + */ + public static long requestSizeForBatching(Types.TimeSeries requestEntry) { + return requestEntry.getSamplesCount(); + } + + /** + * Serialized size of a request entry {@link Types.TimeSeries TimeSeries}: the number of bytes + * of the protobuf- serialized representation of the TimeSeries. + * + * @param requestEntry a time-series + * @return number of bytes + */ + public static long requestSerializedSize(Types.TimeSeries requestEntry) { + return requestEntry.getSerializedSize(); + } + + /** + * Count the number of {@link Types.Sample samples} in a collection of {@link Types.TimeSeries + * time-series} (a batch). + * + * @param requestEntries collection of time-series + * @return number of samples + */ + public static long countSamples(Collection<Types.TimeSeries> requestEntries) { + return requestEntries.stream() + .mapToLong(RequestEntrySizeUtils::requestSizeForBatching) + .sum(); + } Review Comment: Can we add unit test coverage for this? ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Optional; + +/** Builder for Sink implementation. */ +public class PrometheusSinkBuilder + extends AsyncSinkBaseBuilder< + PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> { + private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class); + + private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500; + private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000; + private static final int DEFAULT_MAX_BUFFERED_REQUESTS = + 1000; // Max nr of requestEntry that will be buffered + + private String prometheusRemoteWriteUrl; + private RetryConfiguration retryConfiguration; + private Integer socketTimeoutMs; + private PrometheusRequestSigner requestSigner = null; + private Integer maxBatchSizeInSamples; + private Integer maxRecordSizeInSamples; + private String httpUserAgent = null; + private SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig = null; + + @Override + public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() { + + int maxInFlightRequest = 1; Review Comment: nit: this could be a static constant. (I understand this is a copy of the real value in the sink itself) ########## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerForAuthorizationHeader.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.aws; + +import com.amazonaws.util.BinaryUtils; + +import java.net.URL; +import java.util.Date; +import java.util.Map; + +/** + * Sample AWS4 signer demonstrating how to sign requests to Amazon S3 using an 'Authorization' + * header. + */ +public class AWS4SignerForAuthorizationHeader extends AWS4SignerBase { Review Comment: Can we add unit tests? ########## prometheus-connector/README.md: ########## @@ -0,0 +1,189 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. 
+ +The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, the write request is rejected by Prometheus and data is discarded by the sink. Review Comment: We can mention our sink configurations ########## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. 
*/ +public class DataStreamJob { + private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env; + + // Conditionally return a local execution environment with + if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { + Configuration conf = new Configuration(); + conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), + true); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + env.setParallelism(2); + + ParameterTool applicationParameters = ParameterTool.fromArgs(args); + + // Prometheus remote-write URL + String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); + LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + + // Optionally configure Amazon Managed Prometheus request signer + PrometheusRequestSigner requestSigner = null; + String ampAWSRegion = applicationParameters.get("awsRegion"); + if (ampAWSRegion != null) { + requestSigner = + new AmazonManagedPrometheusWriteRequestSigner( + prometheusRemoteWriteUrl, ampAWSRegion); + LOGGER.info( + "Enable Amazon Managed Prometheus request-signer, region: {}", ampAWSRegion); + } + + // Configure data generator + int generatorMinSamplesPerTimeSeries = 1; + int generatorMaxSamplesPerTimeSeries = 10; + int generatorNumberOfDummyInstances = 5; + long generatorPauseBetweenTimeSeriesMs = 100; + LOGGER.info( + "Data Generator configuration:" + + "\n\t\tMin samples per time series:{}\n\t\tMax samples per time series:{}\n\t\tPause between time series:{} ms" + + "\n\t\tNumber of dummy instances:{}", + generatorMinSamplesPerTimeSeries, + generatorMaxSamplesPerTimeSeries, + generatorPauseBetweenTimeSeriesMs, + generatorNumberOfDummyInstances); + + Supplier<PrometheusTimeSeries> eventGenerator = + new SimpleInstanceMetricsTimeSeriesGenerator( + generatorMinSamplesPerTimeSeries, + generatorMaxSamplesPerTimeSeries, + generatorNumberOfDummyInstances) + .generator(); + + SourceFunction<PrometheusTimeSeries> source = + new FixedDelayDataGenertorSource<>( Review Comment: Typo - `Generator` ########## example-datastream-job/README.md: ########## @@ -0,0 +1,22 @@ +## Example job using Prometheus Sink connector with DataStream API + +Sample application demonstrating the usage of Prometheus Sink Connector with DataStream API. + +The example demonstrates how to write to a generic, unauthenticated Prometheus remote-write URL, and optionally how to use the Amazon Managed Prometheus request signer. + +It generates random dummy Memory and CPU metrics from a number of instances, and writes them to Prometheus. + +### Configuration + +The application expects these parameters, via command line: + +* `--prometheusRemoteWriteUrl <URL>`: the Prometheus remote-write URL to target +* `--awsRegion <region>`: (optional) if specified, it configures the Amazon Managed Prometheus request signer for a workspace in this Region +* `--webUI`: (optional, for local development only) enables Flink Web UI, with flame graphs, for local development + +### Data generation + +The application generates random time series, containing `CPU` and `Memory` samples from 5 dummy "instances". A new time series is generated about every 100ms. Each time series contains 1 to 10 samples. 
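For illustration, a rough sketch of how one such time series could be assembled with the `PrometheusTimeSeries` builder documented in the connector README (a hypothetical fragment, not the actual `SimpleInstanceMetricsTimeSeriesGenerator` code; the label name is an assumption):

```java
// Hypothetical sketch: one random "CPU" time series for a dummy instance,
// using org.apache.commons.lang3.RandomUtils as the generator class does.
int nrOfSamples = RandomUtils.nextInt(1, 11); // 1 to 10 samples
PrometheusTimeSeries.Builder tsBuilder =
        PrometheusTimeSeries.builder()
                .withMetricName("CPU")
                .addLabel("InstanceID", "i0000000001"); // dummy instance, lower-case 'i'
for (int i = 0; i < nrOfSamples; i++) {
    // Value is random between 0 and 1; the timestamp is the current system time
    tsBuilder.addSample(RandomUtils.nextDouble(0, 1), System.currentTimeMillis());
}
PrometheusTimeSeries ts = tsBuilder.build();
```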
+ +These parameters are configurable from the code Review Comment: ```suggestion These parameters are configurable from the code. ``` ########## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. */ +public class DataStreamJob { + private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env; + + // Conditionally return a local execution environment with + if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { + Configuration conf = new Configuration(); + conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), + true); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + env.setParallelism(2); Review Comment: Can we explain why this is set to 2? ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** + * Callback handling the outcome of the http async request. + * + * <p>This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. + * + * <p>In any case, every write-request either entirely succeeds or fails. Partial failures are not + * handled. + * + * <p>Under no condition is a write-request re-queued for the AsyncSink to reprocess: this would cause + * out-of-order writes that would be rejected by Prometheus. + * + * <p>Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is the responsibility of the callback to distinguish between these + * conditions.
+ */ +class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> { + private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class); + + private final int timeSeriesCount; + private final long sampleCount; + private final Consumer<List<Types.TimeSeries>> reQueuedResult; + private final SinkMetrics metrics; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + + public HttpResponseCallback( + int timeSeriesCount, + long sampleCount, + SinkMetrics metrics, Review Comment: nit: This can be abstracted away to another `metricsCallback` ########## example-datastream-job/README.md: ########## @@ -0,0 +1,22 @@ +## Example job using Prometheus Sink connector with DataStream API + +Sample application demonstrating the usage of Prometheus Sink Connector with DataStream API. + +The example demonstrates how to write to a generic, unauthenticated Prometheus remote-write URL, and optionally how to use the Amazon Managed Prometheus request signer. + +It generates random dummy Memory and CPU metrics from a number of instances, and writes them to Prometheus. + +### Configuration + +The application expects these parameters, via command line: + +* `--prometheusRemoteWriteUrl <URL>`: the Prometheus remote-write URL to target +* `--awsRegion <region>`: (optional) if specified, it configures the Amazon Managed Prometheus request signer for a workspace in this Region +* `--webUI`: (optional, for local development only) enables Flink Web UI, with flame graphs, for local development Review Comment: How do we run this? Should we include an example run command? ########## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. */ +public class DataStreamJob { + private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env; + + // Conditionally return a local execution environment with + if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { + Configuration conf = new Configuration(); + conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), + true); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + env.setParallelism(2); + + ParameterTool applicationParameters = ParameterTool.fromArgs(args); + + // Prometheus remote-write URL + String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); + LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + + // Optionally configure Amazon Managed Prometheus request signer + PrometheusRequestSigner requestSigner = null; + String ampAWSRegion = applicationParameters.get("awsRegion"); + if (ampAWSRegion != null) { + requestSigner = + new AmazonManagedPrometheusWriteRequestSigner( + prometheusRemoteWriteUrl, ampAWSRegion); + LOGGER.info( + "Enable Amazon Managed Prometheus request-signer, region: {}", ampAWSRegion); + } + + // Configure data generator + int generatorMinSamplesPerTimeSeries = 1; + int generatorMaxSamplesPerTimeSeries = 10; + int generatorNumberOfDummyInstances = 5; Review Comment: Hmm, this is hard to understand out of context. Can we add an overall description at the top of this class, or rename this to "NumberOfKeys/NumberOfUniqueMetrics" or something? ########## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. 
*/ +public class DataStreamJob { + private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env; + + // Conditionally return a local execution environment with + if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { + Configuration conf = new Configuration(); + conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), + true); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + env.setParallelism(2); + + ParameterTool applicationParameters = ParameterTool.fromArgs(args); + + // Prometheus remote-write URL + String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); + LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + + // Optionally configure Amazon Managed Prometheus request signer + PrometheusRequestSigner requestSigner = null; + String ampAWSRegion = applicationParameters.get("awsRegion"); + if (ampAWSRegion != null) { + requestSigner = + new AmazonManagedPrometheusWriteRequestSigner( + prometheusRemoteWriteUrl, ampAWSRegion); + LOGGER.info( + "Enable Amazon Managed Prometheus request-signer, region: {}", ampAWSRegion); + } + + // Configure data generator + int generatorMinSamplesPerTimeSeries = 1; + int generatorMaxSamplesPerTimeSeries = 10; + int generatorNumberOfDummyInstances = 5; + long generatorPauseBetweenTimeSeriesMs = 100; + LOGGER.info( + "Data Generator configuration:" + + "\n\t\tMin samples per time series:{}\n\t\tMax samples per time series:{}\n\t\tPause between time series:{} ms" + + "\n\t\tNumber of dummy instances:{}", + generatorMinSamplesPerTimeSeries, + generatorMaxSamplesPerTimeSeries, + generatorPauseBetweenTimeSeriesMs, + generatorNumberOfDummyInstances); + + Supplier<PrometheusTimeSeries> eventGenerator = + new SimpleInstanceMetricsTimeSeriesGenerator( + generatorMinSamplesPerTimeSeries, + generatorMaxSamplesPerTimeSeries, + generatorNumberOfDummyInstances) + .generator(); + + SourceFunction<PrometheusTimeSeries> source = + new FixedDelayDataGenertorSource<>( + PrometheusTimeSeries.class, + eventGenerator, + generatorPauseBetweenTimeSeriesMs); + + DataStream<PrometheusTimeSeries> prometheusTimeSeries = env.addSource(source); + + // Build the sink showing all supported configuration parameters. + // It is not mandatory to specify all configurations, as they will fall back to the default + // value + AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> sink = + PrometheusSink.builder() + .setMaxBatchSizeInSamples(500) + .setMaxRecordSizeInSamples(500) + .setMaxTimeInBufferMS(5000) + .setRetryConfiguration( + RetryConfiguration.builder() + .setInitialRetryDelayMS(30L) + .setMaxRetryDelayMS(5000L) + .setMaxRetryCount(100) + .build()) + .setSocketTimeoutMs(5000) Review Comment: nit: Should we leave comment after to show which are optional? e.g. ``` .setMaxBatchSizeInSamples(500) // Optional, defaults to 500 ``` ########## example-datastream-job/src/main/resources/log4j2.properties: ########## @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +#logger.tracing.name = org.apache.flink.connector.prometheus.sink +#logger.tracing.level = trace +#logger.tracing.additivity = false +#logger.tracing.appenderRef.console.ref = ConsoleAppender Review Comment: Can we remove these, or explain when to uncomment for testing? ########## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PrometheusStateSerializerTest { + + private static final ElementConverter<PrometheusTimeSeries, Types.TimeSeries> + ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter(); + + private static PrometheusTimeSeries getTestTimeSeries(int i) { + return PrometheusTimeSeries.builder() + .withMetricName("metric-name") + .addLabel("dimension-a", "value-" + i) + .addSample(i + 42.0, i + 1L) + .addSample(i + 3.14, i + 2L) + .build(); + } + + // This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract + // the requestEntry "size" + // (i.e. the number of Samples). 
This is the "size" used in RequestEntryWrapper + // see + // https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60 Review Comment: Can we consider making this a javadoc + link to code? For example: ``` /** * This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract * the requestEntry "size" * (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper * see {@link org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer#serialize(BufferedRequestState)} */ ``` ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** + * Callback handling the outcome of the http async request. + * + * <p>This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. 
+ * + * <p>In any case, every write-request either entirely succeeds or fails. Partial failures are not + * handled. + * + * <p>Under no condition is a write-request re-queued for the AsyncSink to reprocess: this would cause + * out-of-order writes that would be rejected by Prometheus. + * + * <p>Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is the responsibility of the callback to distinguish between these + * conditions. + */ +class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> { + private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class); + + private final int timeSeriesCount; + private final long sampleCount; + private final Consumer<List<Types.TimeSeries>> reQueuedResult; + private final SinkMetrics metrics; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + + public HttpResponseCallback( + int timeSeriesCount, + long sampleCount, + SinkMetrics metrics, + SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig, + Consumer<List<Types.TimeSeries>> reQueuedResult) { + this.timeSeriesCount = timeSeriesCount; + this.sampleCount = sampleCount; + this.reQueuedResult = reQueuedResult; + this.metrics = metrics; + this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; + } + + /** + * The completed outcome is invoked every time the http client successfully receives a valid + * http response, regardless of the status code. + * + * <p>This method classifies the responses and implements the behaviour expected by the + * Remote-Write specifications. In case of error, the behaviour is determined by the error + * handling configuration. + */ + @Override + public void completed(SimpleHttpResponse response) { Review Comment: Do we want to consider expanding out nested `if-else` into linear with `return` instead? See example here: https://stackoverflow.com/questions/56356190/nested-if-conditions-vs-multiple-separated-if-conditions-with-return-statement-i
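To make the suggestion above concrete, a hedged sketch of the same handler flattened into guard clauses, so each terminal condition returns immediately. The helper and counter calls (`isNonRetriable`, `handlePermanentlyRejected`, `handleRetryLimitExceeded`, `metrics.inc`) are illustrative assumptions, not the actual implementation:

```java
@Override
public void completed(SimpleHttpResponse response) {
    int statusCode = response.getCode();
    if (statusCode >= 200 && statusCode < 300) {
        // Success: count the request out; nothing to re-queue
        metrics.inc(NUM_WRITE_REQUESTS_OUT); // hypothetical counter update
        reQueuedResult.accept(Collections.emptyList());
        return;
    }
    if (isNonRetriable(statusCode)) { // hypothetical helper: 4xx other than 429
        handlePermanentlyRejected(response); // throw or drop, per error-handling config
        return;
    }
    // A retriable error (5xx or 429) reaching the callback means the http client
    // has already exhausted its configured retries
    handleRetryLimitExceeded(response);
}
```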
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Serializes/deserializes the sink request-entry, the protobuf-generated {@link Types.TimeSeries}, + * using protobuf. + */ +public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer<Types.TimeSeries> { + private static final int VERSION = 1; + + // Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER + private static final long DATA_IDENTIFIER = -1; + + @Override + protected void serializeRequestToStream(Types.TimeSeries request, DataOutputStream out) + throws IOException { + byte[] serializedRequest = request.toByteArray(); + out.write(serializedRequest); + } + + @Override + protected Types.TimeSeries deserializeRequestFromStream(long requestSize, DataInputStream in) + throws IOException { + // The size written into the serialized state is the size of the protobuf-serialized + // time-series + byte[] requestData = new byte[(int) requestSize]; + in.read(requestData); + return Types.TimeSeries.parseFrom(requestData); + } + + @Override + public int getVersion() { + return VERSION; + } + + /** + * Overrides the original implementation that assumes the serialized size is the value returned + * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)}. + * + * <p>Most of the code is copied from the original implementation of + * AsyncSinkWriterStateSerializer.serialize(). + * + * <p>The state is serialized in the form + * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2....], where REQUESTn is the + * Protobuf-serialized representation of a {@link Types.TimeSeries TimeSeries}. In this + * implementation SIZEn is the size of the Protobuf serialization, in bytes, which does not match + * the "size" of a {@link RequestEntryWrapper}. + * + * @param bufferedRequestState The buffered request state to be serialized + * @return serialized buffered request state + * @throws IOException + */ + @Override + public byte[] serialize(BufferedRequestState<Types.TimeSeries> bufferedRequestState) + throws IOException { + Collection<RequestEntryWrapper<Types.TimeSeries>> bufferState = + bufferedRequestState.getBufferedRequestEntries(); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + + out.writeLong(DATA_IDENTIFIER); // DATA_IDENTIFIER + out.writeInt(bufferState.size()); // NUM_OF_ELEMENTS + + for (RequestEntryWrapper<Types.TimeSeries> wrapper : bufferState) { + // In the serialized state we write the size of the serialized representation, + // rather than the size + // held in RequestEntryWrapper, which is the output of + // AsyncSinkWriter.getSizeInBytes() + long requestEntrySize = + RequestEntrySizeUtils.requestSerializedSize(wrapper.getRequestEntry()); + out.writeLong(requestEntrySize); // SIZEn + serializeRequestToStream(wrapper.getRequestEntry(), out); // REQUESTn Review Comment: remove the `n`?
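Picking up the unit-test asks elsewhere in this thread, a minimal round-trip sketch for the serializer above. It assumes the `BufferedRequestState` and `RequestEntryWrapper` constructors from flink-connector-base and a JUnit-style `assertEquals`; the entry values are illustrative:

```java
// Round-trip check: serialize a buffered state and read it back.
Types.TimeSeries entry =
        Types.TimeSeries.newBuilder()
                .addLabels(Types.Label.newBuilder().setName("__name__").setValue("metric-name"))
                .addSamples(Types.Sample.newBuilder().setValue(42.0).setTimestamp(1L))
                .build();
// The wrapper "size" must match PrometheusSinkWriter.getSizeInBytes(), i.e. the sample count
BufferedRequestState<Types.TimeSeries> state =
        new BufferedRequestState<>(
                Collections.singletonList(
                        new RequestEntryWrapper<>(
                                entry, RequestEntrySizeUtils.requestSizeForBatching(entry))));

PrometheusStateSerializer serializer = new PrometheusStateSerializer();
BufferedRequestState<Types.TimeSeries> restored =
        serializer.deserialize(serializer.getVersion(), serializer.serialize(state));

assertEquals(1, restored.getBufferedRequestEntries().size());
assertEquals(entry, restored.getBufferedRequestEntries().get(0).getRequestEntry());
```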
########## pom.xml: ########## @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-parent</artifactId> + <version>1.0.0</version> + </parent> + + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-prometheus-parent</artifactId> + <version>1.0.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <name>Flink Prometheus</name> + <modules> + <module>prometheus-connector</module> + <module>amp-request-signer</module> + <module>example-datastream-job</module> + </modules> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <target.java.version>11</target.java.version> + <maven.compiler.source>${target.java.version}</maven.compiler.source> + <maven.compiler.target>${target.java.version}</maven.compiler.target> + <flink.version>1.17.0</flink.version> + <protobuf.version>3.22.2</protobuf.version> + <apache.httpclient.version>5.2.1</apache.httpclient.version> + <aws.sdkv1.version>1.12.570</aws.sdkv1.version> + <log4j.version>2.17.1</log4j.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-bom</artifactId> + <version>${aws.sdkv1.version}</version> Review Comment: Why are we using AWS SDK v1? Can we use v2? ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java: ########## @@ -0,0 +1,6661 @@ +/* Review Comment: Should we consider adding API snapshot tests for this class? ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */ +@Public +public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> { + private final String prometheusRemoteWriteUrl; + private final PrometheusAsyncHttpClientBuilder clientBuilder; + private final PrometheusRequestSigner requestSigner; + private final int maxBatchSizeInSamples; + private final String httpUserAgent; + private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + + protected PrometheusSink( Review Comment: Can we add a configuration to specify a metrics group prefix? ########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRequestSigner.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import java.io.Serializable; +import java.util.Map; + +/** + * Interface for a request signer, specific of the Prometheus implementation. + * + * <p>A request signer implementation can generate additional Http request headers, based on the + * existing headers and the request body. + */ +@Public +public interface PrometheusRequestSigner extends Serializable { + + /** + * Add to the existing http request headers any additional header required by the specific + * Prometheus implementation for signing. + * + * @param requestHeaders original Http request headers. For efficiency, the implementation is + * expected to modify the Map in place. The Map is expected to be mutable. 
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ##########

@@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'. + * The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { + /** A single Label. */ + public static class Label implements Serializable { + private final String name; + private final String value; + + public Label(String name, String value) {

Review Comment: Can we add some `null`/`empty` validations for these data classes? (+ unit tests too!)
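As a sketch of the suggested validation, using `Validate` from commons-lang3, which this module already depends on; the messages are illustrative:

```java
import org.apache.commons.lang3.Validate;

public Label(String name, String value) {
    // Fail fast on null, empty, or blank inputs, per the review suggestion.
    Validate.notBlank(name, "Label name must not be null or empty");
    Validate.notBlank(value, "Label value must not be null or empty");
    this.name = name;
    this.value = value;
}
```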
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ##########

@@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'. + * The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { + /** A single Label. */ + public static class Label implements Serializable { + private final String name; + private final String value; + + public Label(String name, String value) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Label label = (Label) o; + return new EqualsBuilder() + .append(name, label.name) + .append(value, label.value) + .isEquals(); + }

Review Comment: Can we add unit test coverage for this?
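A possible shape for that coverage, sketched with JUnit 5 (which the PR's tests already use); test and value names are illustrative:

```java
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

class PrometheusTimeSeriesLabelTest {

    @Test
    void labelsWithSameNameAndValueAreEqual() {
        PrometheusTimeSeries.Label a = new PrometheusTimeSeries.Label("instance", "i-0000000001");
        PrometheusTimeSeries.Label b = new PrometheusTimeSeries.Label("instance", "i-0000000001");

        // equals() and hashCode() must agree for equal labels
        assertEquals(a, b);
        assertEquals(a.hashCode(), b.hashCode());
    }

    @Test
    void labelsWithDifferentValuesAreNotEqual() {
        PrometheusTimeSeries.Label a = new PrometheusTimeSeries.Label("instance", "i-0000000001");
        PrometheusTimeSeries.Label b = new PrometheusTimeSeries.Label("instance", "i-0000000002");

        assertNotEquals(a, b);
    }
}
```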
########## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java: ##########

@@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PrometheusStateSerializerTest { + + private static final ElementConverter<PrometheusTimeSeries, Types.TimeSeries> + ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter(); + + private static PrometheusTimeSeries getTestTimeSeries(int i) { + return PrometheusTimeSeries.builder() + .withMetricName("metric-name") + .addLabel("dimension-a", "value-" + i) + .addSample(i + 42.0, i + 1L) + .addSample(i + 3.14, i + 2L) + .build(); + } + + // This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract + // the requestEntry "size" + // (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper + // see + // https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60

Review Comment: (we can replace the qualified name with an import for the class)
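Assuming the reference meant here is `AsyncSinkWriterStateSerializer` (the class the URL above points at), the suggestion amounts to something like:

```java
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;

// ...and the code comment can then refer to the class by its simple name:
// "This is the 'size' used in RequestEntryWrapper; see AsyncSinkWriterStateSerializer."
```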
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ##########

@@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'. + * The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { + /** A single Label. */ + public static class Label implements Serializable { + private final String name; + private final String value; + + public Label(String name, String value) { + this.name = name; + this.value = value; + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Label label = (Label) o; + return new EqualsBuilder() + .append(name, label.name) + .append(value, label.value) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode(); + } + } + + /** A single Sample. */ + public static class Sample implements Serializable { + private final double value; + private final long timestamp; + + public Sample(double value, long timestamp) { + this.value = value; + this.timestamp = timestamp; + } + + public double getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + } + + private final Label[] labels; + private final Sample[] samples; + private final String metricName; + + public PrometheusTimeSeries(String metricName, Label[] labels, Sample[] samples) { + this.metricName = metricName; + this.labels = labels; + this.samples = samples; + } + + public Label[] getLabels() { + return labels; + } + + public Sample[] getSamples() { + return samples; + } + + public String getMetricName() { + return metricName; + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builderFrom(PrometheusTimeSeries other) { + return new Builder( + Arrays.asList(other.labels), Arrays.asList(other.samples), other.metricName); + } + + /** Builder for sink input pojo instance. */ + public static final class Builder { + private List<Label> labels = new ArrayList<>(); + private List<Sample> samples = new ArrayList<>(); + private String metricName; + + private Builder(List<Label> labels, List<Sample> samples, String metricName) { + this.labels = labels; + this.samples = samples; + this.metricName = metricName; + }

Review Comment: Can we add unit test coverage for this?
   <img width="717" alt="image" src="https://github.com/apache/flink-connector-prometheus/assets/35062175/e03c8c41-bf9b-4339-ab0f-34ac4b62b241">
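Responding to the coverage ask above, a builder round-trip test might look like this sketch (the builder methods are taken from the PR's own `getTestTimeSeries` helper; the metric and label values are illustrative):

```java
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class PrometheusTimeSeriesBuilderTest {

    @Test
    void builderPopulatesMetricNameLabelsAndSamples() {
        PrometheusTimeSeries ts = PrometheusTimeSeries.builder()
                .withMetricName("cpu_usage")
                .addLabel("instance", "i-0000000001")
                .addSample(0.42, 1_700_000_000_000L)
                .build();

        // The built pojo should carry exactly what was passed to the builder.
        assertEquals("cpu_usage", ts.getMetricName());
        assertEquals(1, ts.getLabels().length);
        assertEquals(1, ts.getSamples().length);
        assertEquals(0.42, ts.getSamples()[0].getValue());
    }
}
```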
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java: ##########

@@ -0,0 +1,6661 @@ +/*

Review Comment: Same for `Types.java` and `GoGoProtos.java`

########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ##########

@@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */ +public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {

Review Comment: Sorry!! I think it's better to use `@PublicEvolving` or `@Experimental`; either is OK with me. I'd prefer `@Experimental` because it allows easier breaking changes on our side.
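Concretely, the preferred option would change the class declaration to something like this sketch:

```java
import org.apache.flink.annotation.Experimental;

/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@Experimental
public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
    // ...
}
```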
