This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
     new 0e31aa8  [FLINK-36074][Connectors/mongodb] Support insert overwrite semantics for mongodb table sink (#43)
0e31aa8 is described below

commit 0e31aa8890e4514c84e1f1d11e4e0d287ba4df7a
Author: Jiabao Sun <jiabao...@apache.org>
AuthorDate: Wed Jan 22 10:33:19 2025 +0800

    [FLINK-36074][Connectors/mongodb] Support insert overwrite semantics for mongodb table sink (#43)
---
 docs/content.zh/docs/connectors/table/mongodb.md    |  9 ++++---
 docs/content/docs/connectors/table/mongodb.md       |  9 +++++--
 .../src/test/resources/e2e_upsert.sql               |  2 +-
 .../mongodb/table/MongoDynamicTableSink.java        | 22 ++++++++++-----
 .../mongodb/table/MongoDynamicTableSinkITCase.java  | 31 +++++++++++++++++++---
 5 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md
index 1c078f0..ea8a35d 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -347,11 +347,14 @@ The main purpose of the lookup cache is to improve the performance of temporal joins with the MongoDB connector.
 By default, Flink caches the empty query result of the primary key; you can toggle this behavior by setting `lookup.partial-cache.caching-missing-key` to false.
 
 ### Idempotent Writes
 
-If a primary key is defined in the DDL, the MongoDB sink uses upsert semantics instead of plain INSERT statements.
-We combine the primary key declared in the DDL into MongoDB's reserved primary key _id and write in upsert mode to guarantee idempotent writes.
+If a primary key is defined in the DDL, the MongoDB connector writes to MongoDB in UPSERT mode `db.connection.update(<query>, <update>, { upsert: true })`
+instead of INSERT mode `db.connection.insert()`. We combine the primary key declared in the DDL into MongoDB's reserved primary key _id and write in UPSERT mode to guarantee idempotent writes.
+
+When `INSERT OVERWRITE` is used to write to a MongoDB table, UPSERT mode is forced for the write to MongoDB.
+Therefore, if no primary key is defined for the MongoDB table in the DDL, the write is rejected.
 
 If a failure occurs, the Flink job recovers from the last successful checkpoint and re-processes, which may lead to messages being processed again during recovery.
-Using upsert mode is strongly recommended, because it helps avoid violating database primary key constraints and producing duplicate data when records need to be re-processed.
+Using UPSERT mode is strongly recommended, because it helps avoid violating database primary key constraints and producing duplicate data when records need to be re-processed.
 
 ### [Upsert writes to sharded collections](https://www.mongodb.com/docs/manual/reference/method/db.collection.updateOne/#upsert-on-a-sharded-collection)

diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md
index 8fcd35c..8b15b95 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -377,10 +377,15 @@ by setting `lookup.partial-cache.caching-missing-key` to false.
 
 ### Idempotent Writes
 
-MongoDB sink will use upsert semantics rather than plain INSERT statements if primary key is defined
-in DDL. We composite the primary key fields as the document _id which is the reserved primary key of
+The MongoDB connector uses upsert writing mode `db.connection.update(<query>, <update>, { upsert: true })`
+rather than insert writing mode `db.connection.insert()` if a primary key is defined in the DDL.
+We combine the primary key fields into the document _id, which is the reserved primary key of
 MongoDB. Use upsert mode to write rows into MongoDB, which provides idempotence.
 
+When the `INSERT OVERWRITE` statement is used to write to a MongoDB table, upsert mode is forced
+for the write to MongoDB. Therefore, if the primary key of the MongoDB table is not defined in the DDL,
+the write operation will be rejected.
+
 If there are failures, the Flink job will recover and re-process from last successful checkpoint,
 which can lead to re-processing messages during recovery. The upsert mode is highly recommended as
 it helps avoid constraint violations or duplicate data if records need to be re-processed.
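For illustration only, not part of this patch: a minimal Flink SQL sketch of the documented behavior. The table names and columns below are hypothetical, and the connection options ('connector', 'uri', 'database', 'collection') follow the connector documentation.

    -- Hypothetical sink table; the declared primary key is mapped to the MongoDB _id field,
    -- so rows are written in upsert mode and INSERT OVERWRITE is accepted.
    CREATE TABLE orders_sink (
        order_id BIGINT,
        amount   DECIMAL(10, 2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mongodb',
        'uri' = 'mongodb://localhost:27017',
        'database' = 'mydb',
        'collection' = 'orders'
    );

    -- Re-running this statement re-processes the same rows idempotently,
    -- because each row is upserted by its _id.
    INSERT OVERWRITE orders_sink SELECT order_id, amount FROM orders_source;

Note that in this patch `INSERT OVERWRITE` does not add a truncation step; as described in the documentation above, it only forces the upsert writing mode.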
diff --git a/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql b/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
index ff3757e..34d5818 100644
--- a/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
+++ b/flink-connector-mongodb-e2e-tests/src/test/resources/e2e_upsert.sql
@@ -43,4 +43,4 @@ CREATE TABLE orders_bak (
     'collection' = 'orders_bak'
 );
 
-INSERT INTO orders_bak SELECT * FROM orders;
+INSERT OVERWRITE orders_bak SELECT * FROM orders;

diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
index 99bdc7f..5775e41 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,14 +49,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link DynamicTableSink} for MongoDB. */
 @Internal
-public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitioning {
+public class MongoDynamicTableSink
+        implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSink.class);
 
     private final MongoConnectionOptions connectionOptions;
     private final MongoWriteOptions writeOptions;
     @Nullable private final Integer parallelism;
-    private final boolean isUpsert;
+    private final boolean supportUpsert;
     private final ResolvedSchema resolvedSchema;
     private final String[] partitionKeys;
     private final SerializableFunction<RowData, BsonValue> primaryKeyExtractor;
@@ -72,7 +74,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         this.parallelism = parallelism;
         this.resolvedSchema = checkNotNull(resolvedSchema);
         this.partitionKeys = checkNotNull(partitionKeys);
-        this.isUpsert = resolvedSchema.getPrimaryKey().isPresent();
+        this.supportUpsert = resolvedSchema.getPrimaryKey().isPresent();
         this.primaryKeyExtractor =
                 MongoPrimaryKeyExtractor.createPrimaryKeyExtractor(resolvedSchema);
         this.shardKeysExtractor =
@@ -81,7 +83,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (isUpsert) {
+        if (supportUpsert) {
             return ChangelogMode.upsert();
         } else {
             return ChangelogMode.insertOnly();
         }
@@ -119,6 +121,14 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         LOG.info("Applied static partition: {}", partition);
     }
 
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        if (overwrite && !supportUpsert) {
+            throw new IllegalStateException(
+                    "Overwrite sink requires specifying the table's primary key");
+        }
+    }
+
     @Override
     public MongoDynamicTableSink copy() {
         return new MongoDynamicTableSink(
@@ -142,7 +152,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
         return Objects.equals(connectionOptions, that.connectionOptions)
                 && Objects.equals(writeOptions, that.writeOptions)
                 && Objects.equals(parallelism, that.parallelism)
-                && Objects.equals(isUpsert, that.isUpsert)
+                && Objects.equals(supportUpsert, that.supportUpsert)
                 && Objects.equals(resolvedSchema, that.resolvedSchema)
                 && Arrays.equals(partitionKeys, that.partitionKeys);
     }
@@ -154,7 +164,7 @@ public class MongoDynamicTableSink implements DynamicTableSink, SupportsPartitio
                         connectionOptions,
                         writeOptions,
                         parallelism,
-                        isUpsert,
+                        supportUpsert,
                         resolvedSchema)
                 + Arrays.hashCode(partitionKeys);
     }

diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
index a9d53b1..a4f99c2 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.java
@@ -65,6 +65,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MongoDBContainer;
@@ -85,6 +87,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.flink.table.api.Expressions.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT tests for {@link MongoDynamicTableSink}. */
 @Testcontainers
@@ -349,8 +352,9 @@ class MongoDynamicTableSinkITCase {
         assertThat(actual).isEqualTo(expected);
     }
 
-    @Test
-    void testSinkWithReservedId() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSinkWithReservedId(boolean overwrite) throws Exception {
         String database = "test";
         String collection = "sink_with_reserved_id";
 
@@ -368,7 +372,7 @@ class MongoDynamicTableSinkITCase {
         ObjectId objectId = new ObjectId();
 
         tEnv.fromValues(row(objectId.toHexString(), "r1"), row("str", "r2"))
-                .executeInsert("mongo_sink")
+                .executeInsert("mongo_sink", overwrite)
                 .await();
 
         MongoCollection<Document> coll =
@@ -385,6 +389,27 @@ class MongoDynamicTableSinkITCase {
         assertThat(actual).containsExactlyInAnyOrder(expected);
     }
 
+    @Test
+    void testOverwriteSinkWithoutPrimaryKey() {
+        String database = "test";
+        String collection = "overwrite_sink_without_primary_key";
+
+        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE mongo_sink (" + "f1 STRING NOT NULL\n" + ")\n" + "WITH (%s)",
+                        getConnectorSql(database, collection)));
+
+        assertThatThrownBy(
+                        () ->
+                                tEnv.fromValues(row("d1"), row("d1"))
+                                        .executeInsert("mongo_sink", true)
+                                        .await())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Overwrite sink requires specifying the table's primary key");
+    }
+
     @Test
     void testSinkWithoutPrimaryKey() throws Exception {
         String database = "test";
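To round out the picture, here is a sketch in SQL of the case covered by the new testOverwriteSinkWithoutPrimaryKey test above. The table names, columns, and connection options are illustrative, not taken from this patch; only the exception message is quoted from the change.

    -- Hypothetical table without a primary key: only append-style writes are possible.
    CREATE TABLE logs_sink (
        message STRING
    ) WITH (
        'connector' = 'mongodb',
        'uri' = 'mongodb://localhost:27017',
        'database' = 'mydb',
        'collection' = 'logs'
    );

    -- A plain INSERT INTO remains valid for such a table.
    INSERT INTO logs_sink SELECT message FROM logs_source;

    -- INSERT OVERWRITE is rejected when the planner applies the overwrite ability,
    -- because applyOverwrite(true) throws:
    --   IllegalStateException: Overwrite sink requires specifying the table's primary key
    INSERT OVERWRITE logs_sink SELECT message FROM logs_source;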