This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 00f1a5b  [FLINK-34942][connectors/opensearch] Add support for Flink 
1.19+
00f1a5b is described below

commit 00f1a5b13bfbadcb8efce8e16fb06ddea0d8e48e
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Apr 14 19:13:15 2024 +0200

    [FLINK-34942][connectors/opensearch] Add support for Flink 1.19+
---
 .github/workflows/push_pr.yml                      |  11 +-
 .github/workflows/weekly.yml                       |  16 +-
 .../flink/core/execution/CheckpointingMode.java    |  32 ++++
 .../streaming/tests/OpensearchSinkE2ECase.java     |  41 ++++-
 flink-connector-opensearch/pom.xml                 |   6 +-
 .../opensearch/sink/OpensearchSinkBuilderTest.java |  90 +++-------
 .../opensearch/sink/OpensearchWriterITCase.java    |  19 +-
 .../sink/TestingSinkWriterMetricGroup.java         | 198 +++++++++++++++++++++
 flink-sql-connector-opensearch/pom.xml             |   6 +
 .../src/main/resources/META-INF/NOTICE             |  34 ++--
 pom.xml                                            |  46 ++++-
 11 files changed, 392 insertions(+), 107 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 6c2dc05..8890292 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -25,7 +25,16 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [1.17.1, 1.18-SNAPSHOT]
+        flink: [ 1.17-SNAPSHOT ]
+        jdk: [ '8, 11' ]
+        include:
+          - flink: 1.18-SNAPSHOT
+            jdk: '8, 11, 17'
+          - flink: 1.19-SNAPSHOT
+            jdk: '8, 11, 17, 21'
+          - flink: 1.20-SNAPSHOT
+            jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
+      jdk_version: ${{ matrix.jdk }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 937446e..ce9e95b 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -26,21 +26,31 @@ jobs:
     if: github.repository_owner == 'apache'
     strategy:
       matrix:
-        flink_branches: [{
+        flink_branches: [ {
           flink: 1.17-SNAPSHOT,
           branch: main
         }, {
           flink: 1.18-SNAPSHOT,
+          jdk: '8, 11',
           branch: main
         }, {
-          flink: 1.16.2,
+          flink: 1.19-SNAPSHOT,
+          jdk: '8, 11',
+          branch: main
+        }, {
+          flink: 1.20-SNAPSHOT,
+          jdk: '8, 11',
+          branch: main
+        }, {
+          flink: 1.17.2,
           branch: v1.0
         }, {
-          flink: 1.17.1,
+          flink: 1.18.1,
           branch: v1.0
         }]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink_branches.flink }}
+      jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }}
       connector_branch: ${{ matrix.flink_branches.branch }}
       run_dependency_convergence: false
diff --git 
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
 
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
new file mode 100644
index 0000000..f003dd5
--- /dev/null
+++ 
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/core/execution/CheckpointingMode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.streaming.tests.OpensearchSinkE2ECase;
+
+/**
+ * This is a copy of {@link CheckpointingMode} from flink-core module 
introduced in Flink 1.20. We
+ * need it here to make {@link OpensearchSinkE2ECase} compatible with earlier 
releases. Could be
+ * removed together with dropping support of Flink 1.19.
+ */
+public enum CheckpointingMode {
+    EXACTLY_ONCE,
+    AT_LEAST_ONCE;
+
+    private CheckpointingMode() {}
+}
diff --git 
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
 
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
index 6281e08..fe7217a 100644
--- 
a/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
+++ 
b/flink-connector-opensearch-e2e-tests/src/test/java/org/apache/flink/streaming/tests/OpensearchSinkE2ECase.java
@@ -85,7 +85,8 @@ public class OpensearchSinkE2ECase extends 
SinkTestSuiteBase<ComparableTuple2<In
                                     .toUri()
                                     .toURL()));
 
-    @Override
+    /** Could be removed together with dropping support of Flink 1.19. */
+    @Deprecated
     protected void checkResultWithSemantic(
             ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
             List<ComparableTuple2<Integer, String>> testData,
@@ -109,8 +110,46 @@ public class OpensearchSinkE2ECase extends 
SinkTestSuiteBase<ComparableTuple2<In
                 READER_RETRY_ATTEMPTS);
     }
 
+    protected void checkResultWithSemantic(
+            ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
+            List<ComparableTuple2<Integer, String>> testData,
+            org.apache.flink.core.execution.CheckpointingMode semantic)
+            throws Exception {
+        waitUntilCondition(
+                () -> {
+                    try {
+                        List<ComparableTuple2<Integer, String>> result =
+                                reader.poll(Duration.ofMillis(READER_TIMEOUT));
+                        assertThat(sort(result).iterator())
+                                .matchesRecordsFromSource(
+                                        
Collections.singletonList(sort(testData)),
+                                        
convertFromCheckpointingMode(semantic));
+                        return true;
+                    } catch (Throwable t) {
+                        LOG.warn("Polled results not as expected", t);
+                        return false;
+                    }
+                },
+                5000,
+                READER_RETRY_ATTEMPTS);
+    }
+
     private static <T extends Comparable<T>> List<T> sort(List<T> list) {
         Collections.sort(list);
         return list;
     }
+
+    /** Could be removed together with dropping support of Flink 1.19. */
+    @Deprecated
+    private static org.apache.flink.streaming.api.CheckpointingMode 
convertFromCheckpointingMode(
+            org.apache.flink.core.execution.CheckpointingMode semantic) {
+        switch (semantic) {
+            case EXACTLY_ONCE:
+                return 
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+            case AT_LEAST_ONCE:
+                return 
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+            default:
+                throw new IllegalArgumentException("Unsupported semantic: " + 
semantic);
+        }
+    }
 }
diff --git a/flink-connector-opensearch/pom.xml 
b/flink-connector-opensearch/pom.xml
index 49d20e0..ffd40d7 100644
--- a/flink-connector-opensearch/pom.xml
+++ b/flink-connector-opensearch/pom.xml
@@ -36,7 +36,11 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <opensearch.version>1.3.0</opensearch.version>
+               <opensearch.version>1.3.14</opensearch.version>
+               <flink.connector.module.config><!-- required by
+               OpensearchSinkITCase -->  
--add-opens=java.base/java.lang=ALL-UNNAMED <!--
+               OpensearchSinkITCase --> 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED <!--
+               OpensearchDynamicSinkITCase --> 
--add-opens=java.base/java.util=ALL-UNNAMED </flink.connector.module.config>
        </properties>
 
        <dependencies>
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index 693ae44..ce4278b 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -17,25 +17,17 @@
 
 package org.apache.flink.connector.opensearch.sink;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import 
org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
 import 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SimpleUserCodeClassLoader;
 import org.apache.flink.util.TestLoggerExtension;
-import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
@@ -44,8 +36,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestFactory;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 
-import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
@@ -138,9 +130,12 @@ class OpensearchSinkBuilderTest {
         final OpensearchSink<Object> sink =
                 
createMinimalBuilder().setFailureHandler(failureHandler).build();
 
-        final InitContext sinkInitContext = new MockInitContext();
         final BulkResponseInspector bulkResponseInspector =
-                
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+                sink.getBulkResponseInspectorFactory()
+                        .apply(
+                                () ->
+                                        
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+                                                new 
UnregisteredMetricsGroup()));
         assertThat(bulkResponseInspector)
                 .isInstanceOf(DefaultBulkResponseInspector.class)
                 .extracting(
@@ -163,7 +158,20 @@ class OpensearchSinkBuilderTest {
                         
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
                         .build();
 
-        final InitContext sinkInitContext = new MockInitContext();
+        final InitContext sinkInitContext = Mockito.mock(InitContext.class);
+        Mockito.when(sinkInitContext.metricGroup())
+                .thenReturn(
+                        TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+                                new UnregisteredMetricsGroup()));
+
+        Mockito.when(sinkInitContext.getMailboxExecutor())
+                .thenReturn(new 
OpensearchSinkBuilderTest.DummyMailboxExecutor());
+        Mockito.when(sinkInitContext.getProcessingTimeService())
+                .thenReturn(new TestProcessingTimeService());
+        Mockito.when(sinkInitContext.getUserCodeClassLoader())
+                .thenReturn(
+                        SimpleUserCodeClassLoader.create(
+                                
OpensearchSinkBuilderTest.class.getClassLoader()));
 
         assertThatCode(() -> 
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
         assertThat(called).isTrue();
@@ -184,64 +192,6 @@ class OpensearchSinkBuilderTest {
         }
     }
 
-    private static class MockInitContext
-            implements Sink.InitContext, 
SerializationSchema.InitializationContext {
-
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            return SimpleUserCodeClassLoader.create(
-                    OpensearchSinkBuilderTest.class.getClassLoader());
-        }
-
-        public MailboxExecutor getMailboxExecutor() {
-            return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
-        }
-
-        public ProcessingTimeService getProcessingTimeService() {
-            return new TestProcessingTimeService();
-        }
-
-        public int getSubtaskId() {
-            return 0;
-        }
-
-        public int getNumberOfParallelSubtasks() {
-            return 0;
-        }
-
-        public int getAttemptNumber() {
-            return 0;
-        }
-
-        public SinkWriterMetricGroup metricGroup() {
-            return InternalSinkWriterMetricGroup.mock(new 
UnregisteredMetricsGroup());
-        }
-
-        public MetricGroup getMetricGroup() {
-            return this.metricGroup();
-        }
-
-        public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.empty();
-        }
-
-        public SerializationSchema.InitializationContext
-                asSerializationSchemaInitializationContext() {
-            return this;
-        }
-
-        public boolean isObjectReuseEnabled() {
-            return false;
-        }
-
-        public <IN> TypeSerializer<IN> createInputSerializer() {
-            throw new UnsupportedOperationException();
-        }
-
-        public JobID getJobId() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
     private OpensearchSinkBuilder<Object> createEmptyBuilder() {
         return new OpensearchSinkBuilder<>();
     }
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index fc083a4..838c6bd 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLoggerExtension;
@@ -168,17 +167,19 @@ class OpensearchWriterITCase {
         final String index = "test-inc-byte-out";
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-        final InternalSinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(
-                        metricListener.getMetricGroup(), 
operatorIOMetricGroup);
         final int flushAfterNActions = 2;
         final BulkProcessorConfig bulkProcessorConfig =
                 new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
 
         try (final OpensearchWriter<Tuple2<Integer, String>> writer =
-                createWriter(index, false, bulkProcessorConfig, metricGroup)) {
+                createWriter(
+                        index,
+                        false,
+                        bulkProcessorConfig,
+                        TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+                                operatorIOMetricGroup, 
metricListener.getMetricGroup()))) {
             final Counter numBytesOut = 
operatorIOMetricGroup.getNumBytesOutCounter();
-            assertThat(numBytesOut.getCount()).isEqualTo(0);
+            assertThat(numBytesOut.getCount()).isZero();
             writer.write(Tuple2.of(1, buildMessage(1)), null);
             writer.write(Tuple2.of(2, buildMessage(2)), null);
 
@@ -280,7 +281,8 @@ class OpensearchWriterITCase {
                 index,
                 flushOnCheckpoint,
                 bulkProcessorConfig,
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+                        metricListener.getMetricGroup()),
                 new DefaultFailureHandler());
     }
 
@@ -293,7 +295,8 @@ class OpensearchWriterITCase {
                 index,
                 flushOnCheckpoint,
                 bulkProcessorConfig,
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
+                        metricListener.getMetricGroup()),
                 failureHandler);
     }
 
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
new file mode 100644
index 0000000..0ad609f
--- /dev/null
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Testing implementation for {@link SinkWriterMetricGroup}. */
+public class TestingSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
+        implements SinkWriterMetricGroup {
+
+    private final Supplier<Counter> numRecordsOutErrorsCounterSupplier;
+
+    private final Supplier<Counter> numRecordsSendErrorsCounterSupplier;
+
+    private final Supplier<Counter> numRecordsSendCounterSupplier;
+
+    private final Supplier<Counter> numBytesSendCounterSupplier;
+
+    private final Consumer<Gauge<Long>> currentSendTimeGaugeConsumer;
+
+    private final Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier;
+
+    public TestingSinkWriterMetricGroup(
+            MetricGroup parentMetricGroup,
+            Supplier<Counter> numRecordsOutErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendErrorsCounterSupplier,
+            Supplier<Counter> numRecordsSendCounterSupplier,
+            Supplier<Counter> numBytesSendCounterSupplier,
+            Consumer<Gauge<Long>> currentSendTimeGaugeConsumer,
+            Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+        super(parentMetricGroup);
+        this.numRecordsOutErrorsCounterSupplier = 
numRecordsOutErrorsCounterSupplier;
+        this.numRecordsSendErrorsCounterSupplier = 
numRecordsSendErrorsCounterSupplier;
+        this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+        this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+        this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+        this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+    }
+
+    @Override
+    public Counter getNumRecordsOutErrorsCounter() {
+        return numRecordsOutErrorsCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumRecordsSendErrorsCounter() {
+        return numRecordsSendErrorsCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsSendCounterSupplier.get();
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesSendCounterSupplier.get();
+    }
+
+    @Override
+    public void setCurrentSendTimeGauge(Gauge<Long> gauge) {
+        currentSendTimeGaugeConsumer.accept(gauge);
+    }
+
+    @Override
+    public OperatorIOMetricGroup getIOMetricGroup() {
+        return ioMetricGroupSupplier.get();
+    }
+
+    static TestingSinkWriterMetricGroup getSinkWriterMetricGroup(MetricGroup 
parentMetricGroup) {
+        final OperatorIOMetricGroup operatorIOMetricGroup =
+                
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        return getSinkWriterMetricGroup(operatorIOMetricGroup, 
parentMetricGroup);
+    }
+
+    static TestingSinkWriterMetricGroup getSinkWriterMetricGroup(
+            OperatorIOMetricGroup operatorIOMetricGroup, MetricGroup 
parentMetricGroup) {
+        Counter numRecordsOutErrors = 
parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        Counter numRecordsSendErrors =
+                parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, 
numRecordsOutErrors);
+        Counter numRecordsWritten =
+                parentMetricGroup.counter(
+                        MetricNames.NUM_RECORDS_SEND,
+                        operatorIOMetricGroup.getNumRecordsOutCounter());
+        Counter numBytesWritten =
+                parentMetricGroup.counter(
+                        MetricNames.NUM_BYTES_SEND, 
operatorIOMetricGroup.getNumBytesOutCounter());
+        Consumer<Gauge<Long>> currentSendTimeGaugeConsumer =
+                currentSendTimeGauge ->
+                        parentMetricGroup.gauge(
+                                MetricNames.CURRENT_SEND_TIME, 
currentSendTimeGauge);
+        return new TestingSinkWriterMetricGroup.Builder()
+                .setParentMetricGroup(parentMetricGroup)
+                .setIoMetricGroupSupplier(() -> operatorIOMetricGroup)
+                .setNumRecordsOutErrorsCounterSupplier(() -> 
numRecordsOutErrors)
+                .setNumRecordsSendErrorsCounterSupplier(() -> 
numRecordsSendErrors)
+                .setNumRecordsSendCounterSupplier(() -> numRecordsWritten)
+                .setNumBytesSendCounterSupplier(() -> numBytesWritten)
+                .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer)
+                .build();
+    }
+
+    /** Builder for {@link TestingSinkWriterMetricGroup}. */
+    public static class Builder {
+
+        private MetricGroup parentMetricGroup = null;
+
+        private Supplier<Counter> numRecordsOutErrorsCounterSupplier = () -> 
null;
+
+        private Supplier<Counter> numRecordsSendErrorsCounterSupplier = () -> 
null;
+
+        private Supplier<Counter> numRecordsSendCounterSupplier = () -> null;
+
+        private Supplier<Counter> numBytesSendCounterSupplier = () -> null;
+
+        private Consumer<Gauge<Long>> currentSendTimeGaugeConsumer = counter 
-> {};
+
+        private Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier = () -> 
null;
+
+        public Builder setParentMetricGroup(MetricGroup parentMetricGroup) {
+            this.parentMetricGroup = parentMetricGroup;
+            return this;
+        }
+
+        public Builder setNumRecordsOutErrorsCounterSupplier(
+                Supplier<Counter> numRecordsOutErrorsCounterSupplier) {
+            this.numRecordsOutErrorsCounterSupplier = 
numRecordsOutErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendErrorsCounterSupplier(
+                Supplier<Counter> numRecordsSendErrorsCounterSupplier) {
+            this.numRecordsSendErrorsCounterSupplier = 
numRecordsSendErrorsCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumRecordsSendCounterSupplier(
+                Supplier<Counter> numRecordsSendCounterSupplier) {
+            this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setNumBytesSendCounterSupplier(
+                Supplier<Counter> numBytesSendCounterSupplier) {
+            this.numBytesSendCounterSupplier = numBytesSendCounterSupplier;
+            return this;
+        }
+
+        public Builder setCurrentSendTimeGaugeConsumer(
+                Consumer<Gauge<Long>> currentSendTimeGaugeConsumer) {
+            this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer;
+            return this;
+        }
+
+        public Builder setIoMetricGroupSupplier(
+                Supplier<OperatorIOMetricGroup> ioMetricGroupSupplier) {
+            this.ioMetricGroupSupplier = ioMetricGroupSupplier;
+            return this;
+        }
+
+        public TestingSinkWriterMetricGroup build() {
+            return new TestingSinkWriterMetricGroup(
+                    parentMetricGroup,
+                    numRecordsOutErrorsCounterSupplier,
+                    numRecordsSendErrorsCounterSupplier,
+                    numRecordsSendCounterSupplier,
+                    numBytesSendCounterSupplier,
+                    currentSendTimeGaugeConsumer,
+                    ioMetricGroupSupplier);
+        }
+    }
+}
diff --git a/flink-sql-connector-opensearch/pom.xml 
b/flink-sql-connector-opensearch/pom.xml
index e3f4cd4..8f3cdc1 100644
--- a/flink-sql-connector-opensearch/pom.xml
+++ b/flink-sql-connector-opensearch/pom.xml
@@ -40,6 +40,12 @@ under the License.
                        <artifactId>flink-connector-opensearch</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git a/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE 
b/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
index 79e4365..9da53ef 100644
--- a/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-opensearch/src/main/resources/META-INF/NOTICE
@@ -7,10 +7,10 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
 - com.carrotsearch:hppc:0.8.1
-- com.fasterxml.jackson.core:jackson-core:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.13.4
-- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4
+- com.fasterxml.jackson.core:jackson-core:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.15.3
+- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.3
 - com.github.spullara.mustache.java:compiler:0.9.10
 - commons-codec:commons-codec:1.15
 - commons-logging:commons-logging:1.1.3
@@ -32,16 +32,16 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - org.apache.lucene:lucene-spatial3d:8.10.1
 - org.apache.lucene:lucene-spatial-extras:8.10.1
 - org.apache.lucene:lucene-suggest:8.10.1
-- org.opensearch.client:opensearch-rest-client:1.3.0
-- org.opensearch.client:opensearch-rest-high-level-client:1.3.0
-- org.opensearch:opensearch-cli:1.3.0
-- org.opensearch:opensearch-core:1.3.0
-- org.opensearch:opensearch-geo:1.3.0
-- org.opensearch:opensearch:1.3.0
-- org.opensearch:opensearch-secure-sm:1.3.0
-- org.opensearch:opensearch-x-content:1.3.0
-- org.opensearch.plugin:aggs-matrix-stats-client:1.3.0
-- org.opensearch.plugin:lang-mustache-client:1.3.0
-- org.opensearch.plugin:mapper-extras-client:1.3.0
-- org.opensearch.plugin:parent-join-client:1.3.0
-- org.opensearch.plugin:rank-eval-client:1.3.0
+- org.opensearch.client:opensearch-rest-client:1.3.14
+- org.opensearch.client:opensearch-rest-high-level-client:1.3.14
+- org.opensearch:opensearch-cli:1.3.14
+- org.opensearch:opensearch-core:1.3.14
+- org.opensearch:opensearch-geo:1.3.14
+- org.opensearch:opensearch:1.3.14
+- org.opensearch:opensearch-secure-sm:1.3.14
+- org.opensearch:opensearch-x-content:1.3.14
+- org.opensearch.plugin:aggs-matrix-stats-client:1.3.14
+- org.opensearch.plugin:lang-mustache-client:1.3.14
+- org.opensearch.plugin:mapper-extras-client:1.3.14
+- org.opensearch.plugin:parent-join-client:1.3.14
+- org.opensearch.plugin:rank-eval-client:1.3.14
diff --git a/pom.xml b/pom.xml
index d6dbd4c..631b6b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,12 +59,12 @@ under the License.
        <properties>
                <flink.version>1.17.1</flink.version>
                
-               <commons-compress.version>1.23.0</commons-compress.version>
-               <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
-               <junit5.version>5.9.2</junit5.version>
-               <assertj.version>3.21.0</assertj.version>
-               <testcontainers.version>1.18.3</testcontainers.version>
-               <mockito.version>2.21.0</mockito.version>
+               <commons-compress.version>1.26.1</commons-compress.version>
+               <jackson-bom.version>2.15.3</jackson-bom.version>
+               <junit5.version>5.10.2</junit5.version>
+               <assertj.version>3.25.3</assertj.version>
+               <testcontainers.version>1.19.7</testcontainers.version>
+               <mockito.version>3.12.4</mockito.version>
 
                <japicmp.skip>false</japicmp.skip>
                <japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
@@ -73,6 +73,12 @@ under the License.
                <log4j.version>2.17.2</log4j.version>
 
                
<flink.parent.artifactId>flink-connector-opensearch-parent</flink.parent.artifactId>
+               <!-- This property should contain the add-opens/add-exports 
commands required for the tests
+                in the given connector's module to pass.
+                It MUST be a space-separated list not containing any newlines,
+                of entries in the form 
'[-]{2}add-[opens|exports]=<module>/<package>=ALL-UNNAMED'.-->
+               <flink.connector.module.config/>
+               <flink.surefire.baseArgLine>-XX:+UseG1GC -Xms256m 
-XX:+IgnoreUnrecognizedVMOptions 
${flink.connector.module.config}</flink.surefire.baseArgLine>
        </properties>
 
        <dependencies>
@@ -279,6 +285,27 @@ under the License.
                                <version>${commons-compress.version}</version>
                        </dependency>
 
+                       <!-- For dependency convergence -->
+                       <dependency>
+                               <groupId>commons-io</groupId>
+                               <artifactId>commons-io</artifactId>
+                               <version>2.15.1</version>
+                       </dependency>
+
+                       <!-- For dependency convergence -->
+                       <dependency>
+                               <groupId>org.apache.commons</groupId>
+                               <artifactId>commons-lang3</artifactId>
+                               <version>3.14.0</version>
+                       </dependency>
+
+                       <!-- For dependency convergence -->
+                       <dependency>
+                               <groupId>net.bytebuddy</groupId>
+                               <artifactId>byte-buddy</artifactId>
+                               <version>1.14.13</version>
+                       </dependency>
+
                        <!-- For dependency convergence -->
                        <dependency>
                                <groupId>com.fasterxml.jackson</groupId>
@@ -297,6 +324,13 @@ under the License.
                                <scope>import</scope>
                        </dependency>
 
+                       <!-- For dependency convergence  -->
+                       <dependency>
+                               <groupId>org.xerial.snappy</groupId>
+                               <artifactId>snappy-java</artifactId>
+                               <version>1.1.10.5</version>
+                       </dependency>
+
                        <dependency>
                                <groupId>org.assertj</groupId>
                                <artifactId>assertj-core</artifactId>

Reply via email to