[ 
https://issues.apache.org/jira/browse/BEAM-6241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732323#comment-16732323
 ] 

Ahmed El.Hussaini commented on BEAM-6241:
-----------------------------------------

[~kenn] Thanks for taking a look here.

[~iemejia] I've cleaned the MongoDB query method greatly. It looks like this 
now:
{code:java}
@Override
    public boolean start() {
      Read spec = source.spec;

      // MongoDB Connection preparation
      client = createClient(spec);
      MongoDatabase mongoDatabase = client.getDatabase(spec.database());
      MongoCollection<Document> mongoCollection = 
mongoDatabase.getCollection(spec.collection());

      cursor =
          QueryBuilder.create(mongoCollection)
              .withDocumentId(spec.documentId())
              .withDocumentIdStr(spec.documentIdStr())
              .withFilter(spec.filter())
              .withLimit(spec.limit())
              .withPipeline(spec.pipeline())
              .withProjection(spec.projection())
              .cursor();

      return advance();
    }
{code}
 Now all MongoDB queries are performed using MongoDB's aggregates.

 

cc [~jbonofre]

> MongoDbIO - Add Limit and Aggregates Support
> --------------------------------------------
>
>                 Key: BEAM-6241
>                 URL: https://issues.apache.org/jira/browse/BEAM-6241
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-mongodb
>    Affects Versions: 2.9.0
>            Reporter: Ahmed El.Hussaini
>            Assignee: Ahmed El.Hussaini
>            Priority: Major
>              Labels: easyfix
>             Fix For: 2.10.0
>
>
> h2. Adds Support to Limit Results
>  
> {code:java}
> MongoDbIO.read()
> .withUri("mongodb://localhost:" + port)
> .withDatabase(DATABASE)
> .withCollection(COLLECTION)
> .withFilter("{\"scientist\":\"Einstein\"}")
> .withLimit(5));{code}
> h2. Adds Support to Use Aggregates
>  
> {code:java}
> List<BsonDocument> aggregates = new ArrayList<BsonDocument>();
>   aggregates.add(
>     new BsonDocument(
>       "$match",
>       new BsonDocument("country", new BsonDocument("$eq", new 
> BsonString("England")))));
> PCollection<Document> output =
>   pipeline.apply(
>     MongoDbIO.read()
>       .withUri("mongodb://localhost:" + port)
>       .withDatabase(DATABASE)
>       .withCollection(COLLECTION)
>       .withAggregate(aggregates));
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to