This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 830196d8b7 [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001)
830196d8b7 is described below
commit 830196d8b74a74fa6b97c28e1a5f9a0e058c2158
Author: monster <[email protected]>
AuthorDate: Mon Jul 3 14:46:35 2023 +0800
[Improve][Connector-v2][Mongodb]Optimize reading logic (#5001)
Co-authored-by: chenqqq11 <[email protected]>
---
.../mongodb/source/reader/MongodbReader.java | 42 +++++++++++++---------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
index 48ff273eaa..e55fa5c876 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/reader/MongodbReader.java
@@ -29,7 +29,6 @@ import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit
import org.bson.BsonDocument;
-import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import lombok.extern.slf4j.Slf4j;
@@ -38,6 +37,8 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
/** MongoReader reads MongoDB by splits (queries). */
@Slf4j
@@ -70,7 +71,7 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
}
@Override
- public void open() throws Exception {
+ public void open() {
if (cursor != null) {
cursor.close();
}
@@ -87,26 +88,18 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
public void pollNext(Collector<SeaTunnelRow> output) {
synchronized (output.getCheckpointLock()) {
MongoSplit currentSplit = pendingSplits.poll();
- if (null != currentSplit) {
+ if (currentSplit != null) {
if (cursor != null) {
// current split is in-progress
return;
}
log.info("Prepared to read split {}", currentSplit.splitId());
- FindIterable<BsonDocument> rs =
- clientProvider
- .getDefaultCollection()
- .find(currentSplit.getQuery())
- .projection(currentSplit.getProjection())
- .batchSize(readOptions.getFetchSize())
-
.noCursorTimeout(readOptions.isNoCursorTimeout())
- .maxTime(readOptions.getMaxTimeMS(),
TimeUnit.MINUTES);
- cursor = rs.iterator();
- while (cursor.hasNext()) {
- SeaTunnelRow deserialize =
deserializer.deserialize(cursor.next());
- output.collect(deserialize);
+ try {
+ getCursor(currentSplit);
+
cursorToStream().map(deserializer::deserialize).forEach(output::collect);
+ } finally {
+ closeCurrentSplit();
}
- closeCurrentSplit();
}
if (noMoreSplit && pendingSplits.isEmpty()) {
// signal to the source that we have reached the end of the
data.
@@ -116,6 +109,23 @@ public class MongodbReader implements SourceReader<SeaTunnelRow, MongoSplit> {
}
}
+ private void getCursor(MongoSplit split) {
+ cursor =
+ clientProvider
+ .getDefaultCollection()
+ .find(split.getQuery())
+ .projection(split.getProjection())
+ .batchSize(readOptions.getFetchSize())
+ .noCursorTimeout(readOptions.isNoCursorTimeout())
+ .maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES)
+ .iterator();
+ }
+
+ private Stream<BsonDocument> cursorToStream() {
+ Iterable<BsonDocument> iterable = () -> cursor;
+ return StreamSupport.stream(iterable.spliterator(), false);
+ }
+
@Override
public List<MongoSplit> snapshotState(long checkpointId) {
return new ArrayList<>(pendingSplits);