[
https://issues.apache.org/jira/browse/BEAM-6241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732323#comment-16732323
]
Ahmed El.Hussaini commented on BEAM-6241:
-----------------------------------------
[~kenn] Thanks for taking a look here.
[~iemejia] I've cleaned the MongoDB query method greatly. It looks like this
now:
{code:java}
@Override
public boolean start() {
Read spec = source.spec;
// MongoDB Connection preparation
client = createClient(spec);
MongoDatabase mongoDatabase = client.getDatabase(spec.database());
MongoCollection<Document> mongoCollection =
mongoDatabase.getCollection(spec.collection());
cursor =
QueryBuilder.create(mongoCollection)
.withDocumentId(spec.documentId())
.withDocumentIdStr(spec.documentIdStr())
.withFilter(spec.filter())
.withLimit(spec.limit())
.withPipeline(spec.pipeline())
.withProjection(spec.projection())
.cursor();
return advance();
}
{code}
Now all MongoDB queries are performed using MongoDB's aggregates.
cc [~jbonofre]
> MongoDbIO - Add Limit and Aggregates Support
> --------------------------------------------
>
> Key: BEAM-6241
> URL: https://issues.apache.org/jira/browse/BEAM-6241
> Project: Beam
> Issue Type: Improvement
> Components: io-java-mongodb
> Affects Versions: 2.9.0
> Reporter: Ahmed El.Hussaini
> Assignee: Ahmed El.Hussaini
> Priority: Major
> Labels: easyfix
> Fix For: 2.10.0
>
>
> h2. Adds Support to Limit Results
>
> {code:java}
> MongoDbIO.read()
> .withUri("mongodb://localhost:" + port)
> .withDatabase(DATABASE)
> .withCollection(COLLECTION)
> .withFilter("{\"scientist\":\"Einstein\"}")
> .withLimit(5));{code}
> h2. Adds Support to Use Aggregates
>
> {code:java}
> List<BsonDocument> aggregates = new ArrayList<BsonDocument>();
> aggregates.add(
> new BsonDocument(
> "$match",
> new BsonDocument("country", new BsonDocument("$eq", new
> BsonString("England")))));
> PCollection<Document> output =
> pipeline.apply(
> MongoDbIO.read()
> .withUri("mongodb://localhost:" + port)
> .withDatabase(DATABASE)
> .withCollection(COLLECTION)
> .withAggregate(aggregates));
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)