This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
new e03d336 [FLINK-38606] Update to Flink 2.1.0
e03d336 is described below
commit e03d33635cb3901ef85f768c7a02d21fc4c5505f
Author: Rahul Teke <[email protected]>
AuthorDate: Fri Feb 6 13:22:54 2026 +0000
[FLINK-38606] Update to Flink 2.1.0
---
.github/workflows/push_pr.yml | 4 +-
flink-connector-mongodb-table-tests/pom.xml | 79 ++++++++++++++++++++++
.../table/MongoFilterPushDownVisitorTest.java | 2 +-
.../mongodb/table/MongoTablePlanTest.java | 0
.../connector/mongodb/table/MongoTablePlanTest.xml | 0
flink-connector-mongodb/pom.xml | 11 +--
.../flink/connector/mongodb/sink/MongoSink.java | 3 +-
.../connector/mongodb/sink/writer/MongoWriter.java | 4 +-
.../writer/context/DefaultMongoSinkContext.java | 8 +--
.../sink/writer/context/MongoSinkContext.java | 4 +-
.../connector/mongodb/source/MongoSource.java | 5 --
.../mongodb/source/reader/MongoSourceReader.java | 7 +-
.../connector/mongodb/sink/MongoSinkITCase.java | 9 ++-
.../mongodb/sink/writer/MongoWriterITCase.java | 3 +-
pom.xml | 3 +-
15 files changed, 105 insertions(+), 37 deletions(-)
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 37f8922..a8d750d 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -29,8 +29,8 @@ jobs:
strategy:
matrix:
mongodb: [ mongodb4, mongodb5, mongodb6, mongodb7 ]
- flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
- jdk: [ '8, 11, 17, 21' ]
+ flink: [ 2.1-SNAPSHOT ]
+ jdk: [ '11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
diff --git a/flink-connector-mongodb-table-tests/pom.xml b/flink-connector-mongodb-table-tests/pom.xml
new file mode 100644
index 0000000..6da604e
--- /dev/null
+++ b/flink-connector-mongodb-table-tests/pom.xml
@@ -0,0 +1,79 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-mongodb-parent</artifactId>
+ <version>2.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>flink-connector-mongodb-table-tests</artifactId>
+
+ <!-- This module was added specifically to separate test cases which require direct access to the planner scala module -->
+ <!-- The flink-connector-mongodb project should depend only on flink-table-planner-loader and not the scala project, as recommended in the Flink docs -->
+ <name>Flink : Connectors : MongoDB : Table : Tests</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <surefire.module.config>
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ </surefire.module.config>
+ </properties>
+
+
+ <dependencies>
+ <!-- Core -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-mongodb</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Tests -->
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Table API integration tests -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
similarity index 99%
rename from flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
rename to flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
index f92ce17..8d7e9f1 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
+++ b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
@@ -286,7 +286,7 @@ class MongoFilterPushDownVisitorTest {
RexNodeExpression rexExp =
(RexNodeExpression)
tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
- RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
+ RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());
// converts the cnf condition to a list of AND conditions
List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
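Side note on the hunk above: in the Flink 2.x planner, FlinkRexUtil.toCnf is called without the old node-count limit argument. A minimal call-site sketch, assuming rexBuilder and rexExp are in scope as in the test (reading the old -1 as "no limit" is our assumption, not stated in this commit):

    // Flink 1.x planner API: explicit CNF node-count limit (-1 read here as "no limit")
    // RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());

    // Flink 2.x planner API, as used by the updated test:
    RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, rexExp.getRexNode());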
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java b/flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
similarity index 100%
rename from flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
rename to flink-connector-mongodb-table-tests/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
diff --git a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml b/flink-connector-mongodb-table-tests/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
similarity index 100%
rename from flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
rename to flink-connector-mongodb-table-tests/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
diff --git a/flink-connector-mongodb/pom.xml b/flink-connector-mongodb/pom.xml
index 8e21e2f..9944fa4 100644
--- a/flink-connector-mongodb/pom.xml
+++ b/flink-connector-mongodb/pom.xml
@@ -124,19 +124,10 @@ under the License.
<!-- Table API integration tests -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
index 33596c7..be1dc62 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSink.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
@@ -78,7 +79,7 @@ public class MongoSink<IN> implements Sink<IN> {
}
@Override
- public SinkWriter<IN> createWriter(InitContext context) {
+ public SinkWriter<IN> createWriter(WriterInitContext context) {
return new MongoWriter<>(
connectionOptions,
writeOptions,
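Context for the hunk above: in the Flink 2.x sink2 API, createWriter receives the top-level WriterInitContext instead of the removed Sink.InitContext. A minimal sketch of a custom sink against the new signature (MySink is a hypothetical name, not part of this connector):

    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.api.connector.sink2.WriterInitContext;

    public class MySink<IN> implements Sink<IN> {
        @Override
        public SinkWriter<IN> createWriter(WriterInitContext context) {
            // The context still provides the mailbox executor, metrics, task info, etc.
            return new SinkWriter<IN>() {
                @Override public void write(IN element, Context ctx) { /* no-op sketch */ }
                @Override public void flush(boolean endOfInput) {}
                @Override public void close() {}
            };
        }
    }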
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index 5319959..cbd9a61 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.DefaultMongoSinkContext;
@@ -89,7 +89,7 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
MongoConnectionOptions connectionOptions,
MongoWriteOptions writeOptions,
boolean flushOnCheckpoint,
- Sink.InitContext initContext,
+ WriterInitContext initContext,
MongoSerializationSchema<IN> serializationSchema) {
this.connectionOptions = checkNotNull(connectionOptions);
this.writeOptions = checkNotNull(writeOptions);
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
index f2958e1..d62094e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/DefaultMongoSinkContext.java
@@ -18,23 +18,23 @@
package org.apache.flink.connector.mongodb.sink.writer.context;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
/** Default {@link MongoSinkContext} implementation. */
@Internal
public class DefaultMongoSinkContext implements MongoSinkContext {
- private final Sink.InitContext initContext;
+ private final WriterInitContext initContext;
private final MongoWriteOptions writeOptions;
- public DefaultMongoSinkContext(Sink.InitContext initContext, MongoWriteOptions writeOptions) {
+ public DefaultMongoSinkContext(WriterInitContext initContext, MongoWriteOptions writeOptions) {
this.initContext = initContext;
this.writeOptions = writeOptions;
}
@Override
- public Sink.InitContext getInitContext() {
+ public WriterInitContext getInitContext() {
return initContext;
}
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
index 55e89d8..645c3da 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.mongodb.sink.writer.context;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -27,7 +27,7 @@ import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializat
public interface MongoSinkContext {
/** Returns the current sink's init context. */
- Sink.InitContext getInitContext();
+ WriterInitContext getInitContext();
/** Returns the current process time in flink. */
long processTime();
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
index bbec562..ea57a83 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
@@ -26,9 +26,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.enumerator.MongoSourceEnumState;
@@ -134,8 +132,6 @@ public class MongoSource<OUT>
@Override
public SourceReader<OUT, MongoSourceSplit> createReader(SourceReaderContext readerContext) {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementsQueue =
- new FutureCompletingBlockingQueue<>();
MongoSourceReaderContext mongoReaderContext =
new MongoSourceReaderContext(readerContext, limit);
@@ -150,7 +146,6 @@ public class MongoSource<OUT>
mongoReaderContext);
return new MongoSourceReader<>(
- elementsQueue,
splitReaderSupplier,
new MongoRecordEmitter<>(deserializationSchema),
mongoReaderContext);
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
index 5eb6669..57c126d 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java
@@ -19,11 +19,9 @@ package org.apache.flink.connector.mongodb.source.reader;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplitState;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
@@ -49,13 +47,12 @@ public class MongoSourceReader<OUT>
private static final Logger LOG = LoggerFactory.getLogger(MongoSourceReader.class);
public MongoSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<BsonDocument>> elementQueue,
Supplier<SplitReader<BsonDocument, MongoSourceSplit>> splitReaderSupplier,
RecordEmitter<BsonDocument, OUT, MongoSourceSplitState> recordEmitter,
MongoSourceReaderContext readerContext) {
super(
- elementQueue,
- new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier),
+ new SingleThreadFetcherManager<>(
+ splitReaderSupplier, readerContext.getConfiguration()),
recordEmitter,
readerContext.getConfiguration(),
readerContext);
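Context for the hunk above: newer flink-connector-base has SingleThreadMultiplexSourceReaderBase create the element queue internally, so readers no longer construct a FutureCompletingBlockingQueue themselves; SingleThreadFetcherManager is instead built from the split-reader supplier and a Configuration. A sketch of the constructor-call shape, mirroring the diff:

    // Before: the queue was created by the reader and passed twice
    // super(elementQueue, new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier), recordEmitter, config, readerContext);

    // After: the base class owns the queue; the fetcher manager takes the configuration
    super(
            new SingleThreadFetcherManager<>(splitReaderSupplier, readerContext.getConfiguration()),
            recordEmitter,
            readerContext.getConfiguration(),
            readerContext);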
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index 867f1f0..d7b4a07 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -18,8 +18,9 @@
package org.apache.flink.connector.mongodb.sink;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -101,9 +102,11 @@ class MongoSinkITCase {
throws Exception {
final String collection = "test-sink-with-delivery-" + deliveryGuarantee;
final MongoSink<Document> sink = createSink(collection, deliveryGuarantee);
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(100L);
- env.setRestartStrategy(RestartStrategies.noRestart());
env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
env.execute();
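Context for the hunk above: the RestartStrategies class is gone in Flink 2.0, so the test now disables restarts through configuration before the environment is created. A minimal sketch, mirroring the hunk:

    Configuration config = new Configuration();
    config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    env.enableCheckpointing(100L);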
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index daa5b8b..2c5ba84 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -243,7 +243,8 @@ class MongoWriterITCase {
MongoSerializationSchema<Document> testSerializationSchema =
(element, context) -> {
- assertThat(context.getInitContext().getSubtaskId()).isEqualTo(0);
+ assertThat(context.getInitContext().getTaskInfo().getIndexOfThisSubtask())
+ .isEqualTo(0);
assertThat(context.getWriteOptions()).isEqualTo(expectOptions);
assertThat(context.processTime())
.isEqualTo(
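Context for the hunk above: the subtask index is no longer exposed directly on the init context in Flink 2.x; it moved onto the TaskInfo accessor. A short sketch of the lookup (variable names are illustrative):

    int subtaskIndex = context.getInitContext().getTaskInfo().getIndexOfThisSubtask();
    int parallelism = context.getInitContext().getTaskInfo().getNumberOfParallelSubtasks();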
diff --git a/pom.xml b/pom.xml
index 7ceeae5..aea6d86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@ under the License.
<mongodb7.version>7.0.12</mongodb7.version>
<mongodb.version>${mongodb4.version}</mongodb.version>
- <flink.version>1.20.0</flink.version>
+ <flink.version>2.1.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala-library.version>2.12.7</scala-library.version>
<junit5.version>5.8.1</junit5.version>
@@ -79,6 +79,7 @@ under the License.
<module>flink-connector-mongodb</module>
<module>flink-sql-connector-mongodb</module>
<module>flink-connector-mongodb-e2e-tests</module>
+ <module>flink-connector-mongodb-table-tests</module>
</modules>
<dependencies>