hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1450463846
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Remote;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.io.CloseMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */
+public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> {
+
+    /**
+     * Batching of this sink is in terms of Samples, not bytes. The goal is to adaptively increase
+     * the number of Samples in each batch, a WriteRequest sent to Prometheus, up to a configurable
+     * number. This is the parameter maxBatchSizeInBytes.
+     *
+     * <p>getSizeInBytes(requestEntry) returns the number of Samples (not bytes) and
+     * maxBatchSizeInBytes is actually in terms of Samples (not bytes).
+     *
+     * <p>In AsyncSinkWriter, maxBatchSize is in terms of requestEntries (TimeSeries). But because
+     * each TimeSeries contains 1+ Samples, we set maxBatchSize = maxBatchSizeInBytes.
+     *
+     * <p>maxRecordSizeInBytes is also calculated in the same unit assumed by getSizeInBytes(..).
+     * In our case it is the max number of Samples in a single TimeSeries sent to the Sink. We are
+     * limiting the number of Samples in each TimeSeries to the max batch size, setting
+     * maxRecordSizeInBytes = maxBatchSizeInBytes.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class);
+
+    private final SinkCounters counters;
+    private final CloseableHttpAsyncClient asyncHttpClient;
+    private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder;
+
+    public PrometheusSinkWriter(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            Sink.InitContext context,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            CloseableHttpAsyncClient asyncHttpClient,
+            SinkCounters counters,
+            PrometheusRequestSigner requestSigner) {
+        this(
+                elementConverter,
+                context,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples,
+                maxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                counters,
+                requestSigner,
+                Collections.emptyList());
+    }
+
+    public PrometheusSinkWriter(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            Sink.InitContext context,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            CloseableHttpAsyncClient asyncHttpClient,
+            SinkCounters counters,
+            PrometheusRequestSigner requestSigner,
+            Collection<BufferedRequestState<Types.TimeSeries>> states) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSizeInSamples, // maxBatchSize
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes
+                maxTimeInBufferMS,
+                maxBatchSizeInSamples, // maxRecordSizeInBytes
+                states);
+        this.requestBuilder =
+                new PrometheusRemoteWriteHttpRequestBuilder(
+                        prometheusRemoteWriteUrl, requestSigner);
+        this.asyncHttpClient = asyncHttpClient;
+        this.counters = counters;
+    }
+
+    /**
+     * This is the "size" of the request entry (a {@link Types.TimeSeries time-series}) used for
+     * batching. Regardless of the name of the method, it returns the number of {@link Types.Sample
+     * samples} in the time-series (not bytes), to support batching in terms of samples.
+     *
+     * @param timeSeries the request entry for which we want to know the size
+     * @return number of samples in the time-series
+     */
+    @Override
+    protected long getSizeInBytes(Types.TimeSeries timeSeries) {
+        return RequestEntrySizeUtils.requestSizeForBatching(timeSeries);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<Types.TimeSeries> requestEntries, Consumer<List<Types.TimeSeries>> requestResult) {
+        int timeSeriesCount = requestEntries.size();
+        long sampleCount = RequestEntrySizeUtils.countSamples(requestEntries);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(
+                    "Writing {} time-series containing {} samples", timeSeriesCount, sampleCount);
+        }
+
+        Remote.WriteRequest writeRequest = buildWriteRequest(requestEntries);
+        byte[] requestBody;
+        try {
+            requestBody = compressWriteRequest(writeRequest);
+        } catch (IOException e) {
+            throw new RuntimeException("Exception compressing the request body", e);
+        }
+
+        SimpleHttpRequest postRequest = requestBuilder.buildHttpRequest(requestBody);
+        asyncHttpClient.execute(
+                postRequest,
+                new ResponseCallback(timeSeriesCount, sampleCount, counters, requestResult));
+    }
+
+    @VisibleForTesting
+    static class ResponseCallback implements FutureCallback<SimpleHttpResponse> {
+        private final int timeSeriesCount;
+        private final long sampleCount;
+        private final Consumer<List<Types.TimeSeries>> requestResult;
+        private final SinkCounters counters;
+
+        public ResponseCallback(
+                int timeSeriesCount,
+                long sampleCount,
+                SinkCounters counters,
+                Consumer<List<Types.TimeSeries>> requestResult) {
+            this.timeSeriesCount = timeSeriesCount;
+            this.sampleCount = sampleCount;
+            this.requestResult = requestResult;
+            this.counters = counters;
+        }
+
+        @Override
+        public void completed(SimpleHttpResponse response) {
+            if (RemoteWriteResponseClassifier.isSuccessResponse(response)) {
+                LOG.debug(
+                        "{},{} - successfully posted {} time-series, containing {} samples",
+                        response.getCode(),
+                        response.getReasonPhrase(),
+                        timeSeriesCount,
+                        sampleCount);
+                counters.inc(NUM_SAMPLES_OUT, sampleCount);
+                counters.inc(NUM_WRITE_REQUESTS_OUT);
+            } else {
+                counters.inc(NUM_SAMPLES_DROPPED, sampleCount);
+                counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
+
+                String responseBody = response.getBodyText();
+                if (RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
+                    LOG.warn(
+                            "{},{} {} (discarded {} time-series, containing {} samples)",
+                            response.getCode(),
+                            response.getReasonPhrase(),
+                            responseBody,
+                            timeSeriesCount,
+                            sampleCount);
+                    counters.inc(NUM_SAMPLES_NON_RETRIABLE_DROPPED, sampleCount);
+                } else if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
+                    LOG.warn(

Review Comment: We shouldn't be dropping records here on a retryable error. IMO we should either requeue the records, or propagate the exception up to fail the job.
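A minimal sketch of the requeue option, assuming the AsyncSinkWriter contract that entries handed back to the completion consumer are re-buffered and retried. `Entry` and the status-code check are simplified stand-ins, not the connector's actual API:

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Completion handler that re-queues the batch on a retriable error instead of
 * dropping it. Entry stands in for Types.TimeSeries; the status-code check
 * stands in for RemoteWriteResponseClassifier.
 */
final class RequeueingCallback<Entry> {
    private final List<Entry> requestEntries; // the batch that was posted
    private final Consumer<List<Entry>> requestResult; // AsyncSinkWriter completion hook

    RequeueingCallback(List<Entry> requestEntries, Consumer<List<Entry>> requestResult) {
        this.requestEntries = requestEntries;
        this.requestResult = requestResult;
    }

    void onCompleted(int statusCode) {
        if (statusCode >= 500) {
            // Retriable failure: hand every entry back; the AsyncSinkWriter
            // re-buffers whatever is passed to the consumer and sends it again.
            requestResult.accept(requestEntries);
        } else {
            // Success (or a non-retriable failure that was logged and counted):
            // report the batch as completed.
            requestResult.accept(Collections.emptyList());
        }
    }
}
```

Note that the callback needs to keep a reference to the original entries, which the current `ResponseCallback` does not do.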
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java: ##########
@@ -0,0 +1,203 @@
[Apache license header omitted]
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/** Builder for Sink implementation. */
+public class PrometheusSinkBuilder
+        extends AsyncSinkBaseBuilder<
+                PrometheusTimeSeries, Types.TimeSeries, PrometheusSinkBuilder> {
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkBuilder.class);
+
+    private static final int DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES = 500;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS =
+            1000; // Max nr of requestEntry that will be buffered
+
+    private String prometheusRemoteWriteUrl;
+    private RetryConfiguration retryConfiguration;
+    private Integer socketTimeoutMs;
+    private PrometheusRequestSigner requestSigner = null;
+    private Integer maxBatchSizeInSamples;
+    private Integer maxRecordSizeInSamples;
+
+    @Override
+    public AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> build() {
+
+        int maxInFlightRequest = 1;
+
+        int actualMaxBatchSizeInSamples =
+                Optional.ofNullable(getMaxBatchSizeInSamples())
+                        .orElse(DEFAULT_MAX_BATCH_SIZE_IN_SAMPLES);
+        int actualMaxBufferedRequests =
+                Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS);
+        long actualMaxTimeInBufferMS =
+                Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+        int actualMaxRecordSizeInSamples =
+                Optional.ofNullable(getMaxRecordSizeInSamples())
+                        .orElse(getMaxBatchSizeInSamples()); // By default, set max record size to
+                                                             // max batch size (in samples)
+
+        int actualSocketTimeoutMs =
+                Optional.ofNullable(getSocketTimeoutMs())
+                        .orElse(PrometheusAsyncHttpClientBuilder.DEFAULT_SOCKET_TIMEOUT_MS);
+
+        Preconditions.checkArgument(
+                StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+                "Missing or blank Prometheus Remote-Write URL");
+        checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+        Preconditions.checkNotNull(retryConfiguration, "Missing retry configuration");
+        Preconditions.checkArgument(
+                actualMaxBatchSizeInSamples > 0, "Max batch size (in samples) must be positive");
+        Preconditions.checkArgument(
+                actualMaxRecordSizeInSamples <= actualMaxBatchSizeInSamples,
+                "Max record size (in samples) must be <= Max batch size");
+
+        LOG.info(
+                "Prometheus sink configuration:"
+                        + "\n\t\tmaxBatchSizeInSamples={}\n\t\tmaxRecordSizeInSamples={}"
+                        + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+                        + "\n\t\tinitialRetryDelayMs={}\n\t\tmaxRetryDelayMs={}\n\t\tmaxRetryCount={}"
+                        + "\n\t\tsocketTimeoutMs={}",
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                maxInFlightRequest,
+                actualMaxBufferedRequests,
+                retryConfiguration.getInitialRetryDelayMS(),
+                retryConfiguration.getMaxRetryDelayMS(),
+                retryConfiguration.getMaxRetryCount(),
+                socketTimeoutMs);
+
+        return new PrometheusSink(
+                new PrometheusTimeSeriesConverter(),
+                maxInFlightRequest,
+                actualMaxBufferedRequests,
+                actualMaxBatchSizeInSamples,
+                actualMaxRecordSizeInSamples,
+                actualMaxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                new PrometheusAsyncHttpClientBuilder(retryConfiguration)
+                        .setSocketTimeout(actualSocketTimeoutMs),
+                requestSigner);
+    }
+
+    private static void checkValidRemoteWriteUrl(String url) {
+        try {
+            new URL(url);
+        } catch (MalformedURLException mue) {
+            throw new IllegalArgumentException("Invalid Remote-Write URL: " + url, mue);
+        }
+    }
+
+    public PrometheusSinkBuilder setPrometheusRemoteWriteUrl(String prometheusRemoteWriteUrl) {
+        this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRequestSigner(PrometheusRequestSigner requestSigner) {
+        this.requestSigner = requestSigner;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxBatchSizeInSamples(int maxBatchSizeInSamples) {
+        this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setMaxRecordSizeInSamples(int maxRecordSizeInSamples) {
+        this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setRetryConfiguration(RetryConfiguration retryConfiguration) {
+        this.retryConfiguration = retryConfiguration;
+        return this;
+    }
+
+    public PrometheusSinkBuilder setSocketTimeoutMs(int socketTimeoutMs) {
+        this.socketTimeoutMs = socketTimeoutMs;
+        return this;
+    }
+
+    private Integer getMaxBatchSizeInSamples() {
+        return maxBatchSizeInSamples;
+    }
+
+    private Integer getMaxRecordSizeInSamples() {
+        return maxRecordSizeInSamples;
+    }
+
+    public RetryConfiguration getRetryConfiguration() {
+        return retryConfiguration;
+    }
+
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+
+    /// Disable setting maxBatchSize, maxBatchSizeInBytes, and maxRecordSizeInBytes directly
+
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSize(int maxBatchSize) {
+        throw new UnsupportedOperationException("maxBatchSize is not supported by this sink");
+    }
+
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
+        throw new UnsupportedOperationException(
+                "maxBatchSizeInBytes is not supported by this sink");

Review Comment: Should we add a comment explaining why we disable it here?
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java: ##########
@@ -0,0 +1,203 @@
[duplicate hunk elided]
+    @Override
+    public PrometheusSinkBuilder setMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
+        throw new UnsupportedOperationException(
+                "maxBatchSizeInBytes is not supported by this sink");

Review Comment: After offline discussion, maybe we can do something like
```
Use setMaxBatchSizeInSamples() instead
```
########## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusRemoteWriteHttpRequestBuilderTest.java: ##########
@@ -0,0 +1,76 @@
[Apache license header omitted]
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.hc.core5.http.HttpHeaders;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;

Review Comment: We don't recommend using Mockito in Flink. We usually prefer creating reusable test implementations that exercise actual behavior; see https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
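A sketch of the reusable-test-implementation style, in place of `mock()`/`verify()`. `SignerContract` stands in for the connector's `PrometheusRequestSigner` interface, whose exact method signature is not shown in the hunks above, so it is assumed here:

```java
import java.util.HashMap;
import java.util.Map;

/** Assumed stand-in for the PrometheusRequestSigner interface. */
interface SignerContract {
    void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody);
}

/**
 * Reusable test double: records what it was asked to sign and leaves a marker
 * header, so a test can assert on real behavior with plain assertEquals.
 */
class RecordingRequestSigner implements SignerContract {
    private final Map<String, String> lastSeenHeaders = new HashMap<>();
    private int invocationCount = 0;

    @Override
    public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody) {
        // Record the headers the request builder passed in...
        invocationCount++;
        lastSeenHeaders.clear();
        lastSeenHeaders.putAll(requestHeaders);
        // ...and add a marker the test can look for on the outgoing request.
        requestHeaders.put("X-Test-Signature", "signed-" + invocationCount);
    }

    int getInvocationCount() {
        return invocationCount;
    }

    Map<String, String> getLastSeenHeaders() {
        return lastSeenHeaders;
    }
}
```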
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ##########
@@ -0,0 +1,267 @@
[duplicate hunk elided]
+                String responseBody = response.getBodyText();
+                if (RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
+                    LOG.warn(
+                            "{},{} {} (discarded {} time-series, containing {} samples)",
+                            response.getCode(),
+                            response.getReasonPhrase(),
+                            responseBody,
+                            timeSeriesCount,
+                            sampleCount);
+                    counters.inc(NUM_SAMPLES_NON_RETRIABLE_DROPPED, sampleCount);
+                } else if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
+                    LOG.warn(
+                            "{},{} {} (after retry limit reached, discarded {} time-series, containing {} samples)",
+                            response.getCode(),
+                            response.getReasonPhrase(),
+                            responseBody,
+                            timeSeriesCount,
+                            sampleCount);
+                    counters.inc(NUM_SAMPLES_RETRY_LIMIT_DROPPED, sampleCount);
+                }

Review Comment: Should we add a default log here, in case `RemoteWriteResponseClassifier.isRetriableErrorResponse` and `RemoteWriteResponseClassifier.isNonRetriableErrorResponse` don't cover the entire set of responses?
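A minimal sketch of such a default branch, as a fragment of the `completed()` method quoted above (the log wording is illustrative):

```java
                } else if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
                    // ... existing retry-limit handling ...
                } else {
                    // Default branch: make sure responses the classifier does not
                    // recognize are never silently swallowed.
                    LOG.warn(
                            "{},{} {} (unclassified response; discarded {} time-series, containing {} samples)",
                            response.getCode(),
                            response.getReasonPhrase(),
                            responseBody,
                            timeSeriesCount,
                            sampleCount);
                }
```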
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ##########
@@ -0,0 +1,117 @@
[Apache license header omitted]
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {

Review Comment: Let's use Flink's `@Public` annotation here
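What that could look like on the class declaration. Note that `@PublicEvolving`, from the same `org.apache.flink.annotation` package, may be the more common starting point for a new connector:

```java
import org.apache.flink.annotation.Public;

/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@Public
public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
    // ...
}
```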
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ##########
@@ -0,0 +1,267 @@
[duplicate hunk elided]
+                } else if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
+                    LOG.warn(

Review Comment: After offline discussion, we agreed that maybe we can set the HttpClient to a lower maxRetryCount (by default), and then, if the retry limit is exceeded, throw the exception to restart the Flink job
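A sketch of the agreed direction. The `RetryConfiguration.builder()` API and the `fatalExceptionConsumer` hook are assumptions for illustration; only the `RetryConfiguration` getters appear in the hunks above:

```java
// Hypothetical builder API: bound the HTTP client's own retries by default.
RetryConfiguration retryConfiguration =
        RetryConfiguration.builder()
                .setInitialRetryDelayMS(30L)
                .setMaxRetryDelayMS(5_000L)
                .setMaxRetryCount(100) // a finite default instead of retrying forever
                .build();

// In the callback, once the client has exhausted its retries, fail the job
// instead of dropping the batch (fatalExceptionConsumer is an assumed hook):
if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
    fatalExceptionConsumer.accept(
            new RuntimeException(
                    "Max retries exceeded posting " + timeSeriesCount + " time-series"));
}
```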
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ##########
@@ -0,0 +1,267 @@
[duplicate hunk elided]
+        @Override
+        public void failed(Exception ex) {
+            LOG.warn(
+                    "Exception executing the remote-write (discarded {} time-series containing {} samples)",
+                    timeSeriesCount,
+                    sampleCount,
+                    ex);
+            counters.inc(NUM_SAMPLES_DROPPED, sampleCount);
+            counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
+        }
+
+        @Override
+        public void cancelled() {
+            LOG.warn(
+                    "Write request execution cancelled (discarded {} time-series containing {} samples)",
+                    timeSeriesCount,
+                    sampleCount);
+            counters.inc(NUM_SAMPLES_DROPPED, sampleCount);
+            counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
+        }

Review Comment: I wonder if we should also propagate the exception upwards here rather than swallowing it. For example, this might be triggered on interrupt / non-graceful shutdown of the async HTTP client
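A sketch of propagating instead of swallowing. The `fatalExceptionConsumer` field is assumed to be injected into the callback, for example the `Consumer<Exception>` returned by `AsyncSinkWriter#getFatalExceptionCons()`:

```java
@Override
public void failed(Exception ex) {
    counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
    // Surface the failure to the writer instead of only counting it.
    fatalExceptionConsumer.accept(
            new RuntimeException("Remote-write request failed", ex));
}

@Override
public void cancelled() {
    // Typically seen on interrupt or non-graceful shutdown of the async client:
    // failing fast here avoids silently losing the in-flight batch.
    fatalExceptionConsumer.accept(
            new RuntimeException("Remote-write request was cancelled before completion"));
}
```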
########## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ##########
@@ -0,0 +1,161 @@
[Apache license header omitted]
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples.
+ *
+ * <p>metricName is mapped in Prometheus to the value of the mandatory label named '__name__'. The
+ * other labels, as key/value, are appended after the '__name__' label.
+ */
+public class PrometheusTimeSeries implements Serializable {
+    /** A single Label. */
+    public static class Label implements Serializable {
+        private final String name;
+        private final String value;
+
+        public Label(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Label label = (Label) o;
+            return new EqualsBuilder()
+                    .append(name, label.name)
+                    .append(value, label.value)
+                    .isEquals();
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode();
+        }
+    }
+
+    /** A single Sample. */
+    public static class Sample implements Serializable {

Review Comment: We should override `equals` and `hashCode` here too.
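A sketch mirroring `Label`'s implementation. The field names `value` and `timestamp` are assumptions, since the quoted hunk truncates before `Sample`'s fields:

```java
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    Sample sample = (Sample) o;
    // value/timestamp are assumed field names, not confirmed by the hunk.
    return new EqualsBuilder()
            .append(value, sample.value)
            .append(timestamp, sample.timestamp)
            .isEquals();
}

@Override
public int hashCode() {
    return new HashCodeBuilder(17, 37).append(value).append(timestamp).toHashCode();
}
```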
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Remote; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.io.CloseMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkCounters.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */ +public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> { + + /** + * * Batching of this sink is in terms of Samples, not bytes. The goal is adaptively increase + * the number of Samples in each batch, a WriteRequest sent to Prometheus, to a configurable + * number. This is the parameter maxBatchSizeInBytes. + * + * <p>getSizeInBytes(requestEntry) returns the number of Samples (not bytes) and + * maxBatchSizeInBytes is actually in terms of Samples (not bytes). + * + * <p>In AsyncSinkWriter, maxBatchSize is in terms of requestEntries (TimeSeries). But because + * each TimeSeries contains 1+ Samples, we set maxBatchSize = maxBatchSizeInBytes. + * + * <p>maxRecordSizeInBytes is also calculated in the same unit assumed by getSizeInBytes(..). In + * our case is the max number of Samples in a single TimeSeries sent to the Sink. We are + * limiting the number of Samples in each TimeSeries to the max batch size, setting + * maxRecordSizeInBytes = maxBatchSizeInBytes. 
##########
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java:
##########
@@ -0,0 +1,267 @@
+/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */
+public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> {
+
+    /**
+     * Batching of this sink is in terms of Samples, not bytes. The goal is to adaptively increase
+     * the number of Samples in each batch (a WriteRequest sent to Prometheus) up to a configurable
+     * number: the parameter maxBatchSizeInBytes.
+     *
+     * <p>getSizeInBytes(requestEntry) returns the number of Samples (not bytes), and
+     * maxBatchSizeInBytes is actually in terms of Samples (not bytes).
+     *
+     * <p>In AsyncSinkWriter, maxBatchSize is in terms of requestEntries (TimeSeries). But because
+     * each TimeSeries contains 1+ Samples, we set maxBatchSize = maxBatchSizeInBytes (e.g. with
+     * maxBatchSizeInSamples = 500, a batch contains at most 500 time-series and at most 500
+     * samples in total).
+     *
+     * <p>maxRecordSizeInBytes is also expressed in the unit assumed by getSizeInBytes(..): in our
+     * case, the max number of Samples in a single TimeSeries sent to the Sink. We limit the number
+     * of Samples in each TimeSeries to the max batch size, setting maxRecordSizeInBytes =
+     * maxBatchSizeInBytes.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class);
+
+    private final SinkCounters counters;
+    private final CloseableHttpAsyncClient asyncHttpClient;
+    private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder;
+
+    public PrometheusSinkWriter(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            Sink.InitContext context,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            CloseableHttpAsyncClient asyncHttpClient,
+            SinkCounters counters,
+            PrometheusRequestSigner requestSigner) {
+        this(
+                elementConverter,
+                context,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples,
+                maxTimeInBufferMS,
+                prometheusRemoteWriteUrl,
+                asyncHttpClient,
+                counters,
+                requestSigner,
+                Collections.emptyList());
+    }
+
+    public PrometheusSinkWriter(
+            ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+            Sink.InitContext context,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            int maxBatchSizeInSamples,
+            long maxTimeInBufferMS,
+            String prometheusRemoteWriteUrl,
+            CloseableHttpAsyncClient asyncHttpClient,
+            SinkCounters counters,
+            PrometheusRequestSigner requestSigner,
+            Collection<BufferedRequestState<Types.TimeSeries>> states) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSizeInSamples, // maxBatchSize
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInSamples, // maxBatchSizeInBytes
+                maxTimeInBufferMS,
+                maxBatchSizeInSamples, // maxRecordSizeInBytes
+                states);
+        this.requestBuilder =
+                new PrometheusRemoteWriteHttpRequestBuilder(
+                        prometheusRemoteWriteUrl, requestSigner);
+        this.asyncHttpClient = asyncHttpClient;
+        this.counters = counters;
+    }
+
+    /**
+     * This is the "size" of the request entry (a {@link Types.TimeSeries time-series}) used for
+     * batching. Despite the name of the method, it returns the number of {@link Types.Sample
+     * samples} in the time-series (not bytes), to support batching in terms of samples.
+     *
+     * @param timeSeries the request entry for which we want to know the size
+     * @return number of samples in the time-series
+     */
+    @Override
+    protected long getSizeInBytes(Types.TimeSeries timeSeries) {
+        return RequestEntrySizeUtils.requestSizeForBatching(timeSeries);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<Types.TimeSeries> requestEntries, Consumer<List<Types.TimeSeries>> requestResult) {
+        int timeSeriesCount = requestEntries.size();
+        long sampleCount = RequestEntrySizeUtils.countSamples(requestEntries);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(
+                    "Writing {} time-series containing {} samples", timeSeriesCount, sampleCount);
+        }
+
+        Remote.WriteRequest writeRequest = buildWriteRequest(requestEntries);
+        byte[] requestBody;
+        try {
+            requestBody = compressWriteRequest(writeRequest);
+        } catch (IOException e) {
+            throw new RuntimeException("Exception compressing the request body", e);
+        }
+
+        SimpleHttpRequest postRequest = requestBuilder.buildHttpRequest(requestBody);
+        asyncHttpClient.execute(
+                postRequest,
+                new ResponseCallback(timeSeriesCount, sampleCount, counters, requestResult));
+    }
+
+    @VisibleForTesting
+    static class ResponseCallback implements FutureCallback<SimpleHttpResponse> {
+        private final int timeSeriesCount;
+        private final long sampleCount;
+        private final Consumer<List<Types.TimeSeries>> requestResult;
+        private final SinkCounters counters;
+
+        public ResponseCallback(
+                int timeSeriesCount,
+                long sampleCount,
+                SinkCounters counters,
+                Consumer<List<Types.TimeSeries>> requestResult) {
+            this.timeSeriesCount = timeSeriesCount;
+            this.sampleCount = sampleCount;
+            this.requestResult = requestResult;
+            this.counters = counters;
+        }
+
+        @Override
+        public void completed(SimpleHttpResponse response) {
+            if (RemoteWriteResponseClassifier.isSuccessResponse(response)) {
+                LOG.debug(
+                        "{},{} - successfully posted {} time-series, containing {} samples",
+                        response.getCode(),
+                        response.getReasonPhrase(),
+                        timeSeriesCount,
+                        sampleCount);
+                counters.inc(NUM_SAMPLES_OUT, sampleCount);
+                counters.inc(NUM_WRITE_REQUESTS_OUT);
+            } else {
+                counters.inc(NUM_SAMPLES_DROPPED, sampleCount);
+                counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
+
+                String responseBody = response.getBodyText();
+                if (RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
+                    LOG.warn(
+                            "{},{} {} (discarded {} time-series, containing {} samples)",
+                            response.getCode(),
+                            response.getReasonPhrase(),
+                            responseBody,
+                            timeSeriesCount,
+                            sampleCount);
+                    counters.inc(NUM_SAMPLES_NON_RETRIABLE_DROPPED, sampleCount);
+                } else if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
+                    LOG.warn(
+                            "{},{} {} (after retry limit reached, discarded {} time-series, containing {} samples)",
+                            response.getCode(),
+                            response.getReasonPhrase(),
+                            responseBody,
+                            timeSeriesCount,
+                            sampleCount);
+                    counters.inc(NUM_SAMPLES_RETRY_LIMIT_DROPPED, sampleCount);
+                }
+            }
+
+            // Never re-queue requests
+            requestResult.accept(Collections.emptyList());
+        }
+
+        @Override
+        public void failed(Exception ex) {
+            LOG.warn(
+                    "Exception executing the remote-write (discarded {} time-series containing {} samples)",
+                    timeSeriesCount,
+                    sampleCount,
+                    ex);
+            counters.inc(NUM_SAMPLES_DROPPED, sampleCount);
+            counters.inc(NUM_WRITE_REQUESTS_PERMANENTLY_FAILED);
+        }

Review Comment:
   Should we retry these failures / propagate the exception up to transition the job to failed?
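   On the retry half of the question: with the AsyncSink base, entries handed back to the `requestResult` consumer are re-queued for another attempt, so a retrying variant could look roughly like the sketch below, assuming `requestEntries` is also passed into the callback (today it only receives the counts):

```java
// Hypothetical sketch: 'requestEntries' would be a new constructor argument, since
// ResponseCallback currently only receives timeSeriesCount and sampleCount.
@Override
public void failed(Exception ex) {
    LOG.warn("Remote-write failed, re-queueing {} time-series for retry", timeSeriesCount, ex);
    // Entries passed back to the consumer are re-queued by AsyncSinkWriter.
    requestResult.accept(requestEntries);
}
```

   Note that without a retry cap this re-queues forever on a persistent failure, which may be why the current code never re-queues; propagating a fatal exception, as in the earlier sketch, avoids that.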
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
