This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 59cccb6097 [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813)
59cccb6097 is described below

commit 59cccb6097750556438c00687867938e371698fc
Author: hailin0 <[email protected]>
AuthorDate: Wed Nov 8 19:23:29 2023 +0800

    [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813)
---
 .../api/table/catalog/CatalogTableUtil.java        |   4 +
 .../seatunnel/mongodb/source/MongodbSource.java    | 163 ++++++++-------------
 .../mongodb/source/MongodbSourceFactory.java       |  20 +++
 .../source/split/SamplingSplitStrategy.java        |   6 -
 4 files changed, 83 insertions(+), 110 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 0439754914..99c376d330 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -228,4 +228,8 @@ public class CatalogTableUtil implements Serializable {
     public static SeaTunnelRowType buildSimpleTextSchema() {
         return SIMPLE_SCHEMA;
     }
+
+    public static CatalogTable buildSimpleTextTable() {
+        return getCatalogTable("default", buildSimpleTextSchema());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index d9be1457f4..1a3c0f8f03 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -17,23 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
-import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
@@ -44,142 +39,102 @@ import 
org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSp
 
 import org.bson.BsonDocument;
 
-import com.google.auto.service.AutoService;
-
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
-@AutoService(SeaTunnelSource.class)
 public class MongodbSource
         implements SeaTunnelSource<SeaTunnelRow, MongoSplit, 
ArrayList<MongoSplit>>,
                 SupportColumnProjection {
 
     private static final long serialVersionUID = 1L;
 
-    private MongodbClientProvider clientProvider;
-
-    private DocumentDeserializer<SeaTunnelRow> deserializer;
-
-    private MongoSplitStrategy splitStrategy;
+    private final CatalogTable catalogTable;
+    private final ReadonlyConfig options;
 
-    private SeaTunnelRowType rowType;
-
-    private MongodbReadOptions mongodbReadOptions;
+    public MongodbSource(CatalogTable catalogTable, ReadonlyConfig options) {
+        this.catalogTable = catalogTable;
+        this.options = options;
+    }
 
     @Override
     public String getPluginName() {
         return CONNECTOR_IDENTITY;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        if (pluginConfig.hasPath(MongodbConfig.URI.key())
-                && pluginConfig.hasPath(MongodbConfig.DATABASE.key())
-                && pluginConfig.hasPath(MongodbConfig.COLLECTION.key())) {
-            String connection = 
pluginConfig.getString(MongodbConfig.URI.key());
-            String database = 
pluginConfig.getString(MongodbConfig.DATABASE.key());
-            String collection = 
pluginConfig.getString(MongodbConfig.COLLECTION.key());
-            clientProvider =
-                    MongodbCollectionProvider.builder()
-                            .connectionString(connection)
-                            .database(database)
-                            .collection(collection)
-                            .build();
-        }
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            this.rowType = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        } else {
-            this.rowType = CatalogTableUtil.buildSimpleTextSchema();
-        }
-
-        if (pluginConfig.hasPath(MongodbConfig.FLAT_SYNC_STRING.key())) {
-            deserializer =
-                    new DocumentRowDataDeserializer(
-                            rowType.getFieldNames(),
-                            rowType,
-                            
pluginConfig.getBoolean(MongodbConfig.FLAT_SYNC_STRING.key()));
-        } else {
-            deserializer =
-                    new DocumentRowDataDeserializer(
-                            rowType.getFieldNames(),
-                            rowType,
-                            MongodbConfig.FLAT_SYNC_STRING.defaultValue());
-        }
-
-        SamplingSplitStrategy.Builder splitStrategyBuilder = 
SamplingSplitStrategy.builder();
-        if (pluginConfig.hasPath(MongodbConfig.MATCH_QUERY.key())) {
-            splitStrategyBuilder.setMatchQuery(
-                    
BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
-        }
-
-        List<String> fallbackKeys = 
MongodbConfig.MATCH_QUERY.getFallbackKeys();
-        fallbackKeys.forEach(
-                key -> {
-                    if (pluginConfig.hasPath(key)) {
-                        splitStrategyBuilder.setMatchQuery(
-                                BsonDocument.parse(
-                                        
pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
-                    }
-                });
-
-        if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) {
-            
splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key()));
-        }
-        if (pluginConfig.hasPath(MongodbConfig.SPLIT_SIZE.key())) {
-            splitStrategyBuilder.setSizePerSplit(
-                    pluginConfig.getLong(MongodbConfig.SPLIT_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(MongodbConfig.PROJECTION.key())) {
-            splitStrategyBuilder.setProjection(
-                    
BsonDocument.parse(pluginConfig.getString(MongodbConfig.PROJECTION.key())));
-        }
-        splitStrategy = 
splitStrategyBuilder.setClientProvider(clientProvider).build();
-
-        MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder =
-                MongodbReadOptions.builder();
-        if (pluginConfig.hasPath(MongodbConfig.MAX_TIME_MIN.key())) {
-            mongoReadOptionsBuilder.setMaxTimeMS(
-                    pluginConfig.getLong(MongodbConfig.MAX_TIME_MIN.key()));
-        }
-        if (pluginConfig.hasPath(MongodbConfig.FETCH_SIZE.key())) {
-            mongoReadOptionsBuilder.setFetchSize(
-                    pluginConfig.getInt(MongodbConfig.FETCH_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(MongodbConfig.CURSOR_NO_TIMEOUT.key())) {
-            mongoReadOptionsBuilder.setNoCursorTimeout(
-                    
pluginConfig.getBoolean(MongodbConfig.CURSOR_NO_TIMEOUT.key()));
-        }
-        mongodbReadOptions = mongoReadOptionsBuilder.build();
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return rowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public SourceReader<SeaTunnelRow, MongoSplit> 
createReader(SourceReader.Context readerContext) {
-        return new MongodbReader(readerContext, clientProvider, deserializer, 
mongodbReadOptions);
+        return new MongodbReader(
+                readerContext,
+                crateClientProvider(options),
+                createDeserializer(options, 
catalogTable.getSeaTunnelRowType()),
+                createMongodbReadOptions(options));
     }
 
     @Override
     public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> 
createEnumerator(
             SourceSplitEnumerator.Context<MongoSplit> enumeratorContext) {
-        return new MongodbSplitEnumerator(enumeratorContext, clientProvider, 
splitStrategy);
+        MongodbClientProvider clientProvider = crateClientProvider(options);
+        return new MongodbSplitEnumerator(
+                enumeratorContext, clientProvider, 
createSplitStrategy(options, clientProvider));
     }
 
     @Override
     public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> 
restoreEnumerator(
             SourceSplitEnumerator.Context<MongoSplit> enumeratorContext,
             ArrayList<MongoSplit> checkpointState) {
+        MongodbClientProvider clientProvider = crateClientProvider(options);
         return new MongodbSplitEnumerator(
-                enumeratorContext, clientProvider, splitStrategy, 
checkpointState);
+                enumeratorContext,
+                clientProvider,
+                createSplitStrategy(options, clientProvider),
+                checkpointState);
+    }
+
+    private MongodbClientProvider crateClientProvider(ReadonlyConfig config) {
+        return MongodbCollectionProvider.builder()
+                .connectionString(config.get(MongodbConfig.URI))
+                .database(config.get(MongodbConfig.DATABASE))
+                .collection(config.get(MongodbConfig.COLLECTION))
+                .build();
+    }
+
+    private DocumentRowDataDeserializer createDeserializer(
+            ReadonlyConfig config, SeaTunnelRowType rowType) {
+        return new DocumentRowDataDeserializer(
+                rowType.getFieldNames(), rowType, 
config.get(MongodbConfig.FLAT_SYNC_STRING));
+    }
+
+    private MongoSplitStrategy createSplitStrategy(
+            ReadonlyConfig config, MongodbClientProvider clientProvider) {
+        SamplingSplitStrategy.Builder splitStrategyBuilder = 
SamplingSplitStrategy.builder();
+        splitStrategyBuilder.setSplitKey(config.get(MongodbConfig.SPLIT_KEY));
+        
splitStrategyBuilder.setSizePerSplit(config.get(MongodbConfig.SPLIT_SIZE));
+        config.getOptional(MongodbConfig.MATCH_QUERY)
+                .ifPresent(s -> 
splitStrategyBuilder.setMatchQuery(BsonDocument.parse(s)));
+        config.getOptional(MongodbConfig.PROJECTION)
+                .ifPresent(s -> 
splitStrategyBuilder.setProjection(BsonDocument.parse(s)));
+        return splitStrategyBuilder.setClientProvider(clientProvider).build();
+    }
+
+    private MongodbReadOptions createMongodbReadOptions(ReadonlyConfig config) 
{
+        MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder =
+                MongodbReadOptions.builder();
+        
mongoReadOptionsBuilder.setMaxTimeMS(config.get(MongodbConfig.MAX_TIME_MIN));
+        
mongoReadOptionsBuilder.setFetchSize(config.get(MongodbConfig.FETCH_SIZE));
+        
mongoReadOptionsBuilder.setNoCursorTimeout(config.get(MongodbConfig.CURSOR_NO_TIMEOUT));
+        return mongoReadOptionsBuilder.build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
index 8d0ce28da2..1153d2e767 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
@@ -17,11 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
@@ -63,4 +68,19 @@ public class MongodbSourceFactory implements 
TableSourceFactory {
             getSourceClass() {
         return MongodbSource.class;
     }
+
+    @Override
+    public TableSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>> 
createSource(
+            TableSourceFactoryContext context) {
+        return () -> {
+            ReadonlyConfig options = context.getOptions();
+            CatalogTable table;
+            if (options.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+                table = CatalogTableUtil.buildWithConfig(options);
+            } else {
+                table = CatalogTableUtil.buildSimpleTextTable();
+            }
+            return new MongodbSource(table, options);
+        };
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
index 44fc116934..6ce9596daa 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/split/SamplingSplitStrategy.java
@@ -193,19 +193,13 @@ public class SamplingSplitStrategy implements 
MongoSplitStrategy, Serializable {
 
         private static final BsonDocument EMPTY_PROJECTION = new 
BsonDocument();
 
-        private static final String DEFAULT_SPLIT_KEY = "_id";
-
         private static final long DEFAULT_SAMPLES_PER_SPLIT = 10;
 
-        private static final long DEFAULT_SIZE_PER_SPLIT = 64 * 1024 * 1024;
-
         Builder() {
             this.clientProvider = null;
-            this.splitKey = DEFAULT_SPLIT_KEY;
             this.matchQuery = EMPTY_MATCH_QUERY;
             this.projection = EMPTY_PROJECTION;
             this.samplesPerSplit = DEFAULT_SAMPLES_PER_SPLIT;
-            this.sizePerSplit = DEFAULT_SIZE_PER_SPLIT;
         }
 
         public Builder setClientProvider(MongodbClientProvider clientProvider) 
{

Reply via email to