This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-prometheus.git


The following commit(s) were added to refs/heads/main by this push:
     new 67da7df  [FLINK-36404][Connectors/Prometheus] Improve exception classification and handling (#9)
67da7df is described below

commit 67da7dfd715fd6c7cf9fda38d5a8c942e1c9fa38
Author: Lorenzo Nicora <[email protected]>
AuthorDate: Mon Oct 7 09:44:40 2024 +0100

    [FLINK-36404][Connectors/Prometheus] Improve exception classification and handling (#9)
    
    * Fix exceptions not causing job to fail
    
    * Always handle 403 and 404 responses as fatal errors
    
    * Httpclient exceptions always fatal. Force httpclient to fail when an exception is thrown by the callback. Upgrade httpclient version. Fix minor missing parameter validations.
    
    * Refactored response classification, using an enum, and added comments to clarify the callback behaviour
---
 flink-connector-prometheus/README.md               |  13 +-
 flink-connector-prometheus/pom.xml                 |   3 -
 .../prometheus/sink/HttpResponseCallback.java      | 206 ++++++++++++---------
 .../connector/prometheus/sink/PrometheusSink.java  |   2 +-
 .../prometheus/sink/PrometheusSinkBuilder.java     |   3 +-
 .../sink/PrometheusSinkConfiguration.java          |  27 +--
 .../http/PrometheusAsyncHttpClientBuilder.java     |   1 +
 .../sink/http/RemoteWriteResponseClassifier.java   |  38 ++--
 .../sink/http/RemoteWriteResponseType.java         |  41 ++++
 .../sink/http/RemoteWriteRetryStrategy.java        |   7 +-
 .../sink/http/RethrowingIOSessionListener.java     |  69 +++++++
 .../prometheus/sink/HttpResponseCallbackTest.java  | 120 +++---------
 .../sink/HttpResponseHandlingBehaviourIT.java      |  97 +++++++---
 .../prometheus/sink/PrometheusSinkBuilderTest.java |   3 -
 ...iterErrorHandlingBehaviorConfigurationTest.java |   7 -
 .../sink/examples/DataStreamExample.java           |  10 +-
 .../http/RemoteWriteResponseClassifierTest.java    |  79 ++++++++
 .../sink/http/RethrowingIOSessionListenerTest.java |  45 +++++
 pom.xml                                            |   7 +-
 19 files changed, 505 insertions(+), 273 deletions(-)
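
Reading aid for the classification change described in the commit message: a
minimal, self-contained sketch of how status codes map to outcomes after this
commit. The class name ResponseClassificationSketch, the nested ResponseType
enum and the alwaysFails helper are hypothetical stand-ins; the connector's
own classifier takes an HttpResponse rather than a raw int.

```java
// Sketch only: mirrors the status-code rules visible in this diff, using
// hypothetical names that are not part of the connector.
final class ResponseClassificationSketch {

    /** Hypothetical stand-in for the response-type enum added by the commit. */
    enum ResponseType { SUCCESS, RETRIABLE_ERROR, NON_RETRIABLE_ERROR, FATAL_ERROR, UNHANDLED }

    /** Classify a raw HTTP status code. Order matters: 429, 403 and 404 are checked before the generic 4xx rule. */
    static ResponseType classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return ResponseType.SUCCESS;             // 2xx: data accepted
        } else if (statusCode == 429) {
            return ResponseType.RETRIABLE_ERROR;     // throttling
        } else if (statusCode == 403 || statusCode == 404) {
            return ResponseType.FATAL_ERROR;         // always fails the job
        } else if (statusCode >= 400 && statusCode < 500) {
            return ResponseType.NON_RETRIABLE_ERROR; // other 4xx
        } else if (statusCode >= 500) {
            return ResponseType.RETRIABLE_ERROR;     // 5xx
        } else {
            return ResponseType.UNHANDLED;           // e.g. 1xx, 3xx
        }
    }

    /** True for outcomes that throw regardless of the error handling configuration. */
    static boolean alwaysFails(ResponseType type) {
        return type == ResponseType.FATAL_ERROR || type == ResponseType.UNHANDLED;
    }

    public static void main(String[] args) {
        for (int code : new int[] {200, 400, 403, 404, 429, 503, 302}) {
            ResponseType type = classify(code);
            System.out.println(code + " -> " + type + (alwaysFails(type) ? " (always fails)" : ""));
        }
    }
}
```

RETRIABLE_ERROR and NON_RETRIABLE_ERROR are the only outcomes that still
depend on the configured OnErrorBehavior.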

diff --git a/flink-connector-prometheus/README.md b/flink-connector-prometheus/README.md
index 07f2308..b93cbce 100644
--- a/flink-connector-prometheus/README.md
+++ b/flink-connector-prometheus/README.md
@@ -130,7 +130,6 @@ PrometheusSink sink = PrometheusSink.builder()
                 .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
                 // Default is FAIL for other error types
                 .onMaxRetryExceeded(OnErrorBehavior.FAIL)
-                .onHttpClientIOFail(OnErrorBehavior.FAIL)
                 .build())
         .setMetricGroupName("Prometheus")           // Customizable metric-group suffix (default: "Prometheus")
         .build();
@@ -182,11 +181,10 @@ The possible behaviors are:
 * `FAIL`: throw a `PrometheusSinkWriteException`, causing the job to fail.
 * `DISCARD_AND_CONTINUE`: log the reason of the error, discard the offending request, and continue.
 
-There are 3 error conditions:
+There are two error conditions:
 
 1. Prometheus returns a non-retriable error response (i.e. any `4xx` status code except `429`). Default: `DISCARD_AND_CONTINUE`.
 2. Prometheus returns a retriable error response (i.e. `5xx` or `429`) but the max retry limit is exceeded. Default: `FAIL`.
-3. The http client fails to complete the request, for an I/O error. Default: `FAIL`.
 
 The error handling behaviors can be configured when creating the instance of the sink, as shown in this snipped:
 
@@ -196,7 +194,6 @@ PrometheusSink sink = PrometheusSink.builder()
         .setErrorHandlingBehaviourConfiguration(SinkWriterErrorHandlingBehaviorConfiguration.builder()
                 .onPrometheusNonRetriableError(OnErrorBehavior.DISCARD_AND_CONTINUE)
                 .onMaxRetryExceeded(OnErrorBehavior.DISCARD_AND_CONTINUE)
-                .onHttpClientIOFail(OnErrorBehavior.DISCARD_AND_CONTINUE)
                 .build())
         .build();
 ```
@@ -218,6 +215,14 @@ Prometheus does not return sufficient information to automatically handle partia
 > restart from checkpoint. The reason is that restarting from checkpoint cause some duplicates, that are rejected by
 > Prometheus as out of order, causing in turn another non-retriable error, in an endless loop.
 
+
+### Fatal errors
+
+Remote-write endpoint responses 403 (Forbidden) and 404 (Not found) are always considered fatal, regardless the error 
+handling configuration.
+
+Any I/O error during the communication with the endpoint is also fatal.
+
 ### Metrics
 
 The sink exposes custom metrics, counting the samples and write-requests (batches) successfully written or discarded.
diff --git a/flink-connector-prometheus/pom.xml b/flink-connector-prometheus/pom.xml
index 6ea2403..84fa4ab 100644
--- a/flink-connector-prometheus/pom.xml
+++ b/flink-connector-prometheus/pom.xml
@@ -65,19 +65,16 @@ under the License.
         <dependency>
             <groupId>org.xerial.snappy</groupId>
             <artifactId>snappy-java</artifactId>
-            <version>1.1.10.4</version>
         </dependency>
 
         <!-- http client -->
         <dependency>
             <groupId>org.apache.httpcomponents.client5</groupId>
             <artifactId>httpclient5</artifactId>
-            <version>${apache.httpclient.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents.core5</groupId>
             <artifactId>httpcore5</artifactId>
-            <version>${apache.httpclient.version}</version>
         </dependency>
 
         <!-- test -->
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
index 3d96a25..0bfda7c 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java
@@ -18,8 +18,9 @@
 package org.apache.flink.connector.prometheus.sink;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior;
 import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
-import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType;
 import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
 
@@ -32,6 +33,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier.classify;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.SUCCESS;
+
 /**
  * Callback handling the outcome of the http async request.
  *
@@ -50,6 +57,11 @@ import java.util.function.Consumer;
  * called with an outcome of *completed* either when the request has succeeded or the max retry
  * limit has been exceeded. It is responsibility of the callback distinguishing between these
  * conditions.
+ *
+ * <p>Also, when an exception is thrown after the request is *completed*, for the http client point
+ * of view (i.e. in the {@link #completed(SimpleHttpResponse)} callback method), it does not
+ * directly cause the writer to fail until it is intercepted further up the client stack, by the
+ * {@link org.apache.flink.connector.prometheus.sink.http.RethrowingIOSessionListener}.
  */
 @Internal
 class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
@@ -77,114 +89,126 @@ class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
     }
 
     /**
-     * The completed outcome is invoked every time the http client successfully receives a valid
-     * http response, regardless of the status code.
+     * The completed outcome is invoked when the http client successfully receives a response,
+     * regardless of the status code.
      *
-     * <p>This method classifies the responses and implements the behaviour expected by the
-     * Remote-Write specifications. In case of error, the behaviour is determined by the error
-     * handling configuration.
+     * <p>This method classifies the responses using {@link
+     * org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier} and implements
+     * the behaviour expected by the Remote-Write specifications. If the response is classified as
+     * an error, the behaviour is determined by the error handling configuration.
      */
     @Override
     public void completed(SimpleHttpResponse response) {
         // Never re-queue requests
         reQueuedResult.accept(Collections.emptyList());
 
-        // Success
-        if (RemoteWriteResponseClassifier.isSuccessResponse(response)) {
-            LOG.debug(
-                    "{},{} - successfully posted {} time-series, containing {} samples",
-                    response.getCode(),
-                    response.getReasonPhrase(),
-                    timeSeriesCount,
-                    sampleCount);
-            metricsCallback.onSuccessfulWriteRequest(sampleCount);
-            return;
-        }
-
-        String responseBody = response.getBodyText();
-        int statusCode = response.getCode();
-        String reasonPhrase = response.getReasonPhrase();
-
-        // Prometheus's response is a non-retriable error.
-        // Depending on the configured behaviour, log and discard or throw an exception
-        if (RemoteWriteResponseClassifier.isNonRetriableErrorResponse(response)) {
-            if (errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
-                    == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
-                throw new PrometheusSinkWriteException(
-                        "Non-retriable error response from Prometheus",
-                        statusCode,
-                        reasonPhrase,
+        RemoteWriteResponseType responseType = classify(response);
+        switch (responseType) {
+            case SUCCESS: // Data successfully written
+                // Increment successful writes counts
+                metricsCallback.onSuccessfulWriteRequest(sampleCount);
+                LOG.debug(
+                        "{},{} - successfully posted {} time-series, containing {} samples",
+                        response.getCode(),
+                        response.getReasonPhrase(),
                         timeSeriesCount,
-                        sampleCount,
-                        responseBody);
-            }
-
-            LOG.warn(
-                    "{},{} {} (discarded {} time-series, containing {} samples)",
-                    statusCode,
-                    reasonPhrase,
-                    responseBody,
-                    timeSeriesCount,
-                    sampleCount);
-            metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
-            return;
-        }
-
-        // Retry limit exceeded on retriable error
-        // Depending on the configured behaviour, log and discard or throw an exception
-        if (RemoteWriteResponseClassifier.isRetriableErrorResponse(response)) {
-            if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded()
-                    == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
-                throw new PrometheusSinkWriteException(
-                        "Max retry limit exceeded on retriable error",
-                        statusCode,
-                        reasonPhrase,
+                        sampleCount);
+                break;
+
+            case FATAL_ERROR: // Response is a fatal error
+                // Throw an exception regardless of configured behaviour
+                logErrorAndThrow(
+                        new PrometheusSinkWriteException(
+                                "Fatal error response from Prometheus",
+                                response.getCode(),
+                                response.getReasonPhrase(),
+                                timeSeriesCount,
+                                sampleCount,
+                                response.getBodyText()));
+                break;
+
+            case NON_RETRIABLE_ERROR: // Response is a non-retriable error.
+                // If behavior is FAIL, throw an exception
+                if (errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
+                        == OnErrorBehavior.FAIL) {
+                    logErrorAndThrow(
+                            new PrometheusSinkWriteException(
+                                    "Non-retriable error response from Prometheus",
+                                    response.getCode(),
+                                    response.getReasonPhrase(),
+                                    timeSeriesCount,
+                                    sampleCount,
+                                    response.getBodyText()));
+                }
+
+                // Otherwise (DISCARD_AND_CONTINUE), increment discarded data counts & log WARN
+                metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
+                LOG.warn(
+                        "{},{} {} (discarded {} time-series, containing {} samples)",
+                        response.getCode(),
+                        response.getReasonPhrase(),
+                        response.getBodyText(),
+                        timeSeriesCount,
+                        sampleCount);
+                break;
+
+            case RETRIABLE_ERROR: // Retry limit exceeded on retriable error
+                // If behavior is FAIL, throw an exception
+                if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded() == OnErrorBehavior.FAIL) {
+                    logErrorAndThrow(
+                            new PrometheusSinkWriteException(
+                                    "Max retry limit exceeded on retriable error",
+                                    response.getCode(),
+                                    response.getReasonPhrase(),
+                                    timeSeriesCount,
+                                    sampleCount,
+                                    response.getBodyText()));
+                }
+
+                // Otherwise (DISCARD_AND_CONTINUE), increment discarded data counts & log WARN
+                metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
+                LOG.warn(
+                        "{},{} {} (after retry limit reached, discarded {} time-series, containing {} samples)",
+                        response.getCode(),
+                        response.getReasonPhrase(),
+                        response.getBodyText(),
                         timeSeriesCount,
-                        sampleCount,
-                        responseBody);
-            }
-
-            LOG.warn(
-                    "{},{} {} (after retry limit reached, discarded {} time-series, containing {} samples)",
-                    statusCode,
-                    reasonPhrase,
-                    responseBody,
-                    timeSeriesCount,
-                    sampleCount);
-            metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
-            return;
+                        sampleCount);
+                break;
+
+            default: // Unexpected/unhandled response outcome
+                // Always fail
+                logErrorAndThrow(
+                        new PrometheusSinkWriteException(
+                                "Unexpected status code returned from the remote-write endpoint",
+                                response.getCode(),
+                                response.getReasonPhrase(),
+                                timeSeriesCount,
+                                sampleCount,
+                                response.getBodyText()));
         }
-
-        // Unexpected/unhandled response outcome
-        throw new PrometheusSinkWriteException(
-                "Unexpected status code returned from the remote-write endpoint",
-                statusCode,
-                reasonPhrase,
-                timeSeriesCount,
-                sampleCount,
-                responseBody);
     }
 
+    /**
+     * Exception reported by the http client (e.g. I/O failure). Always throw an exception.
+     *
+     * @param ex exception reported by the http client
+     */
     @Override
     public void failed(Exception ex) {
-        // General I/O failure reported by http client
-        // Depending on the configured behavior, throw an exception or log and discard
-        if (errorHandlingBehaviorConfig.getOnHttpClientIOFail()
-                == PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
-            throw new PrometheusSinkWriteException("Http client failure", ex);
-        } else {
-            LOG.warn(
-                    "Exception executing the remote-write (discarded {} time-series containing {} samples)",
-                    timeSeriesCount,
-                    sampleCount,
-                    ex);
-            metricsCallback.onFailedWriteRequestForHttpClientIoFail(sampleCount);
-        }
+        throw new PrometheusSinkWriteException("Http client failure", ex);
     }
 
+    /** The async http client was cancelled. Always throw an exception. */
     @Override
     public void cancelled() {
-        // When the async http client is cancelled, the sink always throws an exception
+        // When the async http client is cancelled, the sink should always throw an exception
         throw new PrometheusSinkWriteException("Write request execution cancelled");
     }
+
+    /** Log the exception at ERROR and rethrow. */
+    private void logErrorAndThrow(PrometheusSinkWriteException ex) {
+        LOG.error("Error condition detected by the http response callback (on complete)", ex);
+        throw ex;
+    }
 }
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
index 7a7738f..78edd7c 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java
@@ -80,7 +80,7 @@ public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.Ti
                 maxBatchSizeInSamples > 1, "Max batch size (in samples) must be positive");
         Preconditions.checkArgument(
                 maxRecordSizeInSamples <= maxBatchSizeInSamples,
-                "Maz record size (in samples) must be <= Max batch size");
+                "Max record size (in samples) must be <= Max batch size");
         Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight requests must be 1");
         Preconditions.checkArgument(
                 StringUtils.isNotBlank(prometheusRemoteWriteUrl),
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
index 02b1e77..438855d 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilder.java
@@ -102,7 +102,7 @@ public class PrometheusSinkBuilder
                         + "\n\t\tmaxTimeInBufferMs={}\n\t\tmaxInFlightRequests={}\n\t\tmaxBufferedRequests={}"
                         + "\n\t\tRetryConfiguration: initialRetryDelayMs={}, maxRetryDelayMs={}, maxRetryCount={}"
                         + "\n\t\tsocketTimeoutMs={}\n\t\thttpUserAgent={}"
-                        + "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onHttpClientIOFailure={}, onNonRetriableError={}",
+                        + "\n\t\tErrorHandlingBehaviour: onMaxRetryExceeded={}, onNonRetriableError={}",
                 actualMaxBatchSizeInSamples,
                 actualMaxRecordSizeInSamples,
                 actualMaxTimeInBufferMS,
@@ -114,7 +114,6 @@ public class PrometheusSinkBuilder
                 socketTimeoutMs,
                 actualHttpUserAgent,
                 actualErrorHandlingBehaviorConfig.getOnMaxRetryExceeded(),
-                actualErrorHandlingBehaviorConfig.getOnHttpClientIOFail(),
                 actualErrorHandlingBehaviorConfig.getOnPrometheusNonRetriableError());
 
         return new PrometheusSink(
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
index 2c7dd34..fb7a17c 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkConfiguration.java
@@ -46,23 +46,17 @@ public class PrometheusSinkConfiguration {
     public static class SinkWriterErrorHandlingBehaviorConfiguration implements Serializable {
 
         public static final OnErrorBehavior ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR = FAIL;
-        public static final OnErrorBehavior ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR = FAIL;
         public static final OnErrorBehavior ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR =
                 DISCARD_AND_CONTINUE;
 
         /** Behaviour when the max retries is exceeded on Prometheus retriable errors. */
         private final OnErrorBehavior onMaxRetryExceeded;
 
-        /** Behaviour when the HTTP client fails, for an I/O problem. */
-        private final OnErrorBehavior onHttpClientIOFail;
-
         /** Behaviour when Prometheus Remote-Write respond with a non-retriable error. */
         private final OnErrorBehavior onPrometheusNonRetriableError;
 
         public SinkWriterErrorHandlingBehaviorConfiguration(
-                OnErrorBehavior onMaxRetryExceeded,
-                OnErrorBehavior onHttpClientIOFail,
-                OnErrorBehavior onPrometheusNonRetriableError) {
+                OnErrorBehavior onMaxRetryExceeded, OnErrorBehavior onPrometheusNonRetriableError) {
             // onPrometheusNonRetriableError cannot be set to FAIL, because it makes impossible for
             // the job to restart from checkpoint (see FLINK-36319).
             // We are retaining the possibility of configuring the behavior on this type of error to
@@ -71,7 +65,6 @@ public class PrometheusSinkConfiguration {
                     onPrometheusNonRetriableError == DISCARD_AND_CONTINUE,
                     "Only DISCARD_AND_CONTINUE is currently supported for onPrometheusNonRetriableError");
             this.onMaxRetryExceeded = onMaxRetryExceeded;
-            this.onHttpClientIOFail = onHttpClientIOFail;
             this.onPrometheusNonRetriableError = onPrometheusNonRetriableError;
         }
 
@@ -79,10 +72,6 @@ public class PrometheusSinkConfiguration {
             return onMaxRetryExceeded;
         }
 
-        public OnErrorBehavior getOnHttpClientIOFail() {
-            return onHttpClientIOFail;
-        }
-
         public OnErrorBehavior getOnPrometheusNonRetriableError() {
             return onPrometheusNonRetriableError;
         }
@@ -90,7 +79,6 @@ public class PrometheusSinkConfiguration {
         /** Builder for PrometheusSinkWriterErrorHandlingConfiguration. */
         public static class Builder {
             private OnErrorBehavior onMaxRetryExceeded = null;
-            private OnErrorBehavior onHttpClientIOFail = null;
             private OnErrorBehavior onPrometheusNonRetriableError = null;
 
             public Builder() {}
@@ -100,11 +88,6 @@ public class PrometheusSinkConfiguration {
                 return this;
             }
 
-            public Builder onHttpClientIOFail(OnErrorBehavior onErrorBehavior) {
-                this.onHttpClientIOFail = onErrorBehavior;
-                return this;
-            }
-
             public Builder onPrometheusNonRetriableError(OnErrorBehavior onErrorBehavior) {
                 this.onPrometheusNonRetriableError = onErrorBehavior;
                 return this;
@@ -114,8 +97,6 @@ public class PrometheusSinkConfiguration {
                 return new SinkWriterErrorHandlingBehaviorConfiguration(
                         Optional.ofNullable(onMaxRetryExceeded)
                                 .orElse(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR),
-                        Optional.ofNullable(onHttpClientIOFail)
-                                .orElse(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR),
                         Optional.ofNullable(onPrometheusNonRetriableError)
                                 .orElse(ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR));
             }
@@ -153,6 +134,12 @@ public class PrometheusSinkConfiguration {
 
         public RetryConfiguration(
                 long initialRetryDelayMS, long maxRetryDelayMS, int maxRetryCount) {
+            Preconditions.checkArgument(initialRetryDelayMS > 0, "Initial retry delay must be > 0");
+            Preconditions.checkArgument(maxRetryDelayMS > 0, "Max retry delay must be > 0");
+            Preconditions.checkArgument(
+                    maxRetryDelayMS >= initialRetryDelayMS,
+                    "Max retry delay must be >= Initial retry delay");
+            Preconditions.checkArgument(maxRetryCount > 0, "Max retry count must be > 0");
             this.initialRetryDelayMS = initialRetryDelayMS;
             this.maxRetryDelayMS = maxRetryDelayMS;
             this.maxRetryCount = maxRetryCount;
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
index 29c504e..4e7f68b 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/PrometheusAsyncHttpClientBuilder.java
@@ -88,6 +88,7 @@ public class PrometheusAsyncHttpClientBuilder implements Serializable {
                 HttpAsyncClients.custom()
                         .setConnectionManager(connectionManager)
                         .setIOReactorConfig(ioReactorConfig)
+                        .setIOSessionListener(new RethrowingIOSessionListener())
                         .setRetryStrategy(
                                 new RemoteWriteRetryStrategy(retryConfiguration, metricsCallback))
                         .build();
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
index 2cc038a..895a888 100644
--- a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifier.java
@@ -21,21 +21,37 @@ import org.apache.flink.annotation.Internal;
 
 import org.apache.hc.core5.http.HttpResponse;
 
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.SUCCESS;
+import static org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.UNHANDLED;
+
 /** Classify http responses based on the status code. */
 @Internal
 public class RemoteWriteResponseClassifier {
-    public static boolean isRetriableErrorResponse(HttpResponse response) {
-        int statusCode = response.getCode();
-        return statusCode >= 500 | statusCode == 429;
-    }
-
-    public static boolean isNonRetriableErrorResponse(HttpResponse response) {
-        int statusCode = response.getCode();
-        return (statusCode >= 400 && statusCode < 500) && statusCode != 429;
-    }
 
-    public static boolean isSuccessResponse(HttpResponse response) {
+    public static RemoteWriteResponseType classify(HttpResponse response) {
         int statusCode = response.getCode();
-        return (statusCode >= 200 && statusCode < 300);
+        if (statusCode >= 200 && statusCode < 300) {
+            // 2xx: success
+            return SUCCESS;
+        } else if (statusCode == 429) {
+            // 429, Too Many Requests: throttling
+            return RETRIABLE_ERROR;
+        } else if (statusCode == 403 || statusCode == 404) {
+            // 403, Forbidden: authentication error
+            // 404, Not Found: wrong endpoint URL path
+            return FATAL_ERROR;
+        } else if (statusCode >= 400 && statusCode < 500) {
+            // 4xx (except 403, 404, 429): wrong request/bad data
+            return NON_RETRIABLE_ERROR;
+        } else if (statusCode >= 500) {
+            // 5xx: internal errors, recoverable
+            return RETRIABLE_ERROR;
+        } else {
+            // Other status code are unhandled
+            return UNHANDLED;
+        }
     }
 }
diff --git a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java
new file mode 100644
index 0000000..810d649
--- /dev/null
+++ b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Type of response, from Remote-Write endpoint, classified by {@link
+ * RemoteWriteResponseClassifier}.
+ */
+@Internal
+public enum RemoteWriteResponseType {
+    /** The Write-Request was successfully accepted. */
+    SUCCESS,
+    /** Write-Request temporarily rejected. The request can be retried. */
+    RETRIABLE_ERROR,
+    /**
+     * Write-Request permanently rejected. It cannot be retried. The error condition is recoverable,
+     * after discarding the offending request.
+     */
+    NON_RETRIABLE_ERROR,
+    /** Unrecoverable error condition. */
+    FATAL_ERROR,
+    /** Unhandled status code. */
+    UNHANDLED;
+}
diff --git 
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
 
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
index 1b8fdc5..2a4b1a0 100644
--- 
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
+++ 
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java
@@ -40,6 +40,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier.classify;
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+
 /**
  * Retry strategy for the http client.
  *
@@ -103,9 +106,7 @@ public class RemoteWriteRetryStrategy implements 
HttpRequestRetryStrategy {
 
     @Override
     public boolean retryRequest(HttpResponse httpResponse, int execCount, 
HttpContext httpContext) {
-        boolean retry =
-                (execCount <= maxRetryCount)
-                        && 
RemoteWriteResponseClassifier.isRetriableErrorResponse(httpResponse);
+        boolean retry = (execCount <= maxRetryCount) && 
(classify(httpResponse) == RETRIABLE_ERROR);
         LOG.debug(
                 "{} retry on response {} {}, at execution {}",
                 (retry) ? "DO" : "DO NOT",
diff --git 
a/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
 
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
new file mode 100644
index 0000000..e983712
--- /dev/null
+++ 
b/flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.IOSessionListener;
+
+/**
+ * Selectively rethrow PrometheusSinkWriteException, causing the httpclient to fail. Otherwise, the
+ * exception would be swallowed by the IOReactor.
+ */
+@Internal
+public class RethrowingIOSessionListener implements IOSessionListener {
+    @Override
+    public void exception(IOSession ioSession, Exception e) {
+        if (e instanceof PrometheusSinkWriteException) {
+            // Rethrow the exception
+            throw (PrometheusSinkWriteException) e;
+        }
+    }
+
+    @Override
+    public void connected(IOSession ioSession) {
+        // Nothing to do
+    }
+
+    @Override
+    public void startTls(IOSession ioSession) {
+        // Nothing to do
+    }
+
+    @Override
+    public void inputReady(IOSession ioSession) {
+        // Nothing to do
+    }
+
+    @Override
+    public void outputReady(IOSession ioSession) {
+        // Nothing to do
+    }
+
+    @Override
+    public void timeout(IOSession ioSession) {
+        // Nothing to do
+    }
+
+    @Override
+    public void disconnected(IOSession ioSession) {
+        // Nothing to do
+    }
+}
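The selective-rethrow idea in the listener above can be tried in isolation, without the httpclient dependency. The sketch below is a minimal stand-in under stated assumptions: `SelectiveRethrowSketch`, `FatalWriteException`, `onException` and `propagates` are hypothetical names; `FatalWriteException` plays the role of `PrometheusSinkWriteException`, which has to be unchecked for the rethrow in `exception(IOSession, Exception)` to compile; and the `IOSession` argument is dropped for brevity.

```java
// Standalone sketch; none of these names exist in the connector or in httpcore5.
final class SelectiveRethrowSketch {

    // Hypothetical stand-in for PrometheusSinkWriteException (unchecked)
    static final class FatalWriteException extends RuntimeException {
        FatalWriteException(String message) {
            super(message);
        }
    }

    // Same shape as the listener's exception callback: only the fatal type is rethrown
    static void onException(Exception e) {
        if (e instanceof FatalWriteException) {
            throw (FatalWriteException) e;
        }
        // Any other exception is deliberately ignored here
    }

    // Helper for experimenting: reports whether the given exception is propagated
    static boolean propagates(Exception e) {
        try {
            onException(e);
            return false;
        } catch (FatalWriteException rethrown) {
            return true;
        }
    }
}
```

With this sketch, `propagates(new FatalWriteException("boom"))` is true, while `propagates(new java.io.IOException("io"))` is false, matching the listener's intent of failing only on the sink's own exception type.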
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
index 4b98d63..8e7eddd 100644
--- 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallbackTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.prometheus.sink;
 
+import 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration;
 import 
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
 import 
org.apache.flink.connector.prometheus.sink.metrics.VerifybleSinkMetricsCallback;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -30,9 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
 
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -41,14 +39,12 @@ class HttpResponseCallbackTest {
     private static final int TIME_SERIES_COUNT = 17;
     private static final long SAMPLE_COUNT = 42;
 
-    private InspectableMetricGroup metricGroup;
     private VerifybleSinkMetricsCallback metricsCallback;
     private List<Types.TimeSeries> reQueuedResults;
     Consumer<List<Types.TimeSeries>> requestResults;
 
     @BeforeEach
     void setUp() {
-        metricGroup = new InspectableMetricGroup();
         metricsCallback = new VerifybleSinkMetricsCallback();
         reQueuedResults = new ArrayList<>();
         requestResults = 
HttpResponseCallbackTestUtils.getRequestResult(reQueuedResults);
@@ -56,22 +52,12 @@ class HttpResponseCallbackTest {
 
     @Test
     void shouldIncSuccessCountersOn200OK() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                
.onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
-                                
.onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
-                                .onPrometheusNonRetriableError(
-                                        
ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
-                                .build();
-
         HttpResponseCallback callback =
                 new HttpResponseCallback(
                         TIME_SERIES_COUNT,
                         SAMPLE_COUNT,
                         metricsCallback,
-                        errorHandlingBehavior,
+                        
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
                         requestResults);
 
         SimpleHttpResponse httpResponse = new 
SimpleHttpResponse(HttpStatus.SC_OK);
@@ -86,15 +72,12 @@ class HttpResponseCallbackTest {
     }
 
     @Test
-    void 
shouldIncFailCountersOnCompletedWith404WhenDiscardAndContinueOnNonRetriableIsSelected()
 {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onPrometheusNonRetriableError(
-                                        
PrometheusSinkConfiguration.OnErrorBehavior
-                                                .DISCARD_AND_CONTINUE)
-                                .build();
+    void 
shouldIncFailCountersOnCompletedWith400WhenDiscardAndContinueOnNonRetriableIsSelected()
 {
+        SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+                SinkWriterErrorHandlingBehaviorConfiguration.builder()
+                        .onPrometheusNonRetriableError(
+                                
PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE)
+                        .build();
 
         HttpResponseCallback callback =
                 new HttpResponseCallback(
@@ -104,7 +87,7 @@ class HttpResponseCallbackTest {
                         errorHandlingBehavior,
                         requestResults);
 
-        SimpleHttpResponse httpResponse = new 
SimpleHttpResponse(HttpStatus.SC_NOT_FOUND);
+        SimpleHttpResponse httpResponse = new 
SimpleHttpResponse(HttpStatus.SC_BAD_REQUEST);
 
         callback.completed(httpResponse);
 
@@ -118,13 +101,10 @@ class HttpResponseCallbackTest {
 
     @Test
     void 
shouldThrowExceptionsOnCompletedWith500WhenFailOnRetryExceededIsSelected() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onMaxRetryExceeded(
-                                        
PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .build();
+        SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+                SinkWriterErrorHandlingBehaviorConfiguration.builder()
+                        
.onMaxRetryExceeded(PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
+                        .build();
 
         HttpResponseCallback callback =
                 new HttpResponseCallback(
@@ -145,14 +125,11 @@ class HttpResponseCallbackTest {
 
     @Test
     void 
shouldIncFailCountersOnCompletedWith500WhenDiscardAndContinueOnRetryExceededIsSelected()
 {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onMaxRetryExceeded(
-                                        
PrometheusSinkConfiguration.OnErrorBehavior
-                                                .DISCARD_AND_CONTINUE)
-                                .build();
+        SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+                SinkWriterErrorHandlingBehaviorConfiguration.builder()
+                        .onMaxRetryExceeded(
+                                
PrometheusSinkConfiguration.OnErrorBehavior.DISCARD_AND_CONTINUE)
+                        .build();
 
         HttpResponseCallback callback =
                 new HttpResponseCallback(
@@ -176,18 +153,12 @@ class HttpResponseCallbackTest {
 
     @Test
     void shouldThrowExceptionOnCompletedWith100() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .build();
-
         HttpResponseCallback callback =
                 new HttpResponseCallback(
                         TIME_SERIES_COUNT,
                         SAMPLE_COUNT,
                         metricsCallback,
-                        errorHandlingBehavior,
+                        
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
                         requestResults);
 
         SimpleHttpResponse httpResponse = new SimpleHttpResponse(100);
@@ -200,69 +171,28 @@ class HttpResponseCallbackTest {
     }
 
     @Test
-    void shouldThrowExceptionOnFailedWhenFailOnHttpIOFailureIsSelected() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onHttpClientIOFail(
-                                        
PrometheusSinkConfiguration.OnErrorBehavior.FAIL)
-                                .build();
-
+    void shouldThrowExceptionOnCompletedWith403() {
         HttpResponseCallback callback =
                 new HttpResponseCallback(
                         TIME_SERIES_COUNT,
                         SAMPLE_COUNT,
                         metricsCallback,
-                        errorHandlingBehavior,
+                        
SinkWriterErrorHandlingBehaviorConfiguration.DEFAULT_BEHAVIORS,
                         requestResults);
 
-        Exception ex = new UnsupportedOperationException("Dummy exceptions");
+        SimpleHttpResponse httpResponse = new SimpleHttpResponse(403);
 
         assertThrows(
                 PrometheusSinkWriteException.class,
                 () -> {
-                    callback.failed(ex);
+                    callback.completed(httpResponse);
                 });
     }
 
-    @Test
-    void 
shouldIncFailCountersOnFailedWhenDiscardAndContinueOnHttpIOFailureIsSelected() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .onHttpClientIOFail(
-                                        
PrometheusSinkConfiguration.OnErrorBehavior
-                                                .DISCARD_AND_CONTINUE)
-                                .build();
-
-        HttpResponseCallback callback =
-                new HttpResponseCallback(
-                        TIME_SERIES_COUNT,
-                        SAMPLE_COUNT,
-                        metricsCallback,
-                        errorHandlingBehavior,
-                        requestResults);
-
-        Exception ex = new UnsupportedOperationException("Dummy exceptions");
-
-        callback.failed(ex);
-
-        // Verify only the expected metric callback was called, once
-        
assertTrue(metricsCallback.verifyOnlyFailedWriteRequestsForHttpClientIoFailWasCalledOnce());
-
-        // No time series is re-queued
-        HttpResponseCallbackTestUtils.assertNoReQueuedResult(reQueuedResults);
-    }
-
     @Test
     void shouldThrowExceptionOnCancelled() {
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .build();
+        SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehavior =
+                SinkWriterErrorHandlingBehaviorConfiguration.builder().build();
 
         HttpResponseCallback callback =
                 new HttpResponseCallback(
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
index 9fc4cfa..e232678 100644
--- 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/HttpResponseHandlingBehaviourIT.java
@@ -45,9 +45,6 @@ import static 
com.github.tomakehurst.wiremock.client.WireMock.verify;
 import static 
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithException;
 import static 
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.assertCallbackCompletedOnceWithNoException;
 import static 
org.apache.flink.connector.prometheus.sink.HttpResponseCallbackTestUtils.getRequestResult;
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR;
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR;
-import static 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration.ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR;
 import static org.awaitility.Awaitility.await;
 
 /**
@@ -115,18 +112,11 @@ public class HttpResponseHandlingBehaviourIT {
         serverWillRespond(status(statusCode));
 
         // Default behaviors for all errors
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
-                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                
.onMaxRetryExceeded(ON_MAX_RETRY_EXCEEDED_DEFAULT_BEHAVIOR)
-                                
.onHttpClientIOFail(ON_HTTP_CLIENT_IO_FAIL_DEFAULT_BEHAVIOR)
-                                .onPrometheusNonRetriableError(
-                                        
ON_PROMETHEUS_NON_RETRIABLE_ERROR_DEFAULT_BEHAVIOR)
-                                .build();
-
         VerifyableResponseCallback callback =
-                getResponseCallback(metricsCallback, errorHandlingBehavior);
+                getResponseCallback(
+                        metricsCallback,
+                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                                .DEFAULT_BEHAVIORS);
 
         SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
 
@@ -229,12 +219,12 @@ public class HttpResponseHandlingBehaviourIT {
     }
 
     @Test
-    void 
shouldNotRetryAndCompleteOn404WhenDiscardAndContinueOnNonRetriableIsSelected(
+    void 
shouldNotRetryAndCompleteOn400WhenDiscardAndContinueOnNonRetriableIsSelected(
             WireMockRuntimeInfo wmRuntimeInfo) throws URISyntaxException, 
IOException {
         PrometheusAsyncHttpClientBuilder clientBuilder = 
getHttpClientBuilder(1);
 
-        // 404,Not found is non-retriable for Prometheus remote-write
-        int statusCode = 404;
+        // 400, Bad Request is non-retriable for Prometheus remote-write
+        int statusCode = 400;
         serverWillRespond(status(statusCode));
 
         // Discard and continue on non-retriable
@@ -278,14 +268,77 @@ public class HttpResponseHandlingBehaviourIT {
 
         PrometheusAsyncHttpClientBuilder clientBuilder = 
getHttpClientBuilder(1);
 
-        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                errorHandlingBehavior =
+        VerifyableResponseCallback callback =
+                getResponseCallback(
+                        metricsCallback,
                         
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
-                                .builder()
-                                .build();
+                                .DEFAULT_BEHAVIORS);
+
+        SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
+
+        try (CloseableHttpAsyncClient client = 
clientBuilder.buildAndStartClient(metricsCallback)) {
+            client.execute(request, callback);
+
+            await().untilAsserted(
+                            () -> {
+                                // Check the client executes only one request
+                                verify(exactly(1), postRequestedFor(urlEqualTo("/remote_write")));
+
+                                // Verify the callback was completed once with an exception
+                                assertCallbackCompletedOnceWithException(
+                                        PrometheusSinkWriteException.class, 
callback);
+                            });
+        }
+    }
+
+    @Test
+    void shouldNotRetryCompleteAndThrowExceptionOn403(WireMockRuntimeInfo 
wmRuntimeInfo)
+            throws URISyntaxException, IOException {
+
+        // 403, Forbidden is always fatal
+        int statusCode = 403;
+        serverWillRespond(status(statusCode));
+
+        PrometheusAsyncHttpClientBuilder clientBuilder = 
getHttpClientBuilder(1);
 
         VerifyableResponseCallback callback =
-                getResponseCallback(metricsCallback, errorHandlingBehavior);
+                getResponseCallback(
+                        metricsCallback,
+                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                                .DEFAULT_BEHAVIORS);
+
+        SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
+
+        try (CloseableHttpAsyncClient client = 
clientBuilder.buildAndStartClient(metricsCallback)) {
+            client.execute(request, callback);
+
+            await().untilAsserted(
+                            () -> {
+                                // Check the client executes only one request
+                                verify(exactly(1), postRequestedFor(urlEqualTo("/remote_write")));
+
+                                // Verify the callback was completed once with an exception
+                                assertCallbackCompletedOnceWithException(
+                                        PrometheusSinkWriteException.class, 
callback);
+                            });
+        }
+    }
+
+    @Test
+    void shouldNotRetryCompleteAndThrowExceptionOn404(WireMockRuntimeInfo 
wmRuntimeInfo)
+            throws URISyntaxException, IOException {
+
+        // 404, Not Found is always fatal
+        int statusCode = 404;
+        serverWillRespond(status(statusCode));
+
+        PrometheusAsyncHttpClientBuilder clientBuilder = 
getHttpClientBuilder(1);
+
+        VerifyableResponseCallback callback =
+                getResponseCallback(
+                        metricsCallback,
+                        
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+                                .DEFAULT_BEHAVIORS);
 
         SimpleHttpRequest request = buildRequest(wmRuntimeInfo);
 
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
index f0639a0..dba0847 100644
--- 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBuilderTest.java
@@ -52,9 +52,6 @@ class PrometheusSinkBuilderTest {
                                         PrometheusSinkConfiguration
                                                 
.SinkWriterErrorHandlingBehaviorConfiguration
                                                 .builder()
-                                                .onHttpClientIOFail(
-                                                        
PrometheusSinkConfiguration.OnErrorBehavior
-                                                                .FAIL)
                                                 .onMaxRetryExceeded(
                                                         
PrometheusSinkConfiguration.OnErrorBehavior
                                                                 .FAIL)
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
index ef644f7..8068294 100644
--- 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/SinkWriterErrorHandlingBehaviorConfigurationTest.java
@@ -37,13 +37,6 @@ class SinkWriterErrorHandlingBehaviorConfigurationTest {
                 DEFAULT_CONFIG.getOnMaxRetryExceeded());
     }
 
-    @Test
-    public void shouldDefaultToFailOnHttpClientIOFail() {
-        assertEquals(
-                PrometheusSinkConfiguration.OnErrorBehavior.FAIL,
-                DEFAULT_CONFIG.getOnHttpClientIOFail());
-    }
-
     @Test
     public void 
shouldDefaultToDiscardAndContinueOnPrometheusNonRetriableError() {
         assertEquals(
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
index 9318d35..dfec646 100644
--- 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
 import org.apache.flink.connector.prometheus.sink.PrometheusSink;
 import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior;
 import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
 import 
org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector;
 import org.apache.flink.connector.prometheus.sink.prometheus.Types;
@@ -134,15 +135,8 @@ public class DataStreamExample {
                         .setErrorHandlingBehaviourConfiguration(
                                 PrometheusSinkConfiguration
                                         
.SinkWriterErrorHandlingBehaviorConfiguration.builder()
-                                        .onPrometheusNonRetriableError(
-                                                
PrometheusSinkConfiguration.OnErrorBehavior
-                                                        .FAIL) // Optional, 
default FAIL
                                         .onMaxRetryExceeded(
-                                                
PrometheusSinkConfiguration.OnErrorBehavior
-                                                        .FAIL) // Optional, 
default FAIL
-                                        .onHttpClientIOFail(
-                                                
PrometheusSinkConfiguration.OnErrorBehavior
-                                                        .FAIL) // Optional, 
default FAIL
+                                                OnErrorBehavior.FAIL) // 
Optional, default FAIL
                                         .build())
                         .setMetricGroupName("Prometheus") // Optional, default 
"Prometheus"
                         .build();
diff --git 
a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
new file mode 100644
index 0000000..e4d3adb
--- /dev/null
+++ 
b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteResponseClassifierTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.hc.core5.http.HttpResponse;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.prometheus.sink.http.HttpClientTestUtils.httpResponse;
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.FATAL_ERROR;
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.NON_RETRIABLE_ERROR;
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.RETRIABLE_ERROR;
+import static 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType.UNHANDLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RemoteWriteResponseClassifierTest {
+    @Test
+    void shouldClassify100AsUnhandled() {
+        HttpResponse response = httpResponse(100);
+        assertEquals(UNHANDLED, 
RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify200AsSuccess() {
+        HttpResponse response = httpResponse(200);
+
+        assertEquals(
+                RemoteWriteResponseType.SUCCESS, 
RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify400AsNonRetriableError() {
+        HttpResponse response = httpResponse(400);
+
+        assertEquals(NON_RETRIABLE_ERROR, 
RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify403AsFatal() {
+        HttpResponse response = httpResponse(403);
+
+        assertEquals(FATAL_ERROR, 
RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify404AsFatal() {
+        HttpResponse response = httpResponse(404);
+
+        assertEquals(FATAL_ERROR, 
RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify429AsRetriableError() {
+        HttpResponse response = httpResponse(429);
+
+        assertEquals(RETRIABLE_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+
+    @Test
+    void shouldClassify500AsRetriableError() {
+        HttpResponse response = httpResponse(500);
+
+        assertEquals(RETRIABLE_ERROR, RemoteWriteResponseClassifier.classify(response));
+    }
+}
diff --git a/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java
new file mode 100644
index 0000000..9712435
--- /dev/null
+++ b/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/http/RethrowingIOSessionListenerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class RethrowingIOSessionListenerTest {
+    @Test
+    void exceptionHandlerShouldRethrowPrometheusSinkWriteException() {
+        RethrowingIOSessionListener listener = new RethrowingIOSessionListener();
+        PrometheusSinkWriteException exception = new PrometheusSinkWriteException("Test exception");
+
+        Exception thrown = assertThrows(Exception.class, () -> listener.exception(null, exception));
+        assertEquals(exception, thrown);
+    }
+
+    @Test
+    void exceptionHandlerShouldNotRethrowOtherExceptions() {
+        RethrowingIOSessionListener listener = new RethrowingIOSessionListener();
+        Exception otherException = new Exception("Other exception");
+
+        assertDoesNotThrow(() -> listener.exception(null, otherException));
+    }
+}
diff --git a/pom.xml b/pom.xml
index 2bed12d..cafcc44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,8 @@ under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <flink.version>1.19.1</flink.version>
         <protobuf.version>3.22.2</protobuf.version>
-        <apache.httpclient.version>5.2.1</apache.httpclient.version>
+        <apache.httpcomponents.version>5.3</apache.httpcomponents.version>
+        <apache.httpcomponents.client.version>5.3.1</apache.httpcomponents.client.version>
         <jackson-bom.version>2.14.3</jackson-bom.version>
         <log4j.version>2.17.1</log4j.version>
         <junit5.version>5.8.1</junit5.version>
@@ -95,12 +96,12 @@ under the License.
             <dependency>
                 <groupId>org.apache.httpcomponents.client5</groupId>
                 <artifactId>httpclient5</artifactId>
-                <version>${apache.httpclient.version}</version>
+                <version>${apache.httpcomponents.client.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents.core5</groupId>
                 <artifactId>httpcore5</artifactId>
-                <version>${apache.httpclient.version}</version>
+                <version>${apache.httpcomponents.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.xerial.snappy</groupId>
