This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new e03d336  [FLINK-38606] Update to Flink 2.1.0
e03d336 is described below

commit e03d33635cb3901ef85f768c7a02d21fc4c5505f
Author: Rahul Teke <[email protected]>
AuthorDate: Fri Feb 6 13:22:54 2026 +0000

    [FLINK-38606] Update to Flink 2.1.0
---
 .github/workflows/push_pr.yml                      |  4 +-
 flink-connector-mongodb-table-tests/pom.xml        | 79 ++++++++++++++++++++++
 .../table/MongoFilterPushDownVisitorTest.java      |  2 +-
 .../mongodb/table/MongoTablePlanTest.java          |  0
 .../connector/mongodb/table/MongoTablePlanTest.xml |  0
 flink-connector-mongodb/pom.xml                    | 11 +--
 .../flink/connector/mongodb/sink/MongoSink.java    |  3 +-
 .../connector/mongodb/sink/writer/MongoWriter.java |  4 +-
 .../writer/context/DefaultMongoSinkContext.java    |  8 +--
 .../sink/writer/context/MongoSinkContext.java      |  4 +-
 .../connector/mongodb/source/MongoSource.java      |  5 --
 .../mongodb/source/reader/MongoSourceReader.java   |  7 +-
 .../connector/mongodb/sink/MongoSinkITCase.java    |  9 ++-
 .../mongodb/sink/writer/MongoWriterITCase.java     |  3 +-
 pom.xml                                            |  3 +-
 15 files changed, 105 insertions(+), 37 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 37f8922..a8d750d 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,8 +29,8 @@ jobs:
     strategy:
       matrix:
         mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
-        flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
-        jdk: [ '8, 11, 17, 21' ]
+        flink: [ 2.1-SNAPSHOT ]
+        jdk: [ '11','17, 21' ]
 
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/flink-connector-mongodb-table-tests/pom.xml 
b/flink-connector-mongodb-table-tests/pom.xml
new file mode 100644
index 0000000..6da604e
--- /dev/null
+++ b/flink-connector-mongodb-table-tests/pom.xml
@@ -0,0 +1,79 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connector-mongodb-parent</artifactId>
+               <version>2.1-SNAPSHOT</version>
+       </parent>
+       <artifactId>flink-connector-mongodb-table-tests</artifactId>
+
+       <!-- This module was added specifically to separate test cases which 
require direct access to planner scala module -->
+       <!-- The flink-connector-mongodb project should depend only on 
flink-table-planner-loader and not the scala project as recommend in flink docs 
-->
+       <name>Flink : Connectors : MongoDB : Table : Tests</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <surefire.module.config>
+                       --add-opens=java.base/java.util=ALL-UNNAMED
+                       --add-opens=java.base/java.lang=ALL-UNNAMED
+               </surefire.module.config>
+       </properties>
+
+
+       <dependencies>
+               <!-- Core -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-mongodb</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- Tests -->
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>mongodb</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Table API integration tests -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
 
b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
similarity index 99%
rename from 
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
rename to 
flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
index f92ce17..8d7e9f1 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
+++ 
b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
@@ -286,7 +286,7 @@ class MongoFilterPushDownVisitorTest {
         RexNodeExpression rexExp =
                 (RexNodeExpression) 
tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
 
-        RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
+        RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());
         // converts the cnf condition to a list of AND conditions
         List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
 
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
 
b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
similarity index 100%
rename from 
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
rename to 
flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
diff --git 
a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
 
b/flink-connector-mongodb-table-tests/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
similarity index 100%
rename from 
flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
rename to 
flink-connector-mongodb-table-tests/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
diff --git a/flink-connector-mongodb/pom.xml b/flink-connector-mongodb/pom.xml
index 8e21e2f..9944fa4 100644
--- a/flink-connector-mongodb/pom.xml
+++ b/flink-connector-mongodb/pom.xml
@@ -124,19 +124,10 @@ under the License.
                <!-- Table API integration tests -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <artifactId>flink-table-planner-loader</artifactId>
                        <version>${flink.version}</version>
                        <scope>test</scope>
                </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-                       <version>${flink.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-runtime</artifactId>
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
index 33596c7..be1dc62 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
@@ -78,7 +79,7 @@ public class MongoSink<IN> implements Sink<IN> {
     }
 
     @Override
-    public SinkWriter<IN> createWriter(InitContext context) {
+    public SinkWriter<IN> createWriter(WriterInitContext context) {
         return new MongoWriter<>(
                 connectionOptions,
                 writeOptions,
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index 5319959..cbd9a61 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import 
org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
@@ -89,7 +89,7 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
             MongoConnectionOptions connectionOptions,
             MongoWriteOptions writeOptions,
             boolean flushOnCheckpoint,
-            Sink.InitContext initContext,
+            WriterInitContext initContext,
             MongoSerializationSchema<IN> serializationSchema) {
         this.connectionOptions = checkNotNull(connectionOptions);
         this.writeOptions = checkNotNull(writeOptions);
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
index f2958e1..d62094e 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
@@ -18,23 +18,23 @@
 package org.apache.flink.connector.mongodb.sink.writer.context;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 
 /** Default {@link MongoSinkContext} implementation. */
 @Internal
 public class DefaultMongoSinkContext implements MongoSinkContext {
 
-    private final Sink.InitContext initContext;
+    private final WriterInitContext initContext;
     private final MongoWriteOptions writeOptions;
 
-    public DefaultMongoSinkContext(Sink.InitContext initContext, 
MongoWriteOptions writeOptions) {
+    public DefaultMongoSinkContext(WriterInitContext initContext, 
MongoWriteOptions writeOptions) {
         this.initContext = initContext;
         this.writeOptions = writeOptions;
     }
 
     @Override
-    public Sink.InitContext getInitContext() {
+    public WriterInitContext getInitContext() {
         return initContext;
     }
 
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
index 55e89d8..645c3da 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.mongodb.sink.writer.context;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
 import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
 
@@ -27,7 +27,7 @@ import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
 public interface MongoSinkContext {
 
     /** Returns the current sink's init context. */
-    Sink.InitContext getInitContext();
+    WriterInitContext getInitContext();
 
     /** Returns the current process time in flink. */
     long processTime();
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
index bbec562..ea57a83 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
@@ -26,9 +26,7 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
 import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
 import 
org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
@@ -134,8 +132,6 @@ public class MongoSource<OUT>
 
     @Override
     public SourceReader<OUT, MongoSourceSplit> 
createReader(SourceReaderContext readerContext) {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
 
         MongoSourceReaderContext mongoReaderContext =
                 new MongoSourceReaderContext(readerContext, limit);
@@ -150,7 +146,6 @@ public class MongoSource<OUT>
                                 mongoReaderContext);
 
         return new MongoSourceReader<>(
-                elementsQueue,
                 splitReaderSupplier,
                 new MongoRecordEmitter<>(deserializationSchema),
                 mongoReaderContext);
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
index 5eb6669..57c126d 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
@@ -19,11 +19,9 @@ package org.apache.flink.connector.mongodb.source.reader;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
 import 
org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplitState;
 import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
@@ -49,13 +47,12 @@ public class MongoSourceReader<OUT>
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoSourceReader.class);
 
     public MongoSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> 
elementQueue,
             Supplier<SplitReader<BsonDocument, MongoSourceSplit>> 
splitReaderSupplier,
             RecordEmitter<BsonDocument, OUT, MongoSourceSplitState> 
recordEmitter,
             MongoSourceReaderContext readerContext) {
         super(
-                elementQueue,
-                new SingleThreadFetcherManager<>(elementQueue, 
splitReaderSupplier),
+                new SingleThreadFetcherManager<>(
+                        splitReaderSupplier, readerContext.getConfiguration()),
                 recordEmitter,
                 readerContext.getConfiguration(),
                 readerContext);
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 867f1f0..d7b4a07 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -18,8 +18,9 @@
 package org.apache.flink.connector.mongodb.sink;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
 import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -101,9 +102,11 @@ class MongoSinkITCase {
             throws Exception {
         final String collection = "test-sink-with-delivery-" + 
deliveryGuarantee;
         final MongoSink<Document> sink = createSink(collection, 
deliveryGuarantee);
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
         env.enableCheckpointing(100L);
-        env.setRestartStrategy(RestartStrategies.noRestart());
 
         env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
         env.execute();
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index daa5b8b..2c5ba84 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -243,7 +243,8 @@ class MongoWriterITCase {
 
         MongoSerializationSchema<Document> testSerializationSchema =
                 (element, context) -> {
-                    
assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
+                    
assertThat(context.getInitContext().getTaskInfo().getIndexOfThisSubtask())
+                            .isEqualTo(0);
                     
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
                     assertThat(context.processTime())
                             .isEqualTo(
diff --git a/pom.xml b/pom.xml
index 7ceeae5..aea6d86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@ under the License.
                <mongodb7.version>7.0.12</mongodb7.version>
                <mongodb.version>${mongodb4.version}</mongodb.version>
 
-               <flink.version>1.20.0</flink.version>
+               <flink.version>2.1.0</flink.version>
                <scala.binary.version>2.12</scala.binary.version>
                <scala-library.version>2.12.7</scala-library.version>
                <junit5.version>5.8.1</junit5.version>
@@ -79,6 +79,7 @@ under the License.
                <module>flink-connector-mongodb</module>
                <module>flink-sql-connector-mongodb</module>
                <module>flink-connector-mongodb-e2e-tests</module>
+               <module>flink-connector-mongodb-table-tests</module>
        </modules>
 
        <dependencies>

Reply via email to