This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e31aa8  [FLINK-36074][Connectors/mongodb] Support insert overwrite semantics for mongodb table sink (#43)
0e31aa8 is described below

commit 0e31aa8890e4514c84e1f1d11e4e0d287ba4df7a
Author: Jiabao Sun <jiabao...@apache.org>
AuthorDate: Wed Jan 22 10:33:19 2025 +0800

    [FLINK-36074][Connectors/mongodb] Support insert overwrite semantics for mongodb table sink (#43)
---
 docs/content.zh/docs/connectors/table/mongodb.md   |  9 ++++---
 docs/content/docs/connectors/table/mongodb.md      |  9 +++++--
 .../src/test/resources/e2e_upsert.sql              |  2 +-
 .../mongodb/table/MongoDynamicTableSink.java       | 22 ++++++++++-----
 .../mongodb/table/MongoDynamicTableSinkITCase.java | 31 +++++++++++++++++++---
 5 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md
index 1c078f0..ea8a35d 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -347,11 +347,14 @@ The main purpose of the lookup cache is to improve the performance of temporal joins with the MongoDB connector
 By default, flink caches the empty query result for a primary key; you can toggle this behavior by setting `lookup.partial-cache.caching-missing-key` to false.
 
 ### Idempotent Writes
-If a primary key is defined in the DDL, the MongoDB sink uses upsert semantics instead of plain INSERT statements.
-We combine the primary key fields declared in the DDL into MongoDB's reserved primary key _id and write in upsert mode to guarantee idempotent writes.
+If a primary key is defined in the DDL, the MongoDB connector writes to MongoDB in UPSERT mode, `db.collection.update(<query>, <update>, { upsert: true })`,
+rather than in INSERT mode, `db.collection.insert()`. We combine the primary key fields declared in the DDL into MongoDB's reserved primary key _id and write in UPSERT mode to guarantee idempotent writes.
+
+When `INSERT OVERWRITE` is used to write to a MongoDB table, the write to MongoDB is forced into UPSERT mode.
+Therefore, if no primary key is defined for the MongoDB table in the DDL, the write is rejected.
 
 If a failure occurs, the Flink job recovers from the last successful checkpoint and re-processes, which may cause messages to be processed again during recovery.
-The upsert mode is strongly recommended because it helps avoid violating the database's primary key constraint and producing duplicate data if records need to be re-processed.
+The UPSERT mode is strongly recommended because it helps avoid violating the database's primary key constraint and producing duplicate data if records need to be re-processed.
 
 ### [Upsert on a Sharded Collection](https://www.mongodb.com/docs/manual/reference/method/db.collection.updateOne/#upsert-on-a-sharded-collection)
 
diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md
index 8fcd35c..8b15b95 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -377,10 +377,15 @@ by setting `lookup.partial-cache.caching-missing-key` to false.
 
 ### Idempotent Writes
 
-MongoDB sink will use upsert semantics rather than plain INSERT statements if primary key is defined 
-in DDL. We composite the primary key fields as the document _id which is the reserved primary key of
+The MongoDB connector uses the upsert writing mode `db.collection.update(<query>, <update>, { upsert: true })` 
+rather than the insert writing mode `db.collection.insert()` if a primary key is defined in the DDL.
+We combine the primary key fields into the document _id, which is the reserved primary key of
 MongoDB. Use upsert mode to write rows into MongoDB, which provides idempotence.
 
+When the `INSERT OVERWRITE` statement is used to write to a MongoDB table, the upsert mode is enforced 
+for the write to MongoDB. Therefore, if the primary key of the MongoDB table is not defined in the DDL, 
+the write operation will be rejected.
+
 If there are failures, the Flink job will recover and re-process from last successful checkpoint, 
 which can lead to re-processing messages during recovery. The upsert mode is highly recommended as 
 it helps avoid constraint violations or duplicate data if records need to be re-processed.
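
For reference, a minimal SQL sketch of the behavior documented above (the table, fields, and connection settings are illustrative, not part of this commit): a MongoDB table with a declared primary key accepts INSERT OVERWRITE, which is written in upsert mode keyed on _id.

    CREATE TABLE users_sink (
      user_id   STRING,
      user_name STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://localhost:27017',
      'database' = 'test',
      'collection' = 'users_sink'
    );

    -- Forces the upsert writing mode: rows with the same _id overwrite the existing
    -- document instead of producing duplicates.
    INSERT OVERWRITE users_sink SELECT user_id, user_name FROM users_source;
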
diff --git a/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql b/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
index ff3757e..34d5818 100644
--- a/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
+++ b/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
@@ -43,4 +43,4 @@ CREATE TABLE orders_bak (
   'collection' = 'orders_bak'
 );
 
-INSERT INTO orders_bak SELECT * FROM orders;
+INSERT OVERWRITE orders_bak SELECT * FROM orders;
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
index 99bdc7f..5775e41 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,14 +49,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link DynamicTableSink} for MongoDB. */
 @Internal
-public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitioning {
+public class MongoDynamicTableSink
+        implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSink.class);
 
     private final MongoConnectionOptions connectionOptions;
     private final MongoWriteOptions writeOptions;
     @Nullable private final Integer parallelism;
-    private final boolean isUpsert;
+    private final boolean supportUpsert;
     private final ResolvedSchema resolvedSchema;
     private final String[] partitionKeys;
     private final SerializableFunction<RowData, BsonValue> primaryKeyExtractor;
@@ -72,7 +74,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         this.parallelism = parallelism;
         this.resolvedSchema = checkNotNull(resolvedSchema);
         this.partitionKeys = checkNotNull(partitionKeys);
-        this.isUpsert = resolvedSchema.getPrimaryKey().isPresent();
+        this.supportUpsert = resolvedSchema.getPrimaryKey().isPresent();
         this.primaryKeyExtractor =
                 MongoPrimaryKeyExtractor.createPrimaryKeyExtractor(resolvedSchema);
         this.shardKeysExtractor =
@@ -81,7 +83,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (isUpsert) {
+        if (supportUpsert) {
             return ChangelogMode.upsert();
         } else {
             return ChangelogMode.insertOnly();
@@ -119,6 +121,14 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         LOG.info("Applied static partition: {}", partition);
     }
 
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        if (overwrite && !supportUpsert) {
+            throw new IllegalStateException(
+                    "Overwrite sink requires specifying the table's primary 
key");
+        }
+    }
+
     @Override
     public MongoDynamicTableSink copy() {
         return new MongoDynamicTableSink(
@@ -142,7 +152,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         return Objects.equals(connectionOptions, that.connectionOptions)
                 && Objects.equals(writeOptions, that.writeOptions)
                 && Objects.equals(parallelism, that.parallelism)
-                && Objects.equals(isUpsert, that.isUpsert)
+                && Objects.equals(supportUpsert, that.supportUpsert)
                 && Objects.equals(resolvedSchema, that.resolvedSchema)
                 && Arrays.equals(partitionKeys, that.partitionKeys);
     }
@@ -154,7 +164,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
                                 connectionOptions,
                                 writeOptions,
                                 parallelism,
-                                isUpsert,
+                                supportUpsert,
                                 resolvedSchema)
                 + Arrays.hashCode(partitionKeys);
     }
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
index a9d53b1..a4f99c2 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
@@ -65,6 +65,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
@@ -85,6 +87,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.flink.table.api.Expressions.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT tests for {@link MongoDynamicTableSink}. */
 @Testcontainers
@@ -349,8 +352,9 @@ class MongoDynamicTableSinkITCase {
         assertThat(actual).isEqualTo(expected);
     }
 
-    @Test
-    void testSinkWithReservedId() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSinkWithReservedId(boolean overwrite) throws Exception {
         String database = "test";
         String collection = "sink_with_reserved_id";
 
@@ -368,7 +372,7 @@ class MongoDynamicTableSinkITCase {
 
         ObjectId objectId = new ObjectId();
         tEnv.fromValues(row(objectId.toHexString(), "r1"), row("str", "r2"))
-                .executeInsert("mongo_sink")
+                .executeInsert("mongo_sink", overwrite)
                 .await();
 
         MongoCollection<Document> coll =
@@ -385,6 +389,27 @@ class MongoDynamicTableSinkITCase {
         assertThat(actual).containsExactlyInAnyOrder(expected);
     }
 
+    @Test
+    void testOverwriteSinkWithoutPrimaryKey() {
+        String database = "test";
+        String collection = "overwrite_sink_without_primary_key";
+
+        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink (" + "f1 STRING NOT NULL\n" + 
")\n" + "WITH (%s)",
+                        getConnectorSql(database, collection)));
+
+        assertThatThrownBy(
+                        () ->
+                                tEnv.fromValues(row("d1"), row("d1"))
+                                        .executeInsert("mongo_sink", true)
+                                        .await())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Overwrite sink requires specifying the table's primary key");
+    }
+
     @Test
     void testSinkWithoutPrimaryKey() throws Exception {
         String database = "test";
