This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
The following commit(s) were added to refs/heads/main by this push:
new 9e161cc [FLINK-33925][connectors/opensearch] Allow customising bulk
failure handling
9e161cc is described below
commit 9e161cc097b34d3ea0d32787f337e630250547d3
Author: Peter Fischer <[email protected]>
AuthorDate: Wed Mar 13 08:24:15 2024 +0100
[FLINK-33925][connectors/opensearch] Allow customising bulk failure handling
Extracted `BulkResponseInspector` interface to allow custom handling of
(partially) failed bulk requests. If not overridden, default behaviour remains
unchanged and partial failures are escalated.
* fixes https://issues.apache.org/jira/browse/FLINK-33925
* allows custom metrics to be exposed
---
.../opensearch/sink/BulkResponseInspector.java | 60 ++++++++++
.../connector/opensearch/sink/OpensearchSink.java | 14 ++-
.../opensearch/sink/OpensearchSinkBuilder.java | 48 +++++++-
.../opensearch/sink/OpensearchWriter.java | 114 +++++++++++-------
.../opensearch/sink/DefaultBulkInspectorTest.java | 133 +++++++++++++++++++++
.../opensearch/sink/OpensearchSinkBuilderTest.java | 131 ++++++++++++++++++++
.../opensearch/sink/OpensearchWriterITCase.java | 9 +-
7 files changed, 458 insertions(+), 51 deletions(-)
diff --git
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
new file mode 100644
index 0000000..3dbd6a8
--- /dev/null
+++
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/BulkResponseInspector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+
+/** Callback for inspecting a {@link BulkResponse}. */
+@PublicEvolving
+@FunctionalInterface
+public interface BulkResponseInspector {
+
+ /**
+ * Callback to inspect a {@code response} in the context of its {@code
request}. It may throw a
+ * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that
the bulk failed
+ * (partially).
+ */
+ void inspect(BulkRequest request, BulkResponse response);
+
+ /**
+ * Factory interface for creating a {@link BulkResponseInspector} in the
context of a sink.
+ * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to
capture custom metrics.
+ */
+ @PublicEvolving
+ @FunctionalInterface
+ interface BulkResponseInspectorFactory
+ extends SerializableFunction<
+ BulkResponseInspectorFactory.InitContext,
BulkResponseInspector> {
+
+ /**
+ * The interface exposes a subset of {@link
+ * org.apache.flink.api.connector.sink2.Sink.InitContext}.
+ */
+ interface InitContext {
+
+ /** Returns: The metric group of the surrounding writer. */
+ MetricGroup metricGroup();
+ }
+ }
+}
diff --git
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
index c02b4fe..c01f0d0 100644
---
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
+++
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.http.HttpHost;
@@ -60,7 +61,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
private final NetworkClientConfig networkClientConfig;
private final DeliveryGuarantee deliveryGuarantee;
private final RestClientFactory restClientFactory;
- private final FailureHandler failureHandler;
+ private final BulkResponseInspectorFactory bulkResponseInspectorFactory;
OpensearchSink(
List<HttpHost> hosts,
@@ -69,7 +70,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
BulkProcessorConfig buildBulkProcessorConfig,
NetworkClientConfig networkClientConfig,
RestClientFactory restClientFactory,
- FailureHandler failureHandler) {
+ BulkResponseInspectorFactory bulkResponseInspectorFactory) {
this.hosts = checkNotNull(hosts);
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
this.emitter = checkNotNull(emitter);
@@ -77,7 +78,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
this.networkClientConfig = checkNotNull(networkClientConfig);
this.restClientFactory = checkNotNull(restClientFactory);
- this.failureHandler = checkNotNull(failureHandler);
+ this.bulkResponseInspectorFactory =
checkNotNull(bulkResponseInspectorFactory);
}
@Override
@@ -91,11 +92,16 @@ public class OpensearchSink<IN> implements Sink<IN> {
context.metricGroup(),
context.getMailboxExecutor(),
restClientFactory,
- failureHandler);
+ bulkResponseInspectorFactory.apply(context::metricGroup));
}
@VisibleForTesting
DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}
+
+ @VisibleForTesting
+ BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+ return bulkResponseInspectorFactory;
+ }
}
diff --git
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
index 736c607..ce7b8ae 100644
---
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
+++
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.opensearch.sink;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.util.InstantiationUtil;
import org.apache.http.HttpHost;
@@ -27,7 +30,6 @@ import org.apache.http.HttpHost;
import java.util.Arrays;
import java.util.List;
-import static
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -74,7 +76,8 @@ public class OpensearchSinkBuilder<IN> {
private Integer socketTimeout;
private Boolean allowInsecure;
private RestClientFactory restClientFactory;
- private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER;
+ private FailureHandler failureHandler = new DefaultFailureHandler();
+ private BulkResponseInspectorFactory bulkResponseInspectorFactory;
public OpensearchSinkBuilder() {
restClientFactory = new DefaultRestClientFactory();
@@ -315,6 +318,20 @@ public class OpensearchSinkBuilder<IN> {
return self();
}
+ /**
+ * Overrides the default {@link BulkResponseInspectorFactory}. A custom
{@link
+ * BulkResponseInspector}, for example, can change the failure handling
and capture additional
+ * metrics. See {@link #failureHandler} for a simpler way of handling
failures.
+ *
+ * @param bulkResponseInspectorFactory the factory
+ * @return this builder
+ */
+ public OpensearchSinkBuilder<IN> setBulkResponseInspectorFactory(
+ BulkResponseInspectorFactory bulkResponseInspectorFactory) {
+ this.bulkResponseInspectorFactory =
checkNotNull(bulkResponseInspectorFactory);
+ return self();
+ }
+
/**
* Constructs the {@link OpensearchSink} with the properties configured
this builder.
*
@@ -334,7 +351,13 @@ public class OpensearchSinkBuilder<IN> {
bulkProcessorConfig,
networkClientConfig,
restClientFactory,
- failureHandler);
+ getBulkResponseInspectorFactory());
+ }
+
+ protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+ return this.bulkResponseInspectorFactory == null
+ ? new DefaultBulkResponseInspectorFactory(failureHandler)
+ : this.bulkResponseInspectorFactory;
}
private NetworkClientConfig buildNetworkClientConfig() {
@@ -395,4 +418,23 @@ public class OpensearchSinkBuilder<IN> {
+ '\''
+ '}';
}
+
+ /**
+ * Default factory for {@link FailureHandler}-bound {@link
BulkResponseInspector
+ * BulkResponseInspectors}. A Static class is used instead of
anonymous/lambda to avoid
+ * non-serializable references to {@link OpensearchSinkBuilder}.
+ */
+ static class DefaultBulkResponseInspectorFactory implements
BulkResponseInspectorFactory {
+
+ private final FailureHandler failureHandler;
+
+ DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) {
+ this.failureHandler = failureHandler;
+ }
+
+ @Override
+ public BulkResponseInspector apply(InitContext context) {
+ return new DefaultBulkResponseInspector(failureHandler);
+ }
+ }
}
diff --git
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
index 68da301..d13973d 100644
---
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
+++
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
@@ -58,11 +58,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
private static final Logger LOG =
LoggerFactory.getLogger(OpensearchWriter.class);
- public static final FailureHandler DEFAULT_FAILURE_HANDLER =
- ex -> {
- throw new FlinkRuntimeException(ex);
- };
-
private final OpensearchEmitter<? super IN> emitter;
private final MailboxExecutor mailboxExecutor;
private final boolean flushOnCheckpoint;
@@ -70,7 +65,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
private final RestHighLevelClient client;
private final RequestIndexer requestIndexer;
private final Counter numBytesOutCounter;
- private final FailureHandler failureHandler;
private long pendingActions = 0;
private boolean checkpointInProgress = false;
@@ -102,7 +96,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
SinkWriterMetricGroup metricGroup,
MailboxExecutor mailboxExecutor,
RestClientFactory restClientFactory,
- FailureHandler failureHandler) {
+ BulkResponseInspector bulkResponseInspector) {
this.emitter = checkNotNull(emitter);
this.flushOnCheckpoint = flushOnCheckpoint;
this.mailboxExecutor = checkNotNull(mailboxExecutor);
@@ -113,7 +107,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
builder, new
DefaultRestClientConfig(networkClientConfig));
this.client = new RestHighLevelClient(builder);
- this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
+ this.bulkProcessor =
+ createBulkProcessor(bulkProcessorConfig,
checkNotNull(bulkResponseInspector));
this.requestIndexer = new
DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
checkNotNull(metricGroup);
metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
@@ -123,7 +118,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to open the
OpensearchEmitter", e);
}
- this.failureHandler = failureHandler;
}
@Override
@@ -163,7 +157,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
client.close();
}
- private BulkProcessor createBulkProcessor(BulkProcessorConfig
bulkProcessorConfig) {
+ private BulkProcessor createBulkProcessor(
+ BulkProcessorConfig bulkProcessorConfig, BulkResponseInspector
bulkResponseInspector) {
final BulkProcessor.Builder builder =
BulkProcessor.builder(
@@ -180,7 +175,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
bulkResponseActionListener);
}
},
- new BulkListener());
+ new BulkListener(bulkResponseInspector));
if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
@@ -223,6 +218,12 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
private class BulkListener implements BulkProcessor.Listener {
+ private final BulkResponseInspector bulkResponseInspector;
+
+ public BulkListener(BulkResponseInspector bulkResponseInspector) {
+ this.bulkResponseInspector = bulkResponseInspector;
+ }
+
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.info("Sending bulk of {} actions to Opensearch.",
request.numberOfActions());
@@ -245,6 +246,11 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
},
"opensearchErrorCallback");
}
+
+ private void extractFailures(BulkRequest request, BulkResponse
response) {
+ bulkResponseInspector.inspect(request, response);
+ pendingActions -= request.numberOfActions();
+ }
}
private void enqueueActionInMailbox(
@@ -259,35 +265,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
mailboxExecutor.execute(action, actionName);
}
- private void extractFailures(BulkRequest request, BulkResponse response) {
- if (!response.hasFailures()) {
- pendingActions -= request.numberOfActions();
- return;
- }
-
- Throwable chainedFailures = null;
- for (int i = 0; i < response.getItems().length; i++) {
- final BulkItemResponse itemResponse = response.getItems()[i];
- if (!itemResponse.isFailed()) {
- continue;
- }
- final Throwable failure = itemResponse.getFailure().getCause();
- if (failure == null) {
- continue;
- }
- final RestStatus restStatus =
itemResponse.getFailure().getStatus();
- final DocWriteRequest<?> actionRequest = request.requests().get(i);
-
- chainedFailures =
- firstOrSuppressed(
- wrapException(restStatus, failure, actionRequest),
chainedFailures);
- }
- if (chainedFailures == null) {
- return;
- }
- failureHandler.onFailure(chainedFailures);
- }
-
private static Throwable wrapException(
RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?>
actionRequest) {
if (restStatus == null) {
@@ -345,4 +322,61 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
}
}
}
+
+ /**
+ * A strict implementation that fails if either the whole bulk request
failed or any of its
+ * actions.
+ */
+ static class DefaultBulkResponseInspector implements BulkResponseInspector
{
+
+ @VisibleForTesting final FailureHandler failureHandler;
+
+ DefaultBulkResponseInspector() {
+ this(new DefaultFailureHandler());
+ }
+
+ DefaultBulkResponseInspector(FailureHandler failureHandler) {
+ this.failureHandler = checkNotNull(failureHandler);
+ }
+
+ @Override
+ public void inspect(BulkRequest request, BulkResponse response) {
+ if (!response.hasFailures()) {
+ return;
+ }
+
+ Throwable chainedFailures = null;
+ for (int i = 0; i < response.getItems().length; i++) {
+ final BulkItemResponse itemResponse = response.getItems()[i];
+ if (!itemResponse.isFailed()) {
+ continue;
+ }
+ final Throwable failure = itemResponse.getFailure().getCause();
+ if (failure == null) {
+ continue;
+ }
+ final RestStatus restStatus =
itemResponse.getFailure().getStatus();
+ final DocWriteRequest<?> actionRequest =
request.requests().get(i);
+
+ chainedFailures =
+ firstOrSuppressed(
+ wrapException(restStatus, failure,
actionRequest), chainedFailures);
+ }
+ if (chainedFailures == null) {
+ return;
+ }
+ failureHandler.onFailure(chainedFailures);
+ }
+ }
+
+ static class DefaultFailureHandler implements FailureHandler {
+
+ @Override
+ public void onFailure(Throwable failure) {
+ if (failure instanceof FlinkRuntimeException) {
+ throw (FlinkRuntimeException) failure;
+ }
+ throw new FlinkRuntimeException(failure);
+ }
+ }
}
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
new file mode 100644
index 0000000..e230da7
--- /dev/null
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/DefaultBulkInspectorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opensearch.action.DocWriteRequest.OpType;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkItemResponse.Failure;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+
+import java.io.IOException;
+
+/** Tests for {@link DefaultBulkResponseInspector}. */
+@ExtendWith(TestLoggerExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class DefaultBulkResponseInspectorTest {
+
+ @Test
+ void testPassWithoutFailures() {
+ final DefaultBulkResponseInspector inspector = new
DefaultBulkResponseInspector();
+ Assertions.assertThatCode(
+ () ->
+ inspector.inspect(
+ new BulkRequest(),
+ new BulkResponse(new
BulkItemResponse[] {}, 0)))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void testPassesDespiteChainedFailure() {
+ final DefaultBulkResponseInspector inspector =
+ new DefaultBulkResponseInspector((failure) -> {});
+ Assertions.assertThatCode(
+ () -> {
+ final BulkRequest request = new BulkRequest();
+ request.add(
+ new IndexRequest(), new DeleteRequest(),
new DeleteRequest());
+
+ inspector.inspect(
+ request,
+ new BulkResponse(
+ new BulkItemResponse[] {
+ new BulkItemResponse(
+ 0, OpType.CREATE,
(DocWriteResponse) null),
+ new BulkItemResponse(
+ 1,
+ OpType.DELETE,
+ new Failure(
+ "index",
+ "type",
+ "id",
+ new
IOException("A"))),
+ new BulkItemResponse(
+ 2,
+ OpType.DELETE,
+ new Failure(
+ "index",
+ "type",
+ "id",
+ new
IOException("B")))
+ },
+ 0));
+ })
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ void testThrowsChainedFailure() {
+ final IOException failureCause0 = new IOException("A");
+ final IOException failureCause1 = new IOException("B");
+ final DefaultBulkResponseInspector inspector = new
DefaultBulkResponseInspector();
+ Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+ .isThrownBy(
+ () -> {
+ final BulkRequest request = new BulkRequest();
+ request.add(
+ new IndexRequest(), new DeleteRequest(),
new DeleteRequest());
+
+ inspector.inspect(
+ request,
+ new BulkResponse(
+ new BulkItemResponse[] {
+ new BulkItemResponse(
+ 0, OpType.CREATE,
(DocWriteResponse) null),
+ new BulkItemResponse(
+ 1,
+ OpType.DELETE,
+ new Failure(
+ "index",
+ "type",
+ "id",
+
failureCause0)),
+ new BulkItemResponse(
+ 2,
+ OpType.DELETE,
+ new Failure(
+ "index",
+ "type",
+ "id",
+ failureCause1))
+ },
+ 0));
+ })
+ .withCause(failureCause0);
+ }
+}
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index 9939313..693ae44 100644
---
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,8 +17,26 @@
package org.apache.flink.connector.opensearch.sink;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.DynamicTest;
@@ -27,9 +45,12 @@ import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -111,6 +132,116 @@ class OpensearchSinkBuilderTest {
.isInstanceOf(NullPointerException.class);
}
+ @Test
+ void testOverrideFailureHandler() {
+ final FailureHandler failureHandler = (failure) -> {};
+ final OpensearchSink<Object> sink =
+
createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+ final InitContext sinkInitContext = new MockInitContext();
+ final BulkResponseInspector bulkResponseInspector =
+
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+ assertThat(bulkResponseInspector)
+ .isInstanceOf(DefaultBulkResponseInspector.class)
+ .extracting(
+ (inspector) -> ((DefaultBulkResponseInspector)
inspector).failureHandler)
+ .isEqualTo(failureHandler);
+ }
+
+ @Test
+ void testOverrideBulkResponseInspectorFactory() {
+ final AtomicBoolean called = new AtomicBoolean();
+ final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+ initContext -> {
+ final MetricGroup metricGroup = initContext.metricGroup();
+ metricGroup.addGroup("bulk").addGroup("result",
"failed").counter("actions");
+ called.set(true);
+ return (BulkResponseInspector) (request, response) -> {};
+ };
+ final OpensearchSink<Object> sink =
+ createMinimalBuilder()
+
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+ .build();
+
+ final InitContext sinkInitContext = new MockInitContext();
+
+ assertThatCode(() ->
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+ assertThat(called).isTrue();
+ }
+
+ private static class DummyMailboxExecutor implements MailboxExecutor {
+ private DummyMailboxExecutor() {}
+
+ public void execute(
+ ThrowingRunnable<? extends Exception> command,
+ String descriptionFormat,
+ Object... descriptionArgs) {}
+
+ public void yield() throws InterruptedException, FlinkRuntimeException
{}
+
+ public boolean tryYield() throws FlinkRuntimeException {
+ return false;
+ }
+ }
+
+ private static class MockInitContext
+ implements Sink.InitContext,
SerializationSchema.InitializationContext {
+
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ OpensearchSinkBuilderTest.class.getClassLoader());
+ }
+
+ public MailboxExecutor getMailboxExecutor() {
+ return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
+ }
+
+ public ProcessingTimeService getProcessingTimeService() {
+ return new TestProcessingTimeService();
+ }
+
+ public int getSubtaskId() {
+ return 0;
+ }
+
+ public int getNumberOfParallelSubtasks() {
+ return 0;
+ }
+
+ public int getAttemptNumber() {
+ return 0;
+ }
+
+ public SinkWriterMetricGroup metricGroup() {
+ return InternalSinkWriterMetricGroup.mock(new
UnregisteredMetricsGroup());
+ }
+
+ public MetricGroup getMetricGroup() {
+ return this.metricGroup();
+ }
+
+ public OptionalLong getRestoredCheckpointId() {
+ return OptionalLong.empty();
+ }
+
+ public SerializationSchema.InitializationContext
+ asSerializationSchemaInitializationContext() {
+ return this;
+ }
+
+ public boolean isObjectReuseEnabled() {
+ return false;
+ }
+
+ public <IN> TypeSerializer<IN> createInputSerializer() {
+ throw new UnsupportedOperationException();
+ }
+
+ public JobID getJobId() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
private OpensearchSinkBuilder<Object> createEmptyBuilder() {
return new OpensearchSinkBuilder<>();
}
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index afdf26b..fc083a4 100644
---
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
+import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultFailureHandler;
import org.apache.flink.connector.opensearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -56,7 +58,6 @@ import java.util.Map;
import java.util.Optional;
import static
org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage;
-import static
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link OpensearchWriter}. */
@@ -280,7 +281,7 @@ class OpensearchWriterITCase {
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
- DEFAULT_FAILURE_HANDLER);
+ new DefaultFailureHandler());
}
private OpensearchWriter<Tuple2<Integer, String>> createWriter(
@@ -306,7 +307,7 @@ class OpensearchWriterITCase {
flushOnCheckpoint,
bulkProcessorConfig,
metricGroup,
- DEFAULT_FAILURE_HANDLER);
+ new DefaultFailureHandler());
}
private OpensearchWriter<Tuple2<Integer, String>> createWriter(
@@ -331,7 +332,7 @@ class OpensearchWriterITCase {
metricGroup,
new TestMailbox(),
new DefaultRestClientFactory(),
- failureHandler);
+ new DefaultBulkResponseInspector(failureHandler));
}
private static class UpdatingEmitter implements
OpensearchEmitter<Tuple2<Integer, String>> {