This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-prometheus.git
The following commit(s) were added to refs/heads/main by this push:
new 67da7df [FLINK-36404][Connectors/Prometheus] Improve exception
classification and handling (#9)
67da7df is described below
commit 67da7dfd715fd6c7cf9fda38d5a8c942e1c9fa38
Author: Lorenzo Nicora <[email protected]>
AuthorDate: Mon Oct 7 09:44:40 2024 +0100
[FLINK-36404][Connectors/Prometheus] Improve exception classification and
handling (#9)
* Fix exceptions not causing job to fail
* Always handle 403 and 404 responses as fatal errors
* Httpclient exceptions always fatal. Force httpclient to fail when an
exception is thrown by the callback. Upgrade httpclient version. Fix minor
missing parameter validations.
* Refactored response classification, using an enum, and added comments to
clarify the callback behaviour
---
flink-connector-prometheus/README.md | 13 +-
flink-connector-prometheus/pom.xml | 3 -
.../prometheus/sink/HttpResponseCallback.java | 206 ++++++++++++---------
.../connector/prometheus/sink/PrometheusSink.java | 2 +-
.../prometheus/sink/PrometheusSinkBuilder.java | 3 +-
.../sink/PrometheusSinkConfiguration.java | 27 +--
.../http/PrometheusAsyncHttpClientBuilder.java | 1 +
.../sink/http/RemoteWriteResponseClassifier.java | 38 ++--
.../sink/http/RemoteWriteResponseType.java | 41 ++++
.../sink/http/RemoteWriteRetryStrategy.java | 7 +-
.../sink/http/RethrowingIOSessionListener.java | 69 +++++++
.../prometheus/sink/HttpResponseCallbackTest.java | 120 +++---------
.../sink/HttpResponseHandlingBehaviourIT.java | 97 +++++++---
.../prometheus/sink/PrometheusSinkBuilderTest.java | 3 -
...iterErrorHandlingBehaviorConfigurationTest.java | 7 -
.../sink/examples/DataStreamExample.java | 10 +-
.../http/RemoteWriteResponseClassifierTest.java | 79 ++++++++
.../sink/http/RethrowingIOSessionListenerTest.java | 45 +++++
pom.xml | 7 +-
19 files changed, 505 insertions(+), 273 deletions(-)
diff --git a/flink-connector-prometheus/README.md
b/flink-connector-prometheus/README.md
index 07f2308..b93cbce 100644
--- a/flink-connector-prometheus/README.md
+++ b/flink-connector-prometheus/README.md
@@ -130,7 +130,6 @@ PrometheusSink sink = PrometheusSink.builder()
.onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
// Default is FAIL for other error types
.onMaxRetryExceeded(OnErrorBehavior.FAIL)
- .onHttpClientIOFail(OnErrorBehavior.FAIL)
.build())
.setMetricGroupName("Prometheus") // Customizable
metric-group suffix (default: "Prometheus")
.build();
@@ -182,11 +181,10 @@ The possible behaviors are:
* `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
* `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending
request, and continue.
-There are 3 error conditions:
+There are two error conditions:
1. Prometheus returns a non-retriable error response (i.e. any `4xx` status
code except `429`). Default: `DISCARD_AND_CONTINUE`.
2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the
max retry limit is exceeded. Default: `FAIL`.
-3. The http client fails to complete the request, for an I/O error. Default:
`FAIL`.
The error handling behaviors can be configured when creating the instance of
the sink, as shown in this snipped:
@@ -196,7 +194,6 @@ PrometheusSink sink = PrometheusSink.builder()
.setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
.onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
.onMaxRetryExceeded(OnErrorBehavior.DISCARD_AND_CONTINUE)
- .onHttpClientIOFail(OnErrorBehavior.DISCARD_AND_CONTINUE)
.build())
.build();
```
@@ -218,6 +215,14 @@ Prometheus does not return sufficient information to
automatically handle partia
> restart from checkpoint. The reason is that restarting from checkpoint cause
> some duplicates, that are rejected by
> Prometheus as out of order, causing in turn another non-retriable error, in
> an endless loop.
+
+### Fatal errors
+
+Remote-write endpoint responses 403 (Forbidden) and 404 (Not found) are always
considered fatal, regardless of the error
+handling configuration.
+
+Any I/O error during the communication with the endpoint is also fatal.
+
### Metrics
The sink exposes custom metrics, counting the samples and write-requests
(batches) successfully written or discarded.
diff --git a/flink-connector-prometheus/pom.xml
b/flink-connector-prometheus/pom.xml
index 6ea2403..84fa4ab 100644
--- a/flink-connector-prometheus/pom.xml
+++ b/flink-connector-prometheus/pom.xml
@@ -65,19 +65,16 @@ under the License.
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
- <version>1.1.10.4</version>
</dependency>
<!-- http client -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
- <version>${apache.httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
- <version>${apache.httpclient.version}</version>
</dependency>
<!-- test -->
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
index 3d96a25..0bfda7c 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
@@ -18,8 +18,9 @@
package org.apache.flink.connector.prometheus.sink;
import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior;
import
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
-import
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType;
import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -32,6 +33,12 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier.classify;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.SUCCESS;
+
/**
* Callback handling the outcome of the http async request.
*
@@ -50,6 +57,11 @@ import java.util.function.Consumer;
* called with an outcome of *completed* either when the request has succeeded
or the max retry
* limit has been exceeded. It is responsibility of the callback
distinguishing between these
* conditions.
+ *
+ * <p>Also, when an exception is thrown after the request is *completed*, from
the http client's point
+ * of view (i.e. in the {@link #completed(SimpleHttpResponse)} callback
method), it does not
+ * directly cause the writer to fail until it is intercepted further up the
client stack, by the
+ * {@link
org.apache.flink.connector.prometheus.sink.http.RethrowingIOSessionListener}.
*/
@Internal
class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
@@ -77,114 +89,126 @@ class HttpResponseCallback implements
FutureCallback<SimpleHttpResponse> {
}
/**
- * The completed outcome is invoked every time the http client
successfully receives a valid
- * http response, regardless of the status code.
+ * The completed outcome is invoked when the http client successfully
receives a response,
+ * regardless of the status code.
*
- * <p>This method classifies the responses and implements the behaviour
expected by the
- * Remote-Write specifications. In case of error, the behaviour is
determined by the error
- * handling configuration.
+ * <p>This method classifies the responses using {@link
+ *
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier}
and implements
+ * the behaviour expected by the Remote-Write specifications. If the
response is classified as
+ * an error, the behaviour is determined by the error handling
configuration.
*/
@Override
public void completed(SimpleHttpResponse response) {
// Never re-queue requests
reQueuedResult.accept(Collections.emptyList());
- // Success
- if (RemoteWriteResponseClassifier.isSuccessResponse(response)) {
- LOG.debug(
- "{},{} - successfully posted {} time-series, containing {}
samples",
- response.getCode(),
- response.getReasonPhrase(),
- timeSeriesCount,
- sampleCount);
- metricsCallback.onSuccessfulWriteRequest(sampleCount);
- return;
- }
-
- String responseBody = response.getBodyText();
- int statusCode = response.getCode();
- String reasonPhrase = response.getReasonPhrase();
-
- // Prometheus's response is a non-retriable error.
- // Depending on the configured behaviour, log and discard or throw an
exception
- if
(RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
- if (errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
- == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
- throw new PrometheusSinkWriteException(
- "Non-retriable error response from Prometheus",
- statusCode,
- reasonPhrase,
+ RemoteWriteResponseType responseType = classify(response);
+ switch (responseType) {
+ case SUCCESS: // Data successfully written
+ // Increment successful writes counts
+ metricsCallback.onSuccessfulWriteRequest(sampleCount);
+ LOG.debug(
+ "{},{} - successfully posted {} time-series,
containing {} samples",
+ response.getCode(),
+ response.getReasonPhrase(),
timeSeriesCount,
- sampleCount,
- responseBody);
- }
-
- LOG.warn(
- "{},{} {} (discarded {} time-series, containing {}
samples)",
- statusCode,
- reasonPhrase,
- responseBody,
- timeSeriesCount,
- sampleCount);
-
metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
- return;
- }
-
- // Retry limit exceeded on retriable error
- // Depending on the configured behaviour, log and discard or throw an
exception
- if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
- if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded()
- == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
- throw new PrometheusSinkWriteException(
- "Max retry limit exceeded on retriable error",
- statusCode,
- reasonPhrase,
+ sampleCount);
+ break;
+
+ case FATAL_ERROR: // Response is a fatal error
+ // Throw an exception regardless of configured behaviour
+ logErrorAndThrow(
+ new PrometheusSinkWriteException(
+ "Fatal error response from Prometheus",
+ response.getCode(),
+ response.getReasonPhrase(),
+ timeSeriesCount,
+ sampleCount,
+ response.getBodyText()));
+ break;
+
+ case NON_RETRIABLE_ERROR: // Response is a non-retriable error.
+ // If behavior is FAIL, throw an exception
+ if
(errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
+ == OnErrorBehavior.FAIL) {
+ logErrorAndThrow(
+ new PrometheusSinkWriteException(
+ "Non-retriable error response from
Prometheus",
+ response.getCode(),
+ response.getReasonPhrase(),
+ timeSeriesCount,
+ sampleCount,
+ response.getBodyText()));
+ }
+
+ // Otherwise (DISCARD_AND_CONTINUE), increment discarded data
counts & log WARN
+
metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
+ LOG.warn(
+ "{},{} {} (discarded {} time-series, containing {}
samples)",
+ response.getCode(),
+ response.getReasonPhrase(),
+ response.getBodyText(),
+ timeSeriesCount,
+ sampleCount);
+ break;
+
+ case RETRIABLE_ERROR: // Retry limit exceeded on retriable error
+ // If behavior is FAIL, throw an exception
+ if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded() ==
OnErrorBehavior.FAIL) {
+ logErrorAndThrow(
+ new PrometheusSinkWriteException(
+ "Max retry limit exceeded on retriable
error",
+ response.getCode(),
+ response.getReasonPhrase(),
+ timeSeriesCount,
+ sampleCount,
+ response.getBodyText()));
+ }
+
+ // Otherwise (DISCARD_AND_CONTINUE), increment discarded data
counts & log WARN
+
metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
+ LOG.warn(
+ "{},{} {} (after retry limit reached, discarded {}
time-series, containing {} samples)",
+ response.getCode(),
+ response.getReasonPhrase(),
+ response.getBodyText(),
timeSeriesCount,
- sampleCount,
- responseBody);
- }
-
- LOG.warn(
- "{},{} {} (after retry limit reached, discarded {}
time-series, containing {} samples)",
- statusCode,
- reasonPhrase,
- responseBody,
- timeSeriesCount,
- sampleCount);
-
metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
- return;
+ sampleCount);
+ break;
+
+ default: // Unexpected/unhandled response outcome
+ // Always fail
+ logErrorAndThrow(
+ new PrometheusSinkWriteException(
+ "Unexpected status code returned from the
remote-write endpoint",
+ response.getCode(),
+ response.getReasonPhrase(),
+ timeSeriesCount,
+ sampleCount,
+ response.getBodyText()));
}
-
- // Unexpected/unhandled response outcome
- throw new PrometheusSinkWriteException(
- "Unexpected status code returned from the remote-write
endpoint",
- statusCode,
- reasonPhrase,
- timeSeriesCount,
- sampleCount,
- responseBody);
}
+ /**
+ * Exception reported by the http client (e.g. I/O failure). Always throw
an exception.
+ *
+ * @param ex exception reported by the http client
+ */
@Override
public void failed(Exception ex) {
- // General I/O failure reported by http client
- // Depending on the configured behavior, throw an exception or log and
discard
- if (errorHandlingBehaviorConfig.getOnHttpClientIOFail()
- == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
- throw new PrometheusSinkWriteException("Http client failure", ex);
- } else {
- LOG.warn(
- "Exception executing the remote-write (discarded {}
time-series containing {} samples)",
- timeSeriesCount,
- sampleCount,
- ex);
-
metricsCallback.onFailedWriteRequestForHttpClientIoFail(sampleCount);
- }
+ throw new PrometheusSinkWriteException("Http client failure", ex);
}
+ /** The async http client was cancelled. Always throw an exception. */
@Override
public void cancelled() {
- // When the async http client is cancelled, the sink always throws an
exception
+ // When the async http client is cancelled, the sink should always
throw an exception
throw new PrometheusSinkWriteException("Write request execution
cancelled");
}
+
+ /** Log the exception at ERROR and rethrow. */
+ private void logErrorAndThrow(PrometheusSinkWriteException ex) {
+ LOG.error("Error condition detected by the http response callback (on
complete)", ex);
+ throw ex;
+ }
}
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
index 7a7738f..78edd7c 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
@@ -80,7 +80,7 @@ public class PrometheusSink extends
AsyncSinkBase<PrometheusTimeSeries, Types.Ti
maxBatchSizeInSamples > 1, "Max batch size (in samples) must
be positive");
Preconditions.checkArgument(
maxRecordSizeInSamples <= maxBatchSizeInSamples,
- "Maz record size (in samples) must be <= Max batch size");
+ "Max record size (in samples) must be <= Max batch size");
Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight
requests must be 1");
Preconditions.checkArgument(
StringUtils.isNotBlank(prometheusRemoteWriteUrl),
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
index 02b1e77..438855d 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
@@ -102,7 +102,7 @@ public class PrometheusSinkBuilder
+
"\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
+ "\n\t\tRetryConfiguration: initialRetryDelayMs={},
maxRetryDelayMs={}, maxRetryCount={}"
+ "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
- + "\n\t\tErrorHandlingBehaviour:
onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
+ + "\n\t\tErrorHandlingBehaviour:
onMaxRetryExceeded={}, onNonRetriableError={}",
actualMaxBatchSizeInSamples,
actualMaxRecordSizeInSamples,
actualMaxTimeInBufferMS,
@@ -114,7 +114,6 @@ public class PrometheusSinkBuilder
socketTimeoutMs,
actualHttpUserAgent,
actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
- actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(),
actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError());
return new PrometheusSink(
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
index 2c7dd34..fb7a17c 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
@@ -46,23 +46,17 @@ public class PrometheusSinkConfiguration {
public static class SinkWriterErrorHandlingBehaviorConfiguration
implements Serializable {
public static final OnErrorBehavior
ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR = FAIL;
- public static final OnErrorBehavior
ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR = FAIL;
public static final OnErrorBehavior
ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR =
DISCARD_AND_CONTINUE;
/** Behaviour when the max retries is exceeded on Prometheus retriable
errors. */
private final OnErrorBehavior onMaxRetryExceeded;
- /** Behaviour when the HTTP client fails, for an I/O problem. */
- private final OnErrorBehavior onHttpClientIOFail;
-
/** Behaviour when Prometheus Remote-Write respond with a
non-retriable error. */
private final OnErrorBehavior onPrometheusNonRetriableError;
public SinkWriterErrorHandlingBehaviorConfiguration(
- OnErrorBehavior onMaxRetryExceeded,
- OnErrorBehavior onHttpClientIOFail,
- OnErrorBehavior onPrometheusNonRetriableError) {
+ OnErrorBehavior onMaxRetryExceeded, OnErrorBehavior
onPrometheusNonRetriableError) {
// onPrometheusNonRetriableError cannot be set to FAIL, because it
makes impossible for
// the job to restart from checkpoint (see FLINK-36319).
// We are retaining the possibility of configuring the behavior on
this type of error to
@@ -71,7 +65,6 @@ public class PrometheusSinkConfiguration {
onPrometheusNonRetriableError == DISCARD_AND_CONTINUE,
"Only DISCARD_AND_CONTINUE is currently supported for
onPrometheusNonRetriableError");
this.onMaxRetryExceeded = onMaxRetryExceeded;
- this.onHttpClientIOFail = onHttpClientIOFail;
this.onPrometheusNonRetriableError = onPrometheusNonRetriableError;
}
@@ -79,10 +72,6 @@ public class PrometheusSinkConfiguration {
return onMaxRetryExceeded;
}
- public OnErrorBehavior getOnHttpClientIOFail() {
- return onHttpClientIOFail;
- }
-
public OnErrorBehavior getOnPrometheusNonRetriableError() {
return onPrometheusNonRetriableError;
}
@@ -90,7 +79,6 @@ public class PrometheusSinkConfiguration {
/** Builder for PrometheusSinkWriterErrorHandlingConfiguration. */
public static class Builder {
private OnErrorBehavior onMaxRetryExceeded = null;
- private OnErrorBehavior onHttpClientIOFail = null;
private OnErrorBehavior onPrometheusNonRetriableError = null;
public Builder() {}
@@ -100,11 +88,6 @@ public class PrometheusSinkConfiguration {
return this;
}
- public Builder onHttpClientIOFail(OnErrorBehavior onErrorBehavior)
{
- this.onHttpClientIOFail = onErrorBehavior;
- return this;
- }
-
public Builder onPrometheusNonRetriableError(OnErrorBehavior
onErrorBehavior) {
this.onPrometheusNonRetriableError = onErrorBehavior;
return this;
@@ -114,8 +97,6 @@ public class PrometheusSinkConfiguration {
return new SinkWriterErrorHandlingBehaviorConfiguration(
Optional.ofNullable(onMaxRetryExceeded)
.orElse(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR),
- Optional.ofNullable(onHttpClientIOFail)
-
.orElse(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR),
Optional.ofNullable(onPrometheusNonRetriableError)
.orElse(ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR));
}
@@ -153,6 +134,12 @@ public class PrometheusSinkConfiguration {
public RetryConfiguration(
long initialRetryDelayMS, long maxRetryDelayMS, int
maxRetryCount) {
+ Preconditions.checkArgument(initialRetryDelayMS > 0, "Initial
retry delay must be > 0");
+ Preconditions.checkArgument(maxRetryDelayMS > 0, "Max retry delay
must be > 0");
+ Preconditions.checkArgument(
+ maxRetryDelayMS >= initialRetryDelayMS,
+ "Max retry delay must be >= Initial retry delay");
+ Preconditions.checkArgument(maxRetryCount > 0, "Max retry count
must be > 0");
this.initialRetryDelayMS = initialRetryDelayMS;
this.maxRetryDelayMS = maxRetryDelayMS;
this.maxRetryCount = maxRetryCount;
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
index 29c504e..4e7f68b 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
@@ -88,6 +88,7 @@ public class PrometheusAsyncHttpClientBuilder implements
Serializable {
HttpAsyncClients.custom()
.setConnectionManager(connectionManager)
.setIOReactorConfig(ioReactorConfig)
+ .setIOSessionListener(new
RethrowingIOSessionListener())
.setRetryStrategy(
new
RemoteWriteRetryStrategy(retryConfiguration, metricsCallback))
.build();
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
index 2cc038a..895a888 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
@@ -21,21 +21,37 @@ import org.apache.flink.annotation.Internal;
import org.apache.hc.core5.http.HttpResponse;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.SUCCESS;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.UNHANDLED;
+
/** Classify http responses based on the status code. */
@Internal
public class RemoteWriteResponseClassifier {
- public static boolean isRetriableErrorResponse(HttpResponse response) {
- int statusCode = response.getCode();
- return statusCode >= 500 | statusCode == 429;
- }
-
- public static boolean isNonRetriableErrorResponse(HttpResponse response) {
- int statusCode = response.getCode();
- return (statusCode >= 400 && statusCode < 500) && statusCode != 429;
- }
- public static boolean isSuccessResponse(HttpResponse response) {
+ public static RemoteWriteResponseType classify(HttpResponse response) {
int statusCode = response.getCode();
- return (statusCode >= 200 && statusCode < 300);
+ if (statusCode >= 200 && statusCode < 300) {
+ // 2xx: success
+ return SUCCESS;
+ } else if (statusCode == 429) {
+ // 429, Too Many Requests: throttling
+ return RETRIABLE_ERROR;
+ } else if (statusCode == 403 || statusCode == 404) {
+ // 403, Forbidden: authentication error
+ // 404, Not Found: wrong endpoint URL path
+ return FATAL_ERROR;
+ } else if (statusCode >= 400 && statusCode < 500) {
+ // 4xx (except 403, 404, 429): wrong request/bad data
+ return NON_RETRIABLE_ERROR;
+ } else if (statusCode >= 500) {
+ // 5xx: internal errors, recoverable
+ return RETRIABLE_ERROR;
+ } else {
+ // Other status code are unhandled
+ return UNHANDLED;
+ }
}
}
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java
new file mode 100644
index 0000000..810d649
--- /dev/null
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Type of response, from Remote-Write endpoint, classified by {@link
+ * RemoteWriteResponseClassifier}.
+ */
+@Internal
+public enum RemoteWriteResponseType {
+ /** The Write-Request was successfully accepted. */
+ SUCCESS,
+ /** Write-Request temporarily rejected. The request can be retried. */
+ RETRIABLE_ERROR,
+ /**
+ * Write-Request permanently rejected. It cannot be retried. The error
condition is recoverable,
+ * after discarding the offending request.
+ */
+ NON_RETRIABLE_ERROR,
+ /** Unrecoverable error condition. */
+ FATAL_ERROR,
+ /** Unhandled status code. */
+ UNHANDLED;
+}
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
index 1b8fdc5..2a4b1a0 100644
---
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
@@ -40,6 +40,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier.classify;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+
/**
* Retry strategy for the http client.
*
@@ -103,9 +106,7 @@ public class RemoteWriteRetryStrategy implements
HttpRequestRetryStrategy {
@Override
public boolean retryRequest(HttpResponse httpResponse, int execCount,
HttpContext httpContext) {
- boolean retry =
- (execCount <= maxRetryCount)
- &&
RemoteWriteResponseClassifier.isRetriableErrorResponse(httpResponse);
+ boolean retry = (execCount <= maxRetryCount) &&
(classify(httpResponse) == RETRIABLE_ERROR);
LOG.debug(
"{} retry on response {} {}, at execution {}",
(retry) ? "DO" : "DO NOT",
diff --git
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
new file mode 100644
index 0000000..e983712
--- /dev/null
+++
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.IOSessionListener;
+
+/**
+ * Selectively rethrow PrometheusSinkWriteException, causing the httpclient to
fail. Otherwise, the
+ * exception would be swallowed by the IOReactor.
+ */
+@Internal
+public class RethrowingIOSessionListener implements IOSessionListener {
+ @Override
+ public void exception(IOSession ioSession, Exception e) {
+ if (e instanceof PrometheusSinkWriteException) {
+ // Rethrow the exception
+ throw (PrometheusSinkWriteException) e;
+ }
+ }
+
+ @Override
+ public void connected(IOSession ioSession) {
+ // Nothing to do
+ }
+
+ @Override
+ public void startTls(IOSession ioSession) {
+ // Nothing to do
+ }
+
+ @Override
+ public void inputReady(IOSession ioSession) {
+ // Nothing to do
+ }
+
+ @Override
+ public void outputReady(IOSession ioSession) {
+ // Nothing to do
+ }
+
+ @Override
+ public void timeout(IOSession ioSession) {
+ // Nothing to do
+ }
+
+ @Override
+ public void disconnected(IOSession ioSession) {
+ // Nothing to do
+ }
+}
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
index 4b98d63..8e7eddd 100644
---
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.prometheus.sink;
+import
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration;
import
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
import
org.apache.flink.connector.prometheus.sink.metrics.VerifybleSinkMetricsCallback;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -30,9 +31,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,14 +39,12 @@ class HttpResponseCallbackTest {
private static final int TIME_SERIES_COUNT = 17;
private static final long SAMPLE_COUNT = 42;
- private InspectableMetricGroup metricGroup;
private VerifybleSinkMetricsCallback metricsCallback;
private List<Types.TimeSeries> reQueuedResults;
Consumer<List<Types.TimeSeries>> requestResults;
@BeforeEach
void setUp() {
- metricGroup = new InspectableMetricGroup();
metricsCallback = new VerifybleSinkMetricsCallback();
reQueuedResults = new ArrayList<>();
requestResults =
HttpResponseCallbackTestUtils.getRequestResult(reQueuedResults);
@@ -56,22 +52,12 @@ class HttpResponseCallbackTest {
@Test
void shouldIncSuccessCountersOn200OK() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
-
.onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
-
.onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
- .onPrometheusNonRetriableError(
-
ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
- .build();
-
HttpResponseCallback callback =
new HttpResponseCallback(
TIME_SERIES_COUNT,
SAMPLE_COUNT,
metricsCallback,
- errorHandlingBehavior,
+
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
requestResults);
SimpleHttpResponse httpResponse = new
SimpleHttpResponse(HttpStatus.SC_OK);
@@ -86,15 +72,12 @@ class HttpResponseCallbackTest {
}
@Test
- void
shouldIncFailCountersOnCompletedWith404WhenDiscardAndContinueOnNonRetriableIsSelected()
{
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onPrometheusNonRetriableError(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .DISCARD_AND_CONTINUE)
- .build();
+ void
shouldIncFailCountersOnCompletedWith400WhenDiscardAndContinueOnNonRetriableIsSelected()
{
+ SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+ SinkWriterErrorHandlingBehaviorConfiguration.builder()
+ .onPrometheusNonRetriableError(
+
PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE)
+ .build();
HttpResponseCallback callback =
new HttpResponseCallback(
@@ -104,7 +87,7 @@ class HttpResponseCallbackTest {
errorHandlingBehavior,
requestResults);
- SimpleHttpResponse httpResponse = new
SimpleHttpResponse(HttpStatus.SC_NOT_FOUND);
+ SimpleHttpResponse httpResponse = new
SimpleHttpResponse(HttpStatus.SC_BAD_REQUEST);
callback.completed(httpResponse);
@@ -118,13 +101,10 @@ class HttpResponseCallbackTest {
@Test
void
shouldThrowExceptionsOnCompletedWith500WhenFailOnRetryExceededIsSelected() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onMaxRetryExceeded(
-
PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .build();
+ SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+ SinkWriterErrorHandlingBehaviorConfiguration.builder()
+
.onMaxRetryExceeded(PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+ .build();
HttpResponseCallback callback =
new HttpResponseCallback(
@@ -145,14 +125,11 @@ class HttpResponseCallbackTest {
@Test
void
shouldIncFailCountersOnCompletedWith500WhenDiscardAndContinueOnRetryExceededIsSelected()
{
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onMaxRetryExceeded(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .DISCARD_AND_CONTINUE)
- .build();
+ SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+ SinkWriterErrorHandlingBehaviorConfiguration.builder()
+ .onMaxRetryExceeded(
+
PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE)
+ .build();
HttpResponseCallback callback =
new HttpResponseCallback(
@@ -176,18 +153,12 @@ class HttpResponseCallbackTest {
@Test
void shouldThrowExceptionOnCompletedWith100() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .build();
-
HttpResponseCallback callback =
new HttpResponseCallback(
TIME_SERIES_COUNT,
SAMPLE_COUNT,
metricsCallback,
- errorHandlingBehavior,
+
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
requestResults);
SimpleHttpResponse httpResponse = new SimpleHttpResponse(100);
@@ -200,69 +171,28 @@ class HttpResponseCallbackTest {
}
@Test
- void shouldThrowExceptionOnFailedWhenFailOnHttpIOFailureIsSelected() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onHttpClientIOFail(
-
PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
- .build();
-
+ void shouldThrowExceptionOnCompletedWith403() {
HttpResponseCallback callback =
new HttpResponseCallback(
TIME_SERIES_COUNT,
SAMPLE_COUNT,
metricsCallback,
- errorHandlingBehavior,
+
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
requestResults);
- Exception ex = new UnsupportedOperationException("Dummy exceptions");
+ SimpleHttpResponse httpResponse = new SimpleHttpResponse(403);
assertThrows(
PrometheusSinkWriteException.class,
() -> {
- callback.failed(ex);
+ callback.completed(httpResponse);
});
}
- @Test
- void
shouldIncFailCountersOnFailedWhenDiscardAndContinueOnHttpIOFailureIsSelected() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .onHttpClientIOFail(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .DISCARD_AND_CONTINUE)
- .build();
-
- HttpResponseCallback callback =
- new HttpResponseCallback(
- TIME_SERIES_COUNT,
- SAMPLE_COUNT,
- metricsCallback,
- errorHandlingBehavior,
- requestResults);
-
- Exception ex = new UnsupportedOperationException("Dummy exceptions");
-
- callback.failed(ex);
-
- // Verify only the expected metric callback was called, once
-
assertTrue(metricsCallback.verifyOnlyFailedWriteRequestsForHttpClientIoFailWasCalledOnce());
-
- // No time series is re-queued
- HttpResponseCallbackTestUtils.assertNoReQueuedResult(reQueuedResults);
- }
-
@Test
void shouldThrowExceptionOnCancelled() {
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .build();
+ SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+ SinkWriterErrorHandlingBehaviorConfiguration.builder().build();
HttpResponseCallback callback =
new HttpResponseCallback(
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
index 9fc4cfa..e232678 100644
---
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
@@ -45,9 +45,6 @@ import static
com.github.tomakehurst.wiremock.client.WireMock.verify;
import static
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithException;
import static
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithNoException;
import static
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.getRequestResult;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
-import static
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
import static org.awaitility.Awaitility.await;
/**
@@ -115,18 +112,11 @@ public class HttpResponseHandlingBehaviourIT {
serverWillRespond(status(statusCode));
// Default behaviors for all errors
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
-
.onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
-
.onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
- .onPrometheusNonRetriableError(
-
ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
- .build();
-
VerifyableResponseCallback callback =
- getResponseCallback(metricsCallback, errorHandlingBehavior);
+ getResponseCallback(
+ metricsCallback,
+
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ .DEFAULT_BEHAVIORS);
SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
@@ -229,12 +219,12 @@ public class HttpResponseHandlingBehaviourIT {
}
@Test
- void
shouldNotRetryAndCompleteOn404WhenDiscardAndContinueOnNonRetriableIsSelected(
+ void
shouldNotRetryAndCompleteOn400WhenDiscardAndContinueOnNonRetriableIsSelected(
WireMockRuntimeInfo wmRuntimeInfo) throws URISyntaxException,
IOException {
PrometheusAsyncHttpClientBuilder clientBuilder =
getHttpClientBuilder(1);
- // 404,Not found is non-retriable for Prometheus remote-write
- int statusCode = 404;
+ // 400,Bad Request is non-retriable for Prometheus remote-write
+ int statusCode = 400;
serverWillRespond(status(statusCode));
// Discard and continue on non-retriable
@@ -278,14 +268,77 @@ public class HttpResponseHandlingBehaviourIT {
PrometheusAsyncHttpClientBuilder clientBuilder =
getHttpClientBuilder(1);
-
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehavior =
+ VerifyableResponseCallback callback =
+ getResponseCallback(
+ metricsCallback,
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- .builder()
- .build();
+ .DEFAULT_BEHAVIORS);
+
+ SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
+
+ try (CloseableHttpAsyncClient client =
clientBuilder.buildAndStartClient(metricsCallback)) {
+ client.execute(request, callback);
+
+ await().untilAsserted(
+ () -> {
+ // Check the client execute only one request
+ verify(exactly(1),
postRequestedFor(urlEqualTo("/remote_write")));
+
+ // Verify the callback was completed once with
an exception
+ assertCallbackCompletedOnceWithException(
+ PrometheusSinkWriteException.class,
callback);
+ });
+ }
+ }
+
+ @Test
+ void shouldNotRetryCompleteAndThrowExceptionOn403(WireMockRuntimeInfo
wmRuntimeInfo)
+ throws URISyntaxException, IOException {
+
+ // 403,Forbidden is always fatal
+ int statusCode = 403;
+ serverWillRespond(status(statusCode));
+
+ PrometheusAsyncHttpClientBuilder clientBuilder =
getHttpClientBuilder(1);
VerifyableResponseCallback callback =
- getResponseCallback(metricsCallback, errorHandlingBehavior);
+ getResponseCallback(
+ metricsCallback,
+
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ .DEFAULT_BEHAVIORS);
+
+ SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
+
+ try (CloseableHttpAsyncClient client =
clientBuilder.buildAndStartClient(metricsCallback)) {
+ client.execute(request, callback);
+
+ await().untilAsserted(
+ () -> {
+ // Check the client execute only one request
+ verify(exactly(1),
postRequestedFor(urlEqualTo("/remote_write")));
+
+ // Verify the callback was completed once with
an exception
+ assertCallbackCompletedOnceWithException(
+ PrometheusSinkWriteException.class,
callback);
+ });
+ }
+ }
+
+ @Test
+ void shouldNotRetryCompleteAndThrowExceptionOn404(WireMockRuntimeInfo
wmRuntimeInfo)
+ throws URISyntaxException, IOException {
+
+ // 404,Not Found is always fatal
+ int statusCode = 404;
+ serverWillRespond(status(statusCode));
+
+ PrometheusAsyncHttpClientBuilder clientBuilder =
getHttpClientBuilder(1);
+
+ VerifyableResponseCallback callback =
+ getResponseCallback(
+ metricsCallback,
+
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ .DEFAULT_BEHAVIORS);
SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
index f0639a0..dba0847 100644
---
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
@@ -52,9 +52,6 @@ class PrometheusSinkBuilderTest {
PrometheusSinkConfiguration
.SinkWriterErrorHandlingBehaviorConfiguration
.builder()
- .onHttpClientIOFail(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .FAIL)
.onMaxRetryExceeded(
PrometheusSinkConfiguration.OnErrorBehavior
.FAIL)
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
index ef644f7..8068294 100644
---
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
@@ -37,13 +37,6 @@ class SinkWriterErrorHandlingBehaviorConfigurationTest {
DEFAULT_CONFIG.getOnMaxRetryExceeded());
}
- @Test
- public void shouldDefaultToFailOnHttpClientIOFail() {
- assertEquals(
- PrometheusSinkConfiguration.OnErrorBehavior.FAIL,
- DEFAULT_CONFIG.getOnHttpClientIOFail());
- }
-
@Test
public void
shouldDefaultToDiscardAndContinueOnPrometheusNonRetriableError() {
assertEquals(
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
index 9318d35..dfec646 100644
---
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
import org.apache.flink.connector.prometheus.sink.PrometheusSink;
import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration;
+import
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior;
import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
import
org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -134,15 +135,8 @@ public class DataStreamExample {
.setErrorHandlingBehaviourConfiguration(
PrometheusSinkConfiguration
.SinkWriterErrorHandlingBehaviorConfiguration.builder()
- .onPrometheusNonRetriableError(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .FAIL) // Optional,
default FAIL
.onMaxRetryExceeded(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .FAIL) // Optional,
default FAIL
- .onHttpClientIOFail(
-
PrometheusSinkConfiguration.OnErrorBehavior
- .FAIL) // Optional,
default FAIL
+ OnErrorBehavior.FAIL) //
Optional, default FAIL
.build())
.setMetricGroupName("Prometheus") // Optional, default
"Prometheus"
.build();
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
new file mode 100644
index 0000000..e4d3adb
--- /dev/null
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.hc.core5.http.HttpResponse;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.flink.connector.prometheus.sink.http.HttpClientTestUtils.httpResponse;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.UNHANDLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Checks how {@link RemoteWriteResponseClassifier#classify} maps HTTP status codes to {@link
+ * RemoteWriteResponseType}: 100 is unhandled, 200 is success, 400 is a non-retriable error, 403
+ * and 404 are fatal errors, 429 and 500 are retriable errors.
+ */
+public class RemoteWriteResponseClassifierTest {
+    @Test
+    void shouldClassify100AsUnhandled() {
+        HttpResponse response = httpResponse(100);
+
+        assertEquals(UNHANDLED, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify200AsSuccess() {
+        HttpResponse response = httpResponse(200);
+
+        assertEquals(
+                RemoteWriteResponseType.SUCCESS, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify400AsNonRetriableError() {
+        HttpResponse response = httpResponse(400);
+
+        assertEquals(NON_RETRIABLE_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify403AsFatal() {
+        HttpResponse response = httpResponse(403);
+
+        assertEquals(FATAL_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify404AsFatal() {
+        HttpResponse response = httpResponse(404);
+
+        assertEquals(FATAL_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    // Name corrected from "Retriale" so it matches the 500 case below
+    @Test
+    void shouldClassify429AsRetriableError() {
+        HttpResponse response = httpResponse(429);
+
+        assertEquals(RETRIABLE_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify500AsRetriableError() {
+        HttpResponse response = httpResponse(500);
+
+        assertEquals(RETRIABLE_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+}
diff --git
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java
new file mode 100644
index 0000000..9712435
--- /dev/null
+++
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Checks which exceptions {@link RethrowingIOSessionListener} rethrows and which it ignores. */
+class RethrowingIOSessionListenerTest {
+    private final RethrowingIOSessionListener listener = new RethrowingIOSessionListener();
+
+    @Test
+    void exceptionHandlerShouldRethrowPrometheusSinkWriteException() {
+        PrometheusSinkWriteException writeException =
+                new PrometheusSinkWriteException("Test exception");
+
+        // The session argument is passed as null in both tests
+        Exception rethrown =
+                assertThrows(Exception.class, () -> listener.exception(null, writeException));
+
+        assertEquals(writeException, rethrown);
+    }
+
+    @Test
+    void exceptionHandlerShouldNotRethrowOtherExceptions() {
+        Exception unrelated = new Exception("Other exception");
+
+        assertDoesNotThrow(() -> listener.exception(null, unrelated));
+    }
+}
diff --git a/pom.xml b/pom.xml
index 2bed12d..cafcc44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,8 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.19.1</flink.version>
<protobuf.version>3.22.2</protobuf.version>
- <apache.httpclient.version>5.2.1</apache.httpclient.version>
+ <apache.httpcomponents.version>5.3</apache.httpcomponents.version>
+
<apache.httpcomponents.client.version>5.3.1</apache.httpcomponents.client.version>
<jackson-bom.version>2.14.3</jackson-bom.version>
<log4j.version>2.17.1</log4j.version>
<junit5.version>5.8.1</junit5.version>
@@ -95,12 +96,12 @@ under the License.
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
- <version>${apache.httpclient.version}</version>
+ <version>${apache.httpcomponents.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
- <version>${apache.httpclient.version}</version>
+ <version>${apache.httpcomponents.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>