This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
The following commit(s) were added to refs/heads/main by this push:
new 00f1a5b [FLINK-34942][connectors/opensearch] Add support for Flink
1.19+
00f1a5b is described below
commit 00f1a5b13bfbadcb8efce8e16fb06ddea0d8e48e
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Apr 14 19:13:15 2024 +0200
[FLINK-34942][connectors/opensearch] Add support for Flink 1.19+
---
.github/workflows/push_pr.yml | 11 +-
.github/workflows/weekly.yml | 16 +-
.../flink/core/execution/CheckpointingMode.java | 32 ++++
.../streaming/tests/OpensearchSinkE2ECase.java | 41 ++++-
flink-connector-opensearch/pom.xml | 6 +-
.../opensearch/sink/OpensearchSinkBuilderTest.java | 90 +++-------
.../opensearch/sink/OpensearchWriterITCase.java | 19 +-
.../sink/TestingSinkWriterMetricGroup.java | 198 +++++++++++++++++++++
flink-sql-connector-opensearch/pom.xml | 6 +
.../src/main/resources/META-INF/NOTICE | 34 ++--
pom.xml | 46 ++++-
11 files changed, 392 insertions(+), 107 deletions(-)
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 6c2dc05..8890292 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -25,7 +25,16 @@ jobs:
compile_and_test:
strategy:
matrix:
- flink: [1.17.1, 1.18-SNAPSHOT]
+ flink: [ 1.17-SNAPSHOT ]
+ jdk: [ '8, 11' ]
+ include:
+ - flink: 1.18-SNAPSHOT
+ jdk: '8, 11, 17'
+ - flink: 1.19-SNAPSHOT
+ jdk: '8, 11, 17, 21'
+ - flink: 1.20-SNAPSHOT
+ jdk: '8, 11, 17, 21'
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
+ jdk_version: ${{ matrix.jdk }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 937446e..ce9e95b 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -26,21 +26,31 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
- flink_branches: [{
+ flink_branches: [ {
flink: 1.17-SNAPSHOT,
branch: main
}, {
flink: 1.18-SNAPSHOT,
+ jdk: '8, 11',
branch: main
}, {
- flink: 1.16.2,
+ flink: 1.19-SNAPSHOT,
+ jdk: '8, 11',
+ branch: main
+ }, {
+ flink: 1.20-SNAPSHOT,
+ jdk: '8, 11',
+ branch: main
+ }, {
+ flink: 1.17.2,
branch: v1.0
}, {
- flink: 1.17.1,
+ flink: 1.18.1,
branch: v1.0
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink_branches.flink }}
+ jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }}
connector_branch: ${{ matrix.flink_branches.branch }}
run_dependency_convergence: false
diff --git
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
new file mode 100644
index 0000000..f003dd5
--- /dev/null
+++
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.streaming.tests.OpensearchSinkE2ECase;
+
+/**
+ * This is a copy of {@link CheckpointingMode} from flink-core module
introduced in Flink 1.20. We
+ * need it here to make {@link OpensearchSinkE2ECase} compatible with earlier
releases. Could be
+ * removed together with dropping support of Flink 1.19.
+ */
+public enum CheckpointingMode {
+ EXACTLY_ONCE,
+ AT_LEAST_ONCE;
+
+ private CheckpointingMode() {}
+}
diff --git
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
index 6281e08..fe7217a 100644
---
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
+++
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
@@ -85,7 +85,8 @@ public class OpensearchSinkE2ECase extends
SinkTestSuiteBase<ComparableTuple2<In
.toUri()
.toURL()));
- @Override
+ /** Could be removed together with dropping support of Flink 1.19. */
+ @Deprecated
protected void checkResultWithSemantic(
ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
List<ComparableTuple2<Integer, String>> testData,
@@ -109,8 +110,46 @@ public class OpensearchSinkE2ECase extends
SinkTestSuiteBase<ComparableTuple2<In
READER_RETRY_ATTEMPTS);
}
+ protected void checkResultWithSemantic(
+ ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
+ List<ComparableTuple2<Integer, String>> testData,
+ org.apache.flink.core.execution.CheckpointingMode semantic)
+ throws Exception {
+ waitUntilCondition(
+ () -> {
+ try {
+ List<ComparableTuple2<Integer, String>> result =
+ reader.poll(Duration.ofMillis(READER_TIMEOUT));
+ assertThat(sort(result).iterator())
+ .matchesRecordsFromSource(
+
Collections.singletonList(sort(testData)),
+
convertFromCheckpointingMode(semantic));
+ return true;
+ } catch (Throwable t) {
+ LOG.warn("Polled results not as expected", t);
+ return false;
+ }
+ },
+ 5000,
+ READER_RETRY_ATTEMPTS);
+ }
+
private static <T extends Comparable<T>> List<T> sort(List<T> list) {
Collections.sort(list);
return list;
}
+
+ /** Could be removed together with dropping support of Flink 1.19. */
+ @Deprecated
+ private static org.apache.flink.streaming.api.CheckpointingMode
convertFromCheckpointingMode(
+ org.apache.flink.core.execution.CheckpointingMode semantic) {
+ switch (semantic) {
+ case EXACTLY_ONCE:
+ return
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+ case AT_LEAST_ONCE:
+ return
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+ default:
+ throw new IllegalArgumentException("Unsupported semantic: " +
semantic);
+ }
+ }
}
diff --git a/flink-connector-opensearch/pom.xml
b/flink-connector-opensearch/pom.xml
index 49d20e0..ffd40d7 100644
--- a/flink-connector-opensearch/pom.xml
+++ b/flink-connector-opensearch/pom.xml
@@ -36,7 +36,11 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <opensearch.version>1.3.0</opensearch.version>
+ <opensearch.version>1.3.14</opensearch.version>
+ <flink.connector.module.config><!-- required by
+ OpensearchSinkITCase -->
--add-opens=java.base/java.lang=ALL-UNNAMED <!--
+ OpensearchSinkITCase -->
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED <!--
+ OpensearchDynamicSinkITCase -->
--add-opens=java.base/java.util=ALL-UNNAMED </flink.connector.module.config>
</properties>
<dependencies>
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index 693ae44..ce4278b 100644
---
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,25 +17,17 @@
package org.apache.flink.connector.opensearch.sink;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
@@ -44,8 +36,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
-import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@@ -138,9 +130,12 @@ class OpensearchSinkBuilderTest {
final OpensearchSink<Object> sink =
createMinimalBuilder().setFailureHandler(failureHandler).build();
- final InitContext sinkInitContext = new MockInitContext();
final BulkResponseInspector bulkResponseInspector =
-
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+ sink.getBulkResponseInspectorFactory()
+ .apply(
+ () ->
+
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+ new
UnregisteredMetricsGroup()));
assertThat(bulkResponseInspector)
.isInstanceOf(DefaultBulkResponseInspector.class)
.extracting(
@@ -163,7 +158,20 @@ class OpensearchSinkBuilderTest {
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
.build();
- final InitContext sinkInitContext = new MockInitContext();
+ final InitContext sinkInitContext = Mockito.mock(InitContext.class);
+ Mockito.when(sinkInitContext.metricGroup())
+ .thenReturn(
+ TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+ new UnregisteredMetricsGroup()));
+
+ Mockito.when(sinkInitContext.getMailboxExecutor())
+ .thenReturn(new
OpensearchSinkBuilderTest.DummyMailboxExecutor());
+ Mockito.when(sinkInitContext.getProcessingTimeService())
+ .thenReturn(new TestProcessingTimeService());
+ Mockito.when(sinkInitContext.getUserCodeClassLoader())
+ .thenReturn(
+ SimpleUserCodeClassLoader.create(
+
OpensearchSinkBuilderTest.class.getClassLoader()));
assertThatCode(() ->
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
assertThat(called).isTrue();
@@ -184,64 +192,6 @@ class OpensearchSinkBuilderTest {
}
}
- private static class MockInitContext
- implements Sink.InitContext,
SerializationSchema.InitializationContext {
-
- public UserCodeClassLoader getUserCodeClassLoader() {
- return SimpleUserCodeClassLoader.create(
- OpensearchSinkBuilderTest.class.getClassLoader());
- }
-
- public MailboxExecutor getMailboxExecutor() {
- return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
- }
-
- public ProcessingTimeService getProcessingTimeService() {
- return new TestProcessingTimeService();
- }
-
- public int getSubtaskId() {
- return 0;
- }
-
- public int getNumberOfParallelSubtasks() {
- return 0;
- }
-
- public int getAttemptNumber() {
- return 0;
- }
-
- public SinkWriterMetricGroup metricGroup() {
- return InternalSinkWriterMetricGroup.mock(new
UnregisteredMetricsGroup());
- }
-
- public MetricGroup getMetricGroup() {
- return this.metricGroup();
- }
-
- public OptionalLong getRestoredCheckpointId() {
- return OptionalLong.empty();
- }
-
- public SerializationSchema.InitializationContext
- asSerializationSchemaInitializationContext() {
- return this;
- }
-
- public boolean isObjectReuseEnabled() {
- return false;
- }
-
- public <IN> TypeSerializer<IN> createInputSerializer() {
- throw new UnsupportedOperationException();
- }
-
- public JobID getJobId() {
- throw new UnsupportedOperationException();
- }
- }
-
private OpensearchSinkBuilder<Object> createEmptyBuilder() {
return new OpensearchSinkBuilder<>();
}
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index fc083a4..838c6bd 100644
---
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
@@ -168,17 +167,19 @@ class OpensearchWriterITCase {
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
- final InternalSinkWriterMetricGroup metricGroup =
- InternalSinkWriterMetricGroup.mock(
- metricListener.getMetricGroup(),
operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1,
FlushBackoffType.NONE, 0, 0);
try (final OpensearchWriter<Tuple2<Integer, String>> writer =
- createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+ createWriter(
+ index,
+ false,
+ bulkProcessorConfig,
+ TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+ operatorIOMetricGroup,
metricListener.getMetricGroup()))) {
final Counter numBytesOut =
operatorIOMetricGroup.getNumBytesOutCounter();
- assertThat(numBytesOut.getCount()).isEqualTo(0);
+ assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
writer.write(Tuple2.of(2, buildMessage(2)), null);
@@ -280,7 +281,8 @@ class OpensearchWriterITCase {
index,
flushOnCheckpoint,
bulkProcessorConfig,
-
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+ TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+ metricListener.getMetricGroup()),
new DefaultFailureHandler());
}
@@ -293,7 +295,8 @@ class OpensearchWriterITCase {
index,
flushOnCheckpoint,
bulkProcessorConfig,
-
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+ TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+ metricListener.getMetricGroup()),
failureHandler);
}
diff --git
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
new file mode 100644
index 0000000..0ad609f
--- /dev/null
+++
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Testing implementation for {@link SinkWriterMetricGroup}. */
+public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
+ implements SinkWriterMetricGroup {
+
+ private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;
+
+ private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;
+
+ private final Supplier<Counter> numRecordsSendCounterSupplier;
+
+ private final Supplier<Counter> numBytesSendCounterSupplier;
+
+ private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;
+
+ private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;
+
+ public TestingSinkWriterMetricGroup(
+ MetricGroup parentMetricGroup,
+ Supplier<Counter> numRecordsOutErrorsCounterSupplier,
+ Supplier<Counter> numRecordsSendErrorsCounterSupplier,
+ Supplier<Counter> numRecordsSendCounterSupplier,
+ Supplier<Counter> numBytesSendCounterSupplier,
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
+ Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+ super(parentMetricGroup);
+ this.numRecordsOutErrorsCounterSupplier =
numRecordsOutErrorsCounterSupplier;
+ this.numRecordsSendErrorsCounterSupplier =
numRecordsSendErrorsCounterSupplier;
+ this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+ this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+ this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+ this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+ }
+
+ @Override
+ public Counter getNumRecordsOutErrorsCounter() {
+ return numRecordsOutErrorsCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumRecordsSendErrorsCounter() {
+ return numRecordsSendErrorsCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumRecordsSendCounter() {
+ return numRecordsSendCounterSupplier.get();
+ }
+
+ @Override
+ public Counter getNumBytesSendCounter() {
+ return numBytesSendCounterSupplier.get();
+ }
+
+ @Override
+ public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
+ currentSendTimeGaugeConsumer.accept(gauge);
+ }
+
+ @Override
+ public OperatorIOMetricGroup getIOMetricGroup() {
+ return ioMetricGroupSupplier.get();
+ }
+
+ static TestingSinkWriterMetricGroup getSinkWriterMetricGroup(MetricGroup
parentMetricGroup) {
+ final OperatorIOMetricGroup operatorIOMetricGroup =
+
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+ return getSinkWriterMetricGroup(operatorIOMetricGroup,
parentMetricGroup);
+ }
+
+ static TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
+ OperatorIOMetricGroup operatorIOMetricGroup, MetricGroup
parentMetricGroup) {
+ Counter numRecordsOutErrors =
parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+ Counter numRecordsSendErrors =
+ parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS,
numRecordsOutErrors);
+ Counter numRecordsWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_RECORDS_SEND,
+ operatorIOMetricGroup.getNumRecordsOutCounter());
+ Counter numBytesWritten =
+ parentMetricGroup.counter(
+ MetricNames.NUM_BYTES_SEND,
operatorIOMetricGroup.getNumBytesOutCounter());
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer =
+ currentSendTimeGauge ->
+ parentMetricGroup.gauge(
+ MetricNames.CURRENT_SEND_TIME,
currentSendTimeGauge);
+ return new TestingSinkWriterMetricGroup.Builder()
+ .setParentMetricGroup(parentMetricGroup)
+ .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
+ .setNumRecordsOutErrorsCounterSupplier(() ->
numRecordsOutErrors)
+ .setNumRecordsSendErrorsCounterSupplier(() ->
numRecordsSendErrors)
+ .setNumRecordsSendCounterSupplier(() -> numRecordsWritten)
+ .setNumBytesSendCounterSupplier(() -> numBytesWritten)
+ .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer)
+ .build();
+ }
+
+ /** Builder for {@link TestingSinkWriterMetricGroup}. */
+ public static class Builder {
+
+ private MetricGroup parentMetricGroup = null;
+
+ private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () ->
null;
+
+ private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () ->
null;
+
+ private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;
+
+ private Supplier<Counter> numBytesSendCounterSupplier = () -> null;
+
+ private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = counter
-> {};
+
+ private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () ->
null;
+
+ public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
+ this.parentMetricGroup = parentMetricGroup;
+ return this;
+ }
+
+ public Builder setNumRecordsOutErrorsCounterSupplier(
+ Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
+ this.numRecordsOutErrorsCounterSupplier =
numRecordsOutErrorsCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumRecordsSendErrorsCounterSupplier(
+ Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
+ this.numRecordsSendErrorsCounterSupplier =
numRecordsSendErrorsCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumRecordsSendCounterSupplier(
+ Supplier<Counter> numRecordsSendCounterSupplier) {
+ this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+ return this;
+ }
+
+ public Builder setNumBytesSendCounterSupplier(
+ Supplier<Counter> numBytesSendCounterSupplier) {
+ this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+ return this;
+ }
+
+ public Builder setCurrentSendTimeGaugeConsumer(
+ Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
+ this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+ return this;
+ }
+
+ public Builder setIoMetricGroupSupplier(
+ Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+ this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+ return this;
+ }
+
+ public TestingSinkWriterMetricGroup build() {
+ return new TestingSinkWriterMetricGroup(
+ parentMetricGroup,
+ numRecordsOutErrorsCounterSupplier,
+ numRecordsSendErrorsCounterSupplier,
+ numRecordsSendCounterSupplier,
+ numBytesSendCounterSupplier,
+ currentSendTimeGaugeConsumer,
+ ioMetricGroupSupplier);
+ }
+ }
+}
diff --git a/flink-sql-connector-opensearch/pom.xml
b/flink-sql-connector-opensearch/pom.xml
index e3f4cd4..8f3cdc1 100644
--- a/flink-sql-connector-opensearch/pom.xml
+++ b/flink-sql-connector-opensearch/pom.xml
@@ -40,6 +40,12 @@ under the License.
<artifactId>flink-connector-opensearch</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
b/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
index 79e4365..9da53ef 100644
--- a/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
@@ -7,10 +7,10 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- com.carrotsearch:hppc:0.8.1
-- com.fasterxml.jackson.core:jackson-core:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4
+- com.fasterxml.jackson.core:jackson-core:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.3
- com.github.spullara.mustache.java:compiler:0.9.10
- commons-codec:commons-codec:1.15
- commons-logging:commons-logging:1.1.3
@@ -32,16 +32,16 @@ This project bundles the following dependencies under the
Apache Software Licens
- org.apache.lucene:lucene-spatial3d:8.10.1
- org.apache.lucene:lucene-spatial-extras:8.10.1
- org.apache.lucene:lucene-suggest:8.10.1
-- org.opensearch.client:opensearch-rest-client:1.3.0
-- org.opensearch.client:opensearch-rest-high-level-client:1.3.0
-- org.opensearch:opensearch-cli:1.3.0
-- org.opensearch:opensearch-core:1.3.0
-- org.opensearch:opensearch-geo:1.3.0
-- org.opensearch:opensearch:1.3.0
-- org.opensearch:opensearch-secure-sm:1.3.0
-- org.opensearch:opensearch-x-content:1.3.0
-- org.opensearch.plugin:aggs-matrix-stats-client:1.3.0
-- org.opensearch.plugin:lang-mustache-client:1.3.0
-- org.opensearch.plugin:mapper-extras-client:1.3.0
-- org.opensearch.plugin:parent-join-client:1.3.0
-- org.opensearch.plugin:rank-eval-client:1.3.0
+- org.opensearch.client:opensearch-rest-client:1.3.14
+- org.opensearch.client:opensearch-rest-high-level-client:1.3.14
+- org.opensearch:opensearch-cli:1.3.14
+- org.opensearch:opensearch-core:1.3.14
+- org.opensearch:opensearch-geo:1.3.14
+- org.opensearch:opensearch:1.3.14
+- org.opensearch:opensearch-secure-sm:1.3.14
+- org.opensearch:opensearch-x-content:1.3.14
+- org.opensearch.plugin:aggs-matrix-stats-client:1.3.14
+- org.opensearch.plugin:lang-mustache-client:1.3.14
+- org.opensearch.plugin:mapper-extras-client:1.3.14
+- org.opensearch.plugin:parent-join-client:1.3.14
+- org.opensearch.plugin:rank-eval-client:1.3.14
diff --git a/pom.xml b/pom.xml
index d6dbd4c..631b6b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,12 +59,12 @@ under the License.
<properties>
<flink.version>1.17.1</flink.version>
- <commons-compress.version>1.23.0</commons-compress.version>
- <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
- <junit5.version>5.9.2</junit5.version>
- <assertj.version>3.21.0</assertj.version>
- <testcontainers.version>1.18.3</testcontainers.version>
- <mockito.version>2.21.0</mockito.version>
+ <commons-compress.version>1.26.1</commons-compress.version>
+ <jackson-bom.version>2.15.3</jackson-bom.version>
+ <junit5.version>5.10.2</junit5.version>
+ <assertj.version>3.25.3</assertj.version>
+ <testcontainers.version>1.19.7</testcontainers.version>
+ <mockito.version>3.12.4</mockito.version>
<japicmp.skip>false</japicmp.skip>
<japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
@@ -73,6 +73,12 @@ under the License.
<log4j.version>2.17.2</log4j.version>
<flink.parent.artifactId>flink-connector-opensearch-parent</flink.parent.artifactId>
+ <!-- This property should contain the add-opens/add-exports
commands required for the tests
+ in the given connector's module to pass.
+ It MUST be a space-separated list not containing any newlines,
+ of entries in the form
'[-]{2}add-[opens|exports]=<module>/<package>=ALL-UNNAMED'.-->
+ <flink.connector.module.config/>
+ <flink.surefire.baseArgLine>-XX:+UseG1GC -Xms256m
-XX:+IgnoreUnrecognizedVMOptions
${flink.connector.module.config}</flink.surefire.baseArgLine>
</properties>
<dependencies>
@@ -279,6 +285,27 @@ under the License.
<version>${commons-compress.version}</version>
</dependency>
+ <!-- For dependency convergence -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.15.1</version>
+ </dependency>
+
+ <!-- For dependency convergence -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.14.0</version>
+ </dependency>
+
+ <!-- For dependency convergence -->
+ <dependency>
+ <groupId>net.bytebuddy</groupId>
+ <artifactId>byte-buddy</artifactId>
+ <version>1.14.13</version>
+ </dependency>
+
<!-- For dependency convergence -->
<dependency>
<groupId>com.fasterxml.jackson</groupId>
@@ -297,6 +324,13 @@ under the License.
<scope>import</scope>
</dependency>
+ <!-- For dependency convergence -->
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.10.5</version>
+ </dependency>
+
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>