This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 10c6a6f154 [Feature][Connector-V2] MongodbSink Support SaveMode (#9883)
10c6a6f154 is described below
commit 10c6a6f154731a7b5d0a20f1fa2b4272c0fd6963
Author: 老王 <[email protected]>
AuthorDate: Wed Oct 15 12:08:55 2025 +0800
[Feature][Connector-V2] MongodbSink Support SaveMode (#9883)
---
docs/en/connector-v2/sink/MongoDB.md | 28 +--
.../seatunnel/mongodb/catalog/MongodbCatalog.java | 211 ++++++++++++++++++++
.../mongodb/catalog/MongodbCatalogFactory.java | 48 +++++
.../seatunnel/mongodb/config/MongodbConfig.java | 14 ++
.../seatunnel/mongodb/sink/MongodbSink.java | 32 +++-
.../seatunnel/mongodb/sink/MongodbSinkFactory.java | 13 +-
.../mongodb/sink/MongodbWriterOptions.java | 18 +-
.../sink/savemode/MongodbSaveModeHandler.java | 39 ++++
.../connector/v2/mongodb/AbstractMongodbIT.java | 13 +-
.../e2e/connector/v2/mongodb/MongodbCDCIT.java | 4 +-
.../e2e/connector/v2/mongodb/MongodbIT.java | 213 +++++++++++++++++++--
11 files changed, 598 insertions(+), 35 deletions(-)
diff --git a/docs/en/connector-v2/sink/MongoDB.md
b/docs/en/connector-v2/sink/MongoDB.md
index 04bb548aad..d4f702bd17 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -62,19 +62,21 @@ The following table lists the field data type mapping from
MongoDB BSON type to
## Sink Options
-| Name | Type | Required | Default |
Description
|
-|-----------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------------------------------|
-| uri | String | Yes | - | The MongoDB standard
connection uri. eg.
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
|
-| database | String | Yes | - | The name of MongoDB
database to read or write.
|
-| collection | String | Yes | - | The name of MongoDB
collection to read or write.
|
-| buffer-flush.max-rows | String | No | 1000 | Specifies the
maximum number of buffered rows per batch request.
|
-| buffer-flush.interval | String | No | 30000 | Specifies the
maximum interval of buffered rows per batch request, the unit is millisecond.
|
-| retry.max | String | No | 3 | Specifies the max
number of retry if writing records to database failed.
|
-| retry.interval | Duration | No | 1000 | Specifies the retry
time interval if writing records to database failed, the unit is millisecond.
|
-| upsert-enable | Boolean | No | false | Whether to write
documents via upsert mode.
|
-| primary-key | List | No | - | The primary keys for
upsert/update. Keys are in `["id","name",...]` format for properties.
|
-| transaction | Boolean | No | false | Whether to use
transactions in MongoSink (requires MongoDB 4.2+).
|
-| common-options | | No | - | Source plugin common
parameters, please refer to [Source Common Options](../sink-common-options.md)
for details |
+| Name | Type | Required | Default | Description
|
+|-----------------------|----------|----------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| uri | String | Yes | - | The MongoDB standard
connection uri. eg.
mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
|
+| database | String | Yes | - | The name of MongoDB
database to read or write.
|
+| collection | String | Yes | - | The name of MongoDB
collection to read or write.
|
+| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum
number of buffered rows per batch request.
|
+| buffer-flush.interval | String | No | 30000 | Specifies the maximum
interval of buffered rows per batch request, the unit is millisecond.
|
+| retry.max | String | No | 3 | Specifies the max
number of retry if writing records to database failed.
|
+| retry.interval | Duration | No | 1000 | Specifies the retry
time interval if writing records to database failed, the unit is millisecond.
|
+| upsert-enable | Boolean | No | false | Whether to write
documents via upsert mode.
|
+| primary-key | List | No | - | The primary keys for
upsert/update. Keys are in `["id","name",...]` format for properties.
|
+| transaction | Boolean | No | false | Whether to use
transactions in MongoSink (requires MongoDB 4.2+).
|
+| common-options | | No | - | Source plugin common
parameters, please refer to [Source Common Options](../sink-common-options.md)
for details
|
+| data_save_mode | String | No | APPEND_DATA | The data
saving mode of mongodb,Option introduction,`DROP_DATA`:The collection will be
cleared before inserting data;`APPEND_DATA`:Append data
;`ERROR_WHEN_DATA_EXISTS`:An error will be reported if there is data in the
collection. |
+
### Tips
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
new file mode 100644
index 0000000000..ceda30fdd1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.bson.Document;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MongodbCatalog implements Catalog {
+
+ private final String catalogName;
+ private final String baseUrl;
+ private transient MongoClient mongoClient;
+ private final String defaultDatabase;
+
+ public MongodbCatalog(String catalogName, String baseUrl, String
defaultDatabase) {
+ this.catalogName = catalogName;
+ this.baseUrl = baseUrl;
+ this.defaultDatabase = defaultDatabase;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ try {
+ if (mongoClient == null) {
+ mongoClient = MongoClients.create(baseUrl);
+ }
+ } catch (Exception e) {
+ throw new CatalogException("Failed to open MongoDB Catalog: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ try {
+ return listDatabases().contains(databaseName);
+ } catch (Exception e) {
+ throw new CatalogException("Failed to check database existence: "
+ databaseName, e);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ List<String> dbs = new ArrayList<>();
+ for (String name : mongoClient.listDatabaseNames()) {
+ dbs.add(name);
+ }
+ return dbs;
+ } catch (Exception e) {
+ throw new CatalogException("Failed to list databases", e);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(name(), databaseName);
+ }
+ try {
+ MongoDatabase db = mongoClient.getDatabase(databaseName);
+ return db.listCollectionNames().into(new ArrayList<>());
+ } catch (Exception e) {
+ throw new CatalogException("Failed to list tables for database: "
+ databaseName, e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ try {
+ return
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ throw CommonError.unsupportedOperation(name(), "get table with
tablePath ");
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(name(),
tablePath.getDatabaseName());
+ }
+ if (tableExists(tablePath)) {
+ if (ignoreIfExists) return;
+ throw new TableAlreadyExistException(name(), tablePath);
+ }
+ try {
+ MongoDatabase db =
mongoClient.getDatabase(tablePath.getDatabaseName());
+ db.createCollection(tablePath.getTableName());
+ } catch (Exception e) {
+ throw new CatalogException(
+ "Failed to create collection: " + tablePath.getFullName(),
e);
+ }
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) return;
+ throw new TableNotExistException(name(), tablePath);
+ }
+ try {
+ MongoDatabase db =
mongoClient.getDatabase(tablePath.getDatabaseName());
+ db.getCollection(tablePath.getTableName()).drop();
+ } catch (Exception e) {
+ throw new CatalogException("Failed to drop collection: " +
tablePath.getFullName(), e);
+ }
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ throw CommonError.unsupportedOperation(name(), "create database ");
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ throw CommonError.unsupportedOperation(name(), "drop database ");
+ }
+
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException(name(), tablePath);
+ }
+ MongoDatabase db =
mongoClient.getDatabase(tablePath.getDatabaseName());
+ MongoCollection<Document> collection =
db.getCollection(tablePath.getTableName());
+ collection.deleteMany(new Document());
+ } catch (Exception e) {
+ throw new CatalogException(
+ "Failed to truncate collection: " +
tablePath.getFullName(), e);
+ }
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ try {
+ if (!tableExists(tablePath)) {
+ return false;
+ }
+ MongoDatabase db =
mongoClient.getDatabase(tablePath.getDatabaseName());
+ MongoCollection<Document> collection =
db.getCollection(tablePath.getTableName());
+ return collection.estimatedDocumentCount() > 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (mongoClient != null) {
+ mongoClient.close();
+ mongoClient = null;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
new file mode 100644
index 0000000000..05a22cd6c7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
+
+@AutoService(Factory.class)
+public class MongodbCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ return new MongodbCatalog(
+ catalogName, options.get(MongodbConfig.URI),
options.get(MongodbConfig.DATABASE));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(MongodbConfig.URI,
MongodbConfig.DATABASE).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 848a120e27..ce7f894c87 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -19,12 +19,18 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
+import java.util.Arrays;
import java.util.List;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class MongodbConfig {
public static final String CONNECTOR_IDENTITY = "MongoDB";
@@ -153,4 +159,12 @@ public class MongodbConfig {
public static final Option<Boolean> TRANSACTION =
Options.key("transaction").booleanType().defaultValue(false).withDescription(".");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription("The save mode of collection data");
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 563af55c19..fe1962b538 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -19,25 +19,35 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.catalog.MongodbCatalog;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode.MongodbSaveModeHandler;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import java.util.Optional;
+import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
public class MongodbSink
implements SeaTunnelSink<
- SeaTunnelRow, DocumentBulk, MongodbCommitInfo,
MongodbAggregatedCommitInfo> {
+ SeaTunnelRow, DocumentBulk, MongodbCommitInfo,
MongodbAggregatedCommitInfo>,
+ SupportSaveMode {
private final MongodbWriterOptions options;
@@ -92,4 +102,24 @@ public class MongodbSink
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.ofNullable(catalogTable);
}
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ String url = options.getConnectString();
+ String database = options.getDatabase();
+ if (catalogTable != null) {
+ Optional<Catalog> catalogOptional =
+ Optional.of(new MongodbCatalog(CONNECTOR_IDENTITY, url,
database));
+ try {
+ DataSaveMode dataSaveMode = options.getDataSaveMode();
+ Catalog catalog = catalogOptional.get();
+ return Optional.of(
+ new MongodbSaveModeHandler(
+ SchemaSaveMode.IGNORE, dataSaveMode, catalog,
catalogTable));
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED,
e);
+ }
+ }
+ return Optional.empty();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index d8e2cf03bc..43569e7d36 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -19,6 +19,8 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -46,7 +48,8 @@ public class MongodbSinkFactory implements TableSinkFactory {
MongodbConfig.RETRY_MAX,
MongodbConfig.RETRY_INTERVAL,
MongodbConfig.UPSERT_ENABLE,
- MongodbConfig.PRIMARY_KEY)
+ MongodbConfig.PRIMARY_KEY,
+ MongodbConfig.DATA_SAVE_MODE)
.build();
}
@@ -84,6 +87,12 @@ public class MongodbSinkFactory implements TableSinkFactory {
if (readonlyConfig.getOptional(MongodbConfig.TRANSACTION).isPresent())
{
builder.withTransaction(readonlyConfig.get(MongodbConfig.TRANSACTION));
}
- return () -> new MongodbSink(builder.build(),
context.getCatalogTable());
+
builder.withDataSaveMode(readonlyConfig.get(MongodbConfig.DATA_SAVE_MODE));
+ CatalogTable catalogTable = context.getCatalogTable();
+ // sourceCatalogTable to sinkCatalogTable
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(CONNECTOR_IDENTITY, database, collection);
+ CatalogTable sinkCatalogTable = CatalogTable.of(tableIdentifier,
catalogTable);
+ return () -> new MongodbSink(builder.build(), sinkCatalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
index e9b8264775..d1e4ee4aeb 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+
import lombok.Getter;
import java.io.Serializable;
@@ -46,6 +48,8 @@ public class MongodbWriterOptions implements Serializable {
protected final boolean transaction;
+ protected final DataSaveMode dataSaveMode;
+
public MongodbWriterOptions(
String connectString,
String database,
@@ -56,7 +60,8 @@ public class MongodbWriterOptions implements Serializable {
String[] primaryKey,
int retryMax,
long retryInterval,
- boolean transaction) {
+ boolean transaction,
+ DataSaveMode dataSaveMode) {
this.connectString = connectString;
this.database = database;
this.collection = collection;
@@ -67,6 +72,7 @@ public class MongodbWriterOptions implements Serializable {
this.retryMax = retryMax;
this.retryInterval = retryInterval;
this.transaction = transaction;
+ this.dataSaveMode = dataSaveMode;
}
public static Builder builder() {
@@ -95,6 +101,8 @@ public class MongodbWriterOptions implements Serializable {
protected boolean transaction;
+ protected DataSaveMode dataSaveMode;
+
public Builder withConnectString(String connectString) {
this.connectString = connectString;
return this;
@@ -145,6 +153,11 @@ public class MongodbWriterOptions implements Serializable {
return this;
}
+ public Builder withDataSaveMode(DataSaveMode dataSaveMode) {
+ this.dataSaveMode = dataSaveMode;
+ return this;
+ }
+
public MongodbWriterOptions build() {
return new MongodbWriterOptions(
connectString,
@@ -156,7 +169,8 @@ public class MongodbWriterOptions implements Serializable {
primaryKey,
retryMax,
retryInterval,
- transaction);
+ transaction,
+ dataSaveMode);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
new file mode 100644
index 0000000000..4846fded5f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+public class MongodbSaveModeHandler extends DefaultSaveModeHandler {
+ public MongodbSaveModeHandler(
+ SchemaSaveMode schemaSaveMode,
+ DataSaveMode dataSaveMode,
+ Catalog catalog,
+ CatalogTable catalogTable) {
+ super(schemaSaveMode, dataSaveMode, catalog, catalogTable, null);
+ }
+
+ public void handleSaveMode() {
+ // mongodb remove schema save mode,only data save mde
+ handleDataSaveMode();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index cf0b1dbd09..41a96abf02 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -31,6 +31,9 @@ import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
@@ -119,7 +122,7 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET);
}
- protected void clearDate(String table) {
+ protected void clearData(String table) {
client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
}
@@ -247,6 +250,14 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
.withNetwork(NETWORK)
.withNetworkAliases(MONGODB_CONTAINER_HOST)
.withExposedPorts(MONGODB_PORT)
+ .withCreateContainerCmdModifier(
+ cmd ->
+ cmd.getHostConfig()
+ .withPortBindings(
+ new PortBinding(
+
Ports.Binding.bindPort(
+
MONGODB_PORT),
+ new
ExposedPort(MONGODB_PORT))))
.waitingFor(
new HttpWaitStrategy()
.forPort(MONGODB_PORT)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
index 6db7db4fa1..b249dc2e88 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
@@ -63,7 +63,7 @@ public class MongodbCDCIT extends AbstractMongodbIT {
.map(Map.Entry::getValue)
.collect(Collectors.toCollection(ArrayList::new)))
.collect(Collectors.toList()));
- clearDate(MONGODB_CDC_RESULT_TABLE);
+ clearData(MONGODB_CDC_RESULT_TABLE);
}
@TestTemplate
@@ -85,6 +85,6 @@ public class MongodbCDCIT extends AbstractMongodbIT {
.map(Map.Entry::getValue)
.collect(Collectors.toCollection(ArrayList::new)))
.collect(Collectors.toList()));
- clearDate(MONGODB_CDC_RESULT_TABLE);
+ clearData(MONGODB_CDC_RESULT_TABLE);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index b289af315f..4a721333e9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,20 +17,48 @@
package org.apache.seatunnel.e2e.connector.v2.mongodb;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongoKeyExtractor;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSink;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.WriteModel;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
+
@Slf4j
public class MongodbIT extends AbstractMongodbIT {
@@ -42,7 +70,7 @@ public class MongodbIT extends AbstractMongodbIT {
Container.ExecResult assertResult =
container.executeJob("/mongodb_source_to_assert.conf");
Assertions.assertEquals(0, assertResult.getExitCode(),
assertResult.getStderr());
- clearDate(MONGODB_SINK_TABLE);
+ clearData(MONGODB_SINK_TABLE);
}
@TestTemplate
@@ -59,8 +87,8 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_NULL_TABLE_RESULT).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_NULL_TABLE);
- clearDate(MONGODB_NULL_TABLE_RESULT);
+ clearData(MONGODB_NULL_TABLE);
+ clearData(MONGODB_NULL_TABLE_RESULT);
}
@TestTemplate
@@ -78,7 +106,7 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_MATCH_RESULT_TABLE);
+ clearData(MONGODB_MATCH_RESULT_TABLE);
Container.ExecResult projectionResult =
container.executeJob("/matchIT/mongodb_matchProjection_source_to_assert.conf");
@@ -93,7 +121,7 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_MATCH_RESULT_TABLE);
+ clearData(MONGODB_MATCH_RESULT_TABLE);
}
@TestTemplate
@@ -112,7 +140,7 @@ public class MongodbIT extends AbstractMongodbIT {
container.executeJob("/updateIT/update_mongodb_to_assert.conf");
Assertions.assertEquals(0, assertResult.getExitCode(),
assertResult.getStderr());
- clearDate(MONGODB_UPDATE_TABLE);
+ clearData(MONGODB_UPDATE_TABLE);
}
@TestTemplate
@@ -126,7 +154,7 @@ public class MongodbIT extends AbstractMongodbIT {
container.executeJob("/flatIT/mongodb_flat_source_to_assert.conf");
Assertions.assertEquals(0, assertResult.getExitCode(),
assertResult.getStderr());
- clearDate(MONGODB_FLAT_TABLE);
+ clearData(MONGODB_FLAT_TABLE);
}
@TestTemplate
@@ -144,7 +172,7 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_SPLIT_RESULT_TABLE);
+ clearData(MONGODB_SPLIT_RESULT_TABLE);
Container.ExecResult projectionResult =
container.executeJob("/splitIT/mongodb_split_size_source_to_assert.conf");
@@ -158,7 +186,7 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_SPLIT_RESULT_TABLE);
+ clearData(MONGODB_SPLIT_RESULT_TABLE);
}
@TestTemplate
@@ -177,7 +205,7 @@ public class MongodbIT extends AbstractMongodbIT {
container.executeJob("/updateIT/update_mongodb_to_assert.conf");
Assertions.assertEquals(0, assertResult.getExitCode(),
assertResult.getStderr());
- clearDate(MONGODB_UPDATE_TABLE);
+ clearData(MONGODB_UPDATE_TABLE);
// `matchQuery` compatible test
Container.ExecResult queryResult =
@@ -192,7 +220,7 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_MATCH_RESULT_TABLE);
+ clearData(MONGODB_MATCH_RESULT_TABLE);
}
@TestTemplate
@@ -218,8 +246,8 @@ public class MongodbIT extends AbstractMongodbIT {
Assertions.assertEquals(
0, assertUpsertResult.getExitCode(),
assertUpsertResult.getStderr());
- clearDate(MONGODB_TRANSACTION_SINK_TABLE);
- clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
+ clearData(MONGODB_TRANSACTION_SINK_TABLE);
+ clearData(MONGODB_TRANSACTION_UPSERT_TABLE);
}
@TestTemplate
@@ -235,6 +263,163 @@ public class MongodbIT extends AbstractMongodbIT {
readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream()
.peek(e -> e.remove("_id"))
.collect(Collectors.toList()));
- clearDate(MONGODB_DOUBLE_TABLE_RESULT);
+ clearData(MONGODB_DOUBLE_TABLE_RESULT);
+ }
+
+ @SneakyThrows
+ @TestTemplate
+ public void testDropDataSaveMode(TestContainer container) {
+ // test drop data save mode
+ String collectionName = "drop_data_save_mode_coll";
+ MongoCollection<BsonDocument> collection =
+ client.getDatabase(MONGODB_DATABASE)
+ .getCollection(collectionName, BsonDocument.class);
+ // insert one row
+ beforeInsertData(collectionName, DataSaveMode.DROP_DATA, collection);
+ // build sink
+ final MongodbSink mongoDbSink = getSinkInstance(collectionName,
DataSaveMode.DROP_DATA);
+ final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer
=
+ mongoDbSink.createWriter(null);
+ final Optional<SaveModeHandler> saveModeHandlerOptional =
mongoDbSink.getSaveModeHandler();
+ // do save mode
+ if (saveModeHandlerOptional.isPresent()) {
+ final SaveModeHandler saveModeHandler =
saveModeHandlerOptional.get();
+ saveModeHandler.open();
+ saveModeHandler.handleSaveMode();
+ saveModeHandler.close();
+ }
+ // do write
+ writer.write(getSeaTunnelRowOne());
+ Assertions.assertEquals(1L, collection.countDocuments());
+ // clear
+ collection.drop();
+ }
+
+ @SneakyThrows
+ @TestTemplate
+ public void testAppendDataSaveMode(TestContainer container) {
+ // test drop data save mode
+ String collectionName = "append_data_save_mode_coll";
+ MongoCollection<BsonDocument> collection =
+ client.getDatabase(MONGODB_DATABASE)
+ .getCollection(collectionName, BsonDocument.class);
+ // insert one row
+ beforeInsertData(collectionName, DataSaveMode.APPEND_DATA, collection);
+ // build sink
+ final MongodbSink mongoDbSink = getSinkInstance(collectionName,
DataSaveMode.APPEND_DATA);
+ final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer
=
+ mongoDbSink.createWriter(null);
+ final Optional<SaveModeHandler> saveModeHandlerOptional =
mongoDbSink.getSaveModeHandler();
+ // do save mode
+ if (saveModeHandlerOptional.isPresent()) {
+ final SaveModeHandler saveModeHandler =
saveModeHandlerOptional.get();
+ saveModeHandler.open();
+ saveModeHandler.handleSaveMode();
+ saveModeHandler.close();
+ }
+ // do write
+ writer.write(getSeaTunnelRowOne());
+ Assertions.assertEquals(3L, collection.countDocuments());
+ // clear
+ collection.drop();
+ }
+
+ @SneakyThrows
+ @TestTemplate
+ public void testErrorWhenDataExistsSaveMode(TestContainer container) {
+ // test drop data save mode
+ String collectionName = "error_data_save_mode_coll";
+ MongoCollection<BsonDocument> collection =
+ client.getDatabase(MONGODB_DATABASE)
+ .getCollection(collectionName, BsonDocument.class);
+ // insert one row
+ beforeInsertData(collectionName, DataSaveMode.ERROR_WHEN_DATA_EXISTS,
collection);
+ // build sink
+ final MongodbSink mongoDbSink =
+ getSinkInstance(collectionName,
DataSaveMode.ERROR_WHEN_DATA_EXISTS);
+ final SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> writer
=
+ mongoDbSink.createWriter(null);
+ final Optional<SaveModeHandler> saveModeHandlerOptional =
mongoDbSink.getSaveModeHandler();
+ // do save mode
+ if (saveModeHandlerOptional.isPresent()) {
+ final SaveModeHandler saveModeHandler =
saveModeHandlerOptional.get();
+ saveModeHandler.open();
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ saveModeHandler::handleDataSaveMode,
+ "When there exist data, an error will be reported");
+ saveModeHandler.close();
+ }
+ Assertions.assertEquals(2L, collection.countDocuments());
+ // clear
+ collection.drop();
+ }
+
+ private void beforeInsertData(
+ String collection,
+ DataSaveMode dataSaveMode,
+ MongoCollection<BsonDocument> dropDataCollection) {
+ final RowDataDocumentSerializer rowDataDocumentSerializer =
+ new RowDataDocumentSerializer(
+ RowDataToBsonConverters.createConverter(
+
getCatalogTable(collection).getSeaTunnelRowType()),
+ getMongodbWriterOptions(collection, dataSaveMode),
+ new
MongoKeyExtractor(getMongodbWriterOptions(collection, dataSaveMode)));
+ WriteModel<BsonDocument> bsonDocumentWriteModelOne =
+
rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowOne());
+ WriteModel<BsonDocument> bsonDocumentWriteModelTwo =
+
rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowTwo());
+ List<WriteModel<BsonDocument>> writeModelList = new ArrayList<>();
+ writeModelList.add(bsonDocumentWriteModelOne);
+ writeModelList.add(bsonDocumentWriteModelTwo);
+ dropDataCollection.bulkWrite(writeModelList);
+ }
+
+ private SeaTunnelRow getSeaTunnelRowOne() {
+ return new SeaTunnelRow(new Object[] {1L, "A", 100});
+ }
+
+ private SeaTunnelRow getSeaTunnelRowTwo() {
+ return new SeaTunnelRow(new Object[] {2L, "B", 200});
+ }
+
+ private MongodbSink getSinkInstance(String collection, DataSaveMode
dataSaveMode) {
+ return new MongodbSink(
+ getMongodbWriterOptions(collection, dataSaveMode),
getCatalogTable(collection));
+ }
+
+ private MongodbWriterOptions getMongodbWriterOptions(
+ String collection, DataSaveMode dataSaveMode) {
+ String host = mongodbContainer.getContainerIpAddress();
+ int port = mongodbContainer.getFirstMappedPort();
+ String url = String.format("mongodb://%s:%d/%s", host, port,
MONGODB_DATABASE);
+ return MongodbWriterOptions.builder()
+ .withConnectString(url)
+ .withDatabase(MONGODB_DATABASE)
+ .withCollection(collection)
+ .withDataSaveMode(dataSaveMode)
+ .withFlushSize(1)
+ .build();
+ }
+
+ private CatalogTable getCatalogTable(String collection) {
+ return CatalogTable.of(
+ TableIdentifier.of(CONNECTOR_IDENTITY, MONGODB_DATABASE,
collection),
+ getTableSchema(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "");
+ }
+
+ private TableSchema getTableSchema() {
+ return new TableSchema(getColumns(), null, null);
+ }
+
+ private List<Column> getColumns() {
+ List<Column> columns = new ArrayList<>();
+ columns.add(new PhysicalColumn("c_int", BasicType.LONG_TYPE, 64L, 0,
true, "", ""));
+ columns.add(new PhysicalColumn("name", BasicType.STRING_TYPE, 100L, 0,
true, "", ""));
+ columns.add(new PhysicalColumn("score", BasicType.INT_TYPE, 32L, 0,
true, "", ""));
+ return columns;
}
}