This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 830196d8b7 [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001)
830196d8b7 is described below

commit 830196d8b74a74fa6b97c28e1a5f9a0e058c2158
Author: monster <[email protected]>
AuthorDate: Mon Jul 3 14:46:35 2023 +0800

    [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001)
    
    Co-authored-by: chenqqq11 <[email protected]>
---
 .../mongodb/source/reader/MongodbReader.java       | 42 +++++++++++++---------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
index 48ff273eaa..e55fa5c876 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit
 
 import org.bson.BsonDocument;
 
-import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCursor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,6 +37,8 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /** MongoReader reads MongoDB by splits (queries). */
 @Slf4j
@@ -70,7 +71,7 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
     }
 
     @Override
-    public void open() throws Exception {
+    public void open() {
         if (cursor != null) {
             cursor.close();
         }
@@ -87,26 +88,18 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
     public void pollNext(Collector<SeaTunnelRow> output) {
         synchronized (output.getCheckpointLock()) {
             MongoSplit currentSplit = pendingSplits.poll();
-            if (null != currentSplit) {
+            if (currentSplit != null) {
                 if (cursor != null) {
                     // current split is in-progress
                     return;
                 }
                 log.info("Prepared to read split {}", currentSplit.splitId());
-                FindIterable<BsonDocument> rs =
-                        clientProvider
-                                .getDefaultCollection()
-                                .find(currentSplit.getQuery())
-                                .projection(currentSplit.getProjection())
-                                .batchSize(readOptions.getFetchSize())
-                                .noCursorTimeout(readOptions.isNoCursorTimeout())
-                                .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES);
-                cursor = rs.iterator();
-                while (cursor.hasNext()) {
-                    SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
-                    output.collect(deserialize);
+                try {
+                    getCursor(currentSplit);
+                    cursorToStream().map(deserializer::deserialize).forEach(output::collect);
+                } finally {
+                    closeCurrentSplit();
                 }
-                closeCurrentSplit();
             }
             if (noMoreSplit && pendingSplits.isEmpty()) {
                 // signal to the source that we have reached the end of the data.
@@ -116,6 +109,23 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
         }
     }
 
+    private void getCursor(MongoSplit split) { // Despite the name, returns nothing: opens a cursor for the split's query/projection and stores it in the `cursor` field.
+        cursor =
+                clientProvider
+                        .getDefaultCollection()
+                        .find(split.getQuery())
+                        .projection(split.getProjection())
+                        .batchSize(readOptions.getFetchSize())
+                        .noCursorTimeout(readOptions.isNoCursorTimeout())
+                        .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES) // NOTE(review): getter is named "...MS" but the value is applied as MINUTES (unchanged from the removed code) — confirm the intended unit.
+                        .iterator(); // pollNext calls closeCurrentSplit() in a finally block after draining this cursor — presumably that closes it; confirm.
+    }
+
+    private Stream<BsonDocument> cursorToStream() { // Exposes the current `cursor` field as a sequential Stream; the stream registers no close hook, so it does not close the cursor.
+        Iterable<BsonDocument> iterable = () -> cursor; // single-use adapter: every iterator() call hands back the same cursor instance
+        return StreamSupport.stream(iterable.spliterator(), false); // false = sequential (not parallel)
+    }
+
     @Override
     public List<MongoSplit> snapshotState(long checkpointId) {
         return new ArrayList<>(pendingSplits);

Reply via email to