This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 59cccb6097 [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813)
59cccb6097 is described below
commit 59cccb6097750556438c00687867938e371698fc
Author: hailin0 <[email protected]>
AuthorDate: Wed Nov 8 19:23:29 2023 +0800
[Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813)
---
.../api/table/catalog/CatalogTableUtil.java | 4 +
.../seatunnel/mongodb/source/MongodbSource.java | 163 ++++++++-------------
.../mongodb/source/MongodbSourceFactory.java | 20 +++
.../source/split/SamplingSplitStrategy.java | 6 -
4 files changed, 83 insertions(+), 110 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 0439754914..99c376d330 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -228,4 +228,8 @@ public class CatalogTableUtil implements Serializable {
public static SeaTunnelRowType buildSimpleTextSchema() {
return SIMPLE_SCHEMA;
}
+
+ public static CatalogTable buildSimpleTextTable() {
+ return getCatalogTable("default", buildSimpleTextSchema());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index d9be1457f4..1a3c0f8f03 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -17,23 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
-import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
@@ -44,142 +39,102 @@ import
org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSp
import org.bson.BsonDocument;
-import com.google.auto.service.AutoService;
-
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
-@AutoService(SeaTunnelSource.class)
public class MongodbSource
implements SeaTunnelSource<SeaTunnelRow, MongoSplit,
ArrayList<MongoSplit>>,
SupportColumnProjection {
private static final long serialVersionUID = 1L;
- private MongodbClientProvider clientProvider;
-
- private DocumentDeserializer<SeaTunnelRow> deserializer;
-
- private MongoSplitStrategy splitStrategy;
+ private final CatalogTable catalogTable;
+ private final ReadonlyConfig options;
- private SeaTunnelRowType rowType;
-
- private MongodbReadOptions mongodbReadOptions;
+ public MongodbSource(CatalogTable catalogTable, ReadonlyConfig options) {
+ this.catalogTable = catalogTable;
+ this.options = options;
+ }
@Override
public String getPluginName() {
return CONNECTOR_IDENTITY;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- if (pluginConfig.hasPath(MongodbConfig.URI.key())
- && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
- && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
- String connection =
pluginConfig.getString(MongodbConfig.URI.key());
- String database =
pluginConfig.getString(MongodbConfig.DATABASE.key());
- String collection =
pluginConfig.getString(MongodbConfig.COLLECTION.key());
- clientProvider =
- MongodbCollectionProvider.builder()
- .connectionString(connection)
- .database(database)
- .collection(collection)
- .build();
- }
- if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
- this.rowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- } else {
- this.rowType = CatalogTableUtil.buildSimpleTextSchema();
- }
-
- if (pluginConfig.hasPath(MongodbConfig.FLAT_SYNC_STRING.key())) {
- deserializer =
- new DocumentRowDataDeserializer(
- rowType.getFieldNames(),
- rowType,
-
pluginConfig.getBoolean(MongodbConfig.FLAT_SYNC_STRING.key()));
- } else {
- deserializer =
- new DocumentRowDataDeserializer(
- rowType.getFieldNames(),
- rowType,
- MongodbConfig.FLAT_SYNC_STRING.defaultValue());
- }
-
- SamplingSplitStrategy.Builder splitStrategyBuilder =
SamplingSplitStrategy.builder();
- if (pluginConfig.hasPath(MongodbConfig.MATCH_QUERY.key())) {
- splitStrategyBuilder.setMatchQuery(
-
BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
- }
-
- List<String> fallbackKeys =
MongodbConfig.MATCH_QUERY.getFallbackKeys();
- fallbackKeys.forEach(
- key -> {
- if (pluginConfig.hasPath(key)) {
- splitStrategyBuilder.setMatchQuery(
- BsonDocument.parse(
-
pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
- }
- });
-
- if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) {
-
splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.SPLIT_SIZE.key())) {
- splitStrategyBuilder.setSizePerSplit(
- pluginConfig.getLong(MongodbConfig.SPLIT_SIZE.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.PROJECTION.key())) {
- splitStrategyBuilder.setProjection(
-
BsonDocument.parse(pluginConfig.getString(MongodbConfig.PROJECTION.key())));
- }
- splitStrategy =
splitStrategyBuilder.setClientProvider(clientProvider).build();
-
- MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder =
- MongodbReadOptions.builder();
- if (pluginConfig.hasPath(MongodbConfig.MAX_TIME_MIN.key())) {
- mongoReadOptionsBuilder.setMaxTimeMS(
- pluginConfig.getLong(MongodbConfig.MAX_TIME_MIN.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.FETCH_SIZE.key())) {
- mongoReadOptionsBuilder.setFetchSize(
- pluginConfig.getInt(MongodbConfig.FETCH_SIZE.key()));
- }
- if (pluginConfig.hasPath(MongodbConfig.CURSOR_NO_TIMEOUT.key())) {
- mongoReadOptionsBuilder.setNoCursorTimeout(
-
pluginConfig.getBoolean(MongodbConfig.CURSOR_NO_TIMEOUT.key()));
- }
- mongodbReadOptions = mongoReadOptionsBuilder.build();
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return rowType;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public SourceReader<SeaTunnelRow, MongoSplit>
createReader(SourceReader.Context readerContext) {
- return new MongodbReader(readerContext, clientProvider, deserializer,
mongodbReadOptions);
+ return new MongodbReader(
+ readerContext,
+ crateClientProvider(options),
+ createDeserializer(options,
catalogTable.getSeaTunnelRowType()),
+ createMongodbReadOptions(options));
}
@Override
public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>>
createEnumerator(
SourceSplitEnumerator.Context<MongoSplit> enumeratorContext) {
- return new MongodbSplitEnumerator(enumeratorContext, clientProvider,
splitStrategy);
+ MongodbClientProvider clientProvider = crateClientProvider(options);
+ return new MongodbSplitEnumerator(
+ enumeratorContext, clientProvider,
createSplitStrategy(options, clientProvider));
}
@Override
public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>>
restoreEnumerator(
SourceSplitEnumerator.Context<MongoSplit> enumeratorContext,
ArrayList<MongoSplit> checkpointState) {
+ MongodbClientProvider clientProvider = crateClientProvider(options);
return new MongodbSplitEnumerator(
- enumeratorContext, clientProvider, splitStrategy,
checkpointState);
+ enumeratorContext,
+ clientProvider,
+ createSplitStrategy(options, clientProvider),
+ checkpointState);
+ }
+
+ private MongodbClientProvider crateClientProvider(ReadonlyConfig config) {
+ return MongodbCollectionProvider.builder()
+ .connectionString(config.get(MongodbConfig.URI))
+ .database(config.get(MongodbConfig.DATABASE))
+ .collection(config.get(MongodbConfig.COLLECTION))
+ .build();
+ }
+
+ private DocumentRowDataDeserializer createDeserializer(
+ ReadonlyConfig config, SeaTunnelRowType rowType) {
+ return new DocumentRowDataDeserializer(
+ rowType.getFieldNames(), rowType,
config.get(MongodbConfig.FLAT_SYNC_STRING));
+ }
+
+ private MongoSplitStrategy createSplitStrategy(
+ ReadonlyConfig config, MongodbClientProvider clientProvider) {
+ SamplingSplitStrategy.Builder splitStrategyBuilder =
SamplingSplitStrategy.builder();
+ splitStrategyBuilder.setSplitKey(config.get(MongodbConfig.SPLIT_KEY));
+
splitStrategyBuilder.setSizePerSplit(config.get(MongodbConfig.SPLIT_SIZE));
+ config.getOptional(MongodbConfig.MATCH_QUERY)
+ .ifPresent(s ->
splitStrategyBuilder.setMatchQuery(BsonDocument.parse(s)));
+ config.getOptional(MongodbConfig.PROJECTION)
+ .ifPresent(s ->
splitStrategyBuilder.setProjection(BsonDocument.parse(s)));
+ return splitStrategyBuilder.setClientProvider(clientProvider).build();
+ }
+
+ private MongodbReadOptions createMongodbReadOptions(ReadonlyConfig config)
{
+ MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder =
+ MongodbReadOptions.builder();
+
mongoReadOptionsBuilder.setMaxTimeMS(config.get(MongodbConfig.MAX_TIME_MIN));
+
mongoReadOptionsBuilder.setFetchSize(config.get(MongodbConfig.FETCH_SIZE));
+
mongoReadOptionsBuilder.setNoCursorTimeout(config.get(MongodbConfig.CURSOR_NO_TIMEOUT));
+ return mongoReadOptionsBuilder.build();
}
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
index 8d0ce28da2..1153d2e767 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
@@ -17,11 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
@@ -63,4 +68,19 @@ public class MongodbSourceFactory implements TableSourceFactory {
getSourceClass() {
return MongodbSource.class;
}
+
+ @Override
+ public TableSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>
createSource(
+ TableSourceFactoryContext context) {
+ return () -> {
+ ReadonlyConfig options = context.getOptions();
+ CatalogTable table;
+ if (options.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+ table = CatalogTableUtil.buildWithConfig(options);
+ } else {
+ table = CatalogTableUtil.buildSimpleTextTable();
+ }
+ return new MongodbSource(table, options);
+ };
+ }
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
index 44fc116934..6ce9596daa 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
@@ -193,19 +193,13 @@ public class SamplingSplitStrategy implements MongoSplitStrategy, Serializable {
private static final BsonDocument EMPTY_PROJECTION = new
BsonDocument();
- private static final String DEFAULT_SPLIT_KEY = "_id";
-
private static final long DEFAULT_SAMPLES_PER_SPLIT = 10;
- private static final long DEFAULT_SIZE_PER_SPLIT = 64 * 1024 * 1024;
-
Builder() {
this.clientProvider = null;
- this.splitKey = DEFAULT_SPLIT_KEY;
this.matchQuery = EMPTY_MATCH_QUERY;
this.projection = EMPTY_PROJECTION;
this.samplesPerSplit = DEFAULT_SAMPLES_PER_SPLIT;
- this.sizePerSplit = DEFAULT_SIZE_PER_SPLIT;
}
public Builder setClientProvider(MongodbClientProvider clientProvider)
{