This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e161cc  [FLINK-33925][connectors/opensearch] Allow customising bulk 
failure handling
9e161cc is described below

commit 9e161cc097b34d3ea0d32787f337e630250547d3
Author: Peter Fischer <[email protected]>
AuthorDate: Wed Mar 13 08:24:15 2024 +0100

    [FLINK-33925][connectors/opensearch] Allow customising bulk failure handling
    
    Extracted `BulkResponseInspector` interface to allow custom handling of 
(partially) failed bulk requests. If not overridden, default behaviour remains 
unchanged and partial failures are escalated.
    
    * fixes https://issues.apache.org/jira/browse/FLINK-33925
    * allows custom metrics to be exposed
---
 .../opensearch/sink/BulkResponseInspector.java     |  60 ++++++++++
 .../connector/opensearch/sink/OpensearchSink.java  |  14 ++-
 .../opensearch/sink/OpensearchSinkBuilder.java     |  48 +++++++-
 .../opensearch/sink/OpensearchWriter.java          | 114 +++++++++++-------
 .../opensearch/sink/DefaultBulkInspectorTest.java  | 133 +++++++++++++++++++++
 .../opensearch/sink/OpensearchSinkBuilderTest.java | 131 ++++++++++++++++++++
 .../opensearch/sink/OpensearchWriterITCase.java    |   9 +-
 7 files changed, 458 insertions(+), 51 deletions(-)

diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
new file mode 100644
index 0000000..3dbd6a8
--- /dev/null
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+
+/** Callback for inspecting a {@link BulkResponse}. */
+@PublicEvolving
+@FunctionalInterface
+public interface BulkResponseInspector {
+
+    /**
+     * Callback to inspect a {@code response} in the context of its {@code 
request}. It may throw a
+     * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that 
the bulk failed
+     * (partially).
+     */
+    void inspect(BulkRequest request, BulkResponse response);
+
+    /**
+     * Factory interface for creating a {@link BulkResponseInspector} in the 
context of a sink.
+     * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to 
capture custom metrics.
+     */
+    @PublicEvolving
+    @FunctionalInterface
+    interface BulkResponseInspectorFactory
+            extends SerializableFunction<
+                    BulkResponseInspectorFactory.InitContext, 
BulkResponseInspector> {
+
+        /**
+         * The interface exposes a subset of {@link
+         * org.apache.flink.api.connector.sink2.Sink.InitContext}.
+         */
+        interface InitContext {
+
+            /** Returns: The metric group of the surrounding writer. */
+            MetricGroup metricGroup();
+        }
+    }
+}
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
index c02b4fe..c01f0d0 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
 
 import org.apache.http.HttpHost;
 
@@ -60,7 +61,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
     private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
     private final RestClientFactory restClientFactory;
-    private final FailureHandler failureHandler;
+    private final BulkResponseInspectorFactory bulkResponseInspectorFactory;
 
     OpensearchSink(
             List<HttpHost> hosts,
@@ -69,7 +70,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
             BulkProcessorConfig buildBulkProcessorConfig,
             NetworkClientConfig networkClientConfig,
             RestClientFactory restClientFactory,
-            FailureHandler failureHandler) {
+            BulkResponseInspectorFactory bulkResponseInspectorFactory) {
         this.hosts = checkNotNull(hosts);
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
         this.emitter = checkNotNull(emitter);
@@ -77,7 +78,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
         this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
         this.networkClientConfig = checkNotNull(networkClientConfig);
         this.restClientFactory = checkNotNull(restClientFactory);
-        this.failureHandler = checkNotNull(failureHandler);
+        this.bulkResponseInspectorFactory = 
checkNotNull(bulkResponseInspectorFactory);
     }
 
     @Override
@@ -91,11 +92,16 @@ public class OpensearchSink<IN> implements Sink<IN> {
                 context.metricGroup(),
                 context.getMailboxExecutor(),
                 restClientFactory,
-                failureHandler);
+                bulkResponseInspectorFactory.apply(context::metricGroup));
     }
 
     @VisibleForTesting
     DeliveryGuarantee getDeliveryGuarantee() {
         return deliveryGuarantee;
     }
+
+    @VisibleForTesting
+    BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+        return bulkResponseInspectorFactory;
+    }
 }
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
index 736c607..ce7b8ae 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.opensearch.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.http.HttpHost;
@@ -27,7 +30,6 @@ import org.apache.http.HttpHost;
 import java.util.Arrays;
 import java.util.List;
 
-import static 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -74,7 +76,8 @@ public class OpensearchSinkBuilder<IN> {
     private Integer socketTimeout;
     private Boolean allowInsecure;
     private RestClientFactory restClientFactory;
-    private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER;
+    private FailureHandler failureHandler = new DefaultFailureHandler();
+    private BulkResponseInspectorFactory bulkResponseInspectorFactory;
 
     public OpensearchSinkBuilder() {
         restClientFactory = new DefaultRestClientFactory();
@@ -315,6 +318,20 @@ public class OpensearchSinkBuilder<IN> {
         return self();
     }
 
+    /**
+     * Overrides the default {@link BulkResponseInspectorFactory}. A custom 
{@link
+     * BulkResponseInspector}, for example, can change the failure handling 
and capture additional
+     * metrics. See {@link #failureHandler} for a simpler way of handling 
failures.
+     *
+     * @param bulkResponseInspectorFactory the factory
+     * @return this builder
+     */
+    public OpensearchSinkBuilder<IN> setBulkResponseInspectorFactory(
+            BulkResponseInspectorFactory bulkResponseInspectorFactory) {
+        this.bulkResponseInspectorFactory = 
checkNotNull(bulkResponseInspectorFactory);
+        return self();
+    }
+
     /**
      * Constructs the {@link OpensearchSink} with the properties configured 
this builder.
      *
@@ -334,7 +351,13 @@ public class OpensearchSinkBuilder<IN> {
                 bulkProcessorConfig,
                 networkClientConfig,
                 restClientFactory,
-                failureHandler);
+                getBulkResponseInspectorFactory());
+    }
+
+    protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+        return this.bulkResponseInspectorFactory == null
+                ? new DefaultBulkResponseInspectorFactory(failureHandler)
+                : this.bulkResponseInspectorFactory;
     }
 
     private NetworkClientConfig buildNetworkClientConfig() {
@@ -395,4 +418,23 @@ public class OpensearchSinkBuilder<IN> {
                 + '\''
                 + '}';
     }
+
+    /**
+     * Default factory for {@link FailureHandler}-bound {@link 
BulkResponseInspector
+     * BulkResponseInspectors}. A Static class is used instead of 
anonymous/lambda to avoid
+     * non-serializable references to {@link OpensearchSinkBuilder}.
+     */
+    static class DefaultBulkResponseInspectorFactory implements 
BulkResponseInspectorFactory {
+
+        private final FailureHandler failureHandler;
+
+        DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) {
+            this.failureHandler = failureHandler;
+        }
+
+        @Override
+        public BulkResponseInspector apply(InitContext context) {
+            return new DefaultBulkResponseInspector(failureHandler);
+        }
+    }
 }
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
index 68da301..d13973d 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
@@ -58,11 +58,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchWriter.class);
 
-    public static final FailureHandler DEFAULT_FAILURE_HANDLER =
-            ex -> {
-                throw new FlinkRuntimeException(ex);
-            };
-
     private final OpensearchEmitter<? super IN> emitter;
     private final MailboxExecutor mailboxExecutor;
     private final boolean flushOnCheckpoint;
@@ -70,7 +65,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
     private final RestHighLevelClient client;
     private final RequestIndexer requestIndexer;
     private final Counter numBytesOutCounter;
-    private final FailureHandler failureHandler;
 
     private long pendingActions = 0;
     private boolean checkpointInProgress = false;
@@ -102,7 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
             SinkWriterMetricGroup metricGroup,
             MailboxExecutor mailboxExecutor,
             RestClientFactory restClientFactory,
-            FailureHandler failureHandler) {
+            BulkResponseInspector bulkResponseInspector) {
         this.emitter = checkNotNull(emitter);
         this.flushOnCheckpoint = flushOnCheckpoint;
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
@@ -113,7 +107,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
                         builder, new 
DefaultRestClientConfig(networkClientConfig));
 
         this.client = new RestHighLevelClient(builder);
-        this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
+        this.bulkProcessor =
+                createBulkProcessor(bulkProcessorConfig, 
checkNotNull(bulkResponseInspector));
         this.requestIndexer = new 
DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
         checkNotNull(metricGroup);
         metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
@@ -123,7 +118,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         } catch (Exception e) {
             throw new FlinkRuntimeException("Failed to open the 
OpensearchEmitter", e);
         }
-        this.failureHandler = failureHandler;
     }
 
     @Override
@@ -163,7 +157,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         client.close();
     }
 
-    private BulkProcessor createBulkProcessor(BulkProcessorConfig 
bulkProcessorConfig) {
+    private BulkProcessor createBulkProcessor(
+            BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector 
bulkResponseInspector) {
 
         final BulkProcessor.Builder builder =
                 BulkProcessor.builder(
@@ -180,7 +175,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
                                         bulkResponseActionListener);
                             }
                         },
-                        new BulkListener());
+                        new BulkListener(bulkResponseInspector));
 
         if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
             
builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
@@ -223,6 +218,12 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
 
     private class BulkListener implements BulkProcessor.Listener {
 
+        private final BulkResponseInspector bulkResponseInspector;
+
+        public BulkListener(BulkResponseInspector bulkResponseInspector) {
+            this.bulkResponseInspector = bulkResponseInspector;
+        }
+
         @Override
         public void beforeBulk(long executionId, BulkRequest request) {
             LOG.info("Sending bulk of {} actions to Opensearch.", 
request.numberOfActions());
@@ -245,6 +246,11 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
                     },
                     "opensearchErrorCallback");
         }
+
+        private void extractFailures(BulkRequest request, BulkResponse 
response) {
+            bulkResponseInspector.inspect(request, response);
+            pendingActions -= request.numberOfActions();
+        }
     }
 
     private void enqueueActionInMailbox(
@@ -259,35 +265,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         mailboxExecutor.execute(action, actionName);
     }
 
-    private void extractFailures(BulkRequest request, BulkResponse response) {
-        if (!response.hasFailures()) {
-            pendingActions -= request.numberOfActions();
-            return;
-        }
-
-        Throwable chainedFailures = null;
-        for (int i = 0; i < response.getItems().length; i++) {
-            final BulkItemResponse itemResponse = response.getItems()[i];
-            if (!itemResponse.isFailed()) {
-                continue;
-            }
-            final Throwable failure = itemResponse.getFailure().getCause();
-            if (failure == null) {
-                continue;
-            }
-            final RestStatus restStatus = 
itemResponse.getFailure().getStatus();
-            final DocWriteRequest<?> actionRequest = request.requests().get(i);
-
-            chainedFailures =
-                    firstOrSuppressed(
-                            wrapException(restStatus, failure, actionRequest), 
chainedFailures);
-        }
-        if (chainedFailures == null) {
-            return;
-        }
-        failureHandler.onFailure(chainedFailures);
-    }
-
     private static Throwable wrapException(
             RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> 
actionRequest) {
         if (restStatus == null) {
@@ -345,4 +322,61 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
             }
         }
     }
+
+    /**
+     * A strict implementation that fails if either the whole bulk request 
failed or any of its
+     * actions.
+     */
+    static class DefaultBulkResponseInspector implements BulkResponseInspector 
{
+
+        @VisibleForTesting final FailureHandler failureHandler;
+
+        DefaultBulkResponseInspector() {
+            this(new DefaultFailureHandler());
+        }
+
+        DefaultBulkResponseInspector(FailureHandler failureHandler) {
+            this.failureHandler = checkNotNull(failureHandler);
+        }
+
+        @Override
+        public void inspect(BulkRequest request, BulkResponse response) {
+            if (!response.hasFailures()) {
+                return;
+            }
+
+            Throwable chainedFailures = null;
+            for (int i = 0; i < response.getItems().length; i++) {
+                final BulkItemResponse itemResponse = response.getItems()[i];
+                if (!itemResponse.isFailed()) {
+                    continue;
+                }
+                final Throwable failure = itemResponse.getFailure().getCause();
+                if (failure == null) {
+                    continue;
+                }
+                final RestStatus restStatus = 
itemResponse.getFailure().getStatus();
+                final DocWriteRequest<?> actionRequest = 
request.requests().get(i);
+
+                chainedFailures =
+                        firstOrSuppressed(
+                                wrapException(restStatus, failure, 
actionRequest), chainedFailures);
+            }
+            if (chainedFailures == null) {
+                return;
+            }
+            failureHandler.onFailure(chainedFailures);
+        }
+    }
+
+    static class DefaultFailureHandler implements FailureHandler {
+
+        @Override
+        public void onFailure(Throwable failure) {
+            if (failure instanceof FlinkRuntimeException) {
+                throw (FlinkRuntimeException) failure;
+            }
+            throw new FlinkRuntimeException(failure);
+        }
+    }
 }
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
new file mode 100644
index 0000000..e230da7
--- /dev/null
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+
+import java.io.IOException;
+
+/** Tests for {@link DefaultBulkResponseInspector}. */
+@ExtendWith(TestLoggerExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new 
DefaultBulkResponseInspector();
+        Assertions.assertThatCode(
+                        () ->
+                                inspector.inspect(
+                                        new BulkRequest(),
+                                        new BulkResponse(new 
BulkItemResponse[] {}, 0)))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testPassesDespiteChainedFailure() {
+        final DefaultBulkResponseInspector inspector =
+                new DefaultBulkResponseInspector((failure) -> {});
+        Assertions.assertThatCode(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), 
new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, 
(DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new 
IOException("A"))),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new 
IOException("B")))
+                                            },
+                                            0));
+                        })
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final IOException failureCause0 = new IOException("A");
+        final IOException failureCause1 = new IOException("B");
+        final DefaultBulkResponseInspector inspector = new 
DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), 
new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, 
(DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                
failureCause0)),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                failureCause1))
+                                            },
+                                            0));
+                        })
+                .withCause(failureCause0);
+    }
+}
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index 9939313..693ae44 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,8 +17,26 @@
 
 package org.apache.flink.connector.opensearch.sink;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
 import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.DynamicTest;
@@ -27,9 +45,12 @@ import org.junit.jupiter.api.TestFactory;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -111,6 +132,116 @@ class OpensearchSinkBuilderTest {
                 .isInstanceOf(NullPointerException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final OpensearchSink<Object> sink =
+                
createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) 
inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", 
"failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final OpensearchSink<Object> sink =
+                createMinimalBuilder()
+                        
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> 
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable<? extends Exception> command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException 
{}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, 
SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    OpensearchSinkBuilderTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {
+            return InternalSinkWriterMetricGroup.mock(new 
UnregisteredMetricsGroup());
+        }
+
+        public MetricGroup getMetricGroup() {
+            return this.metricGroup();
+        }
+
+        public OptionalLong getRestoredCheckpointId() {
+            return OptionalLong.empty();
+        }
+
+        public SerializationSchema.InitializationContext
+                asSerializationSchemaInitializationContext() {
+            return this;
+        }
+
+        public boolean isObjectReuseEnabled() {
+            return false;
+        }
+
+        public <IN> TypeSerializer<IN> createInputSerializer() {
+            throw new UnsupportedOperationException();
+        }
+
+        public JobID getJobId() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     private OpensearchSinkBuilder<Object> createEmptyBuilder() {
         return new OpensearchSinkBuilder<>();
     }
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index afdf26b..fc083a4 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.opensearch.OpensearchUtil;
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
 import org.apache.flink.connector.opensearch.test.DockerImageVersions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -56,7 +58,6 @@ import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage;
-import static 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link OpensearchWriter}. */
@@ -280,7 +281,7 @@ class OpensearchWriterITCase {
                 flushOnCheckpoint,
                 bulkProcessorConfig,
                 
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
-                DEFAULT_FAILURE_HANDLER);
+                new DefaultFailureHandler());
     }
 
     private OpensearchWriter<Tuple2<Integer, String>> createWriter(
@@ -306,7 +307,7 @@ class OpensearchWriterITCase {
                 flushOnCheckpoint,
                 bulkProcessorConfig,
                 metricGroup,
-                DEFAULT_FAILURE_HANDLER);
+                new DefaultFailureHandler());
     }
 
     private OpensearchWriter<Tuple2<Integer, String>> createWriter(
@@ -331,7 +332,7 @@ class OpensearchWriterITCase {
                 metricGroup,
                 new TestMailbox(),
                 new DefaultRestClientFactory(),
-                failureHandler);
+                new DefaultBulkResponseInspector(failureHandler));
     }
 
     private static class UpdatingEmitter implements 
OpensearchEmitter<Tuple2<Integer, String>> {


Reply via email to