[
https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842167#comment-16842167
]
Ahmed El.Hussaini commented on BEAM-7256:
-----------------------------------------
[~jcornejo] since you're using "withBucketAuto(true)", the IO will internally
use aggregation, which will, in turn, result in an exception if the $sort stage
exceeds 100MB.
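For reference, here is a minimal sketch of what allowDiskUse changes at the plain
MongoDB Java driver level (the option this ticket asks MongoDbIO to expose). The
connection string and the single $sort stage are placeholders, not your actual setup:
{code:java}
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Sorts;
import java.util.Collections;
import org.bson.Document;

public class AllowDiskUseSketch {
  public static void main(String[] args) {
    // Placeholder connection string and namespace.
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCollection<Document> oplog =
          client.getDatabase("local").getCollection("oplog.rs");

      // A $sort over a large collection hits the 100MB in-memory limit
      // unless the server is allowed to spill the sort to disk.
      for (Document doc : oplog
          .aggregate(Collections.singletonList(Aggregates.sort(Sorts.ascending("ts"))))
          .allowDiskUse(true)) {  // what an AggregationOptions/allowDiskUse knob would enable
        System.out.println(doc.toJson());
      }
    }
  }
}
{code}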
I see that you're trying to tail the oplog here, which will be a huge collection
in production, so I suggest you use filters to only include the oplog entries for
a specific collection.
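Something along the lines of the (commented-out) withQueryFn pattern in the snippet
quoted below should do it; the "mydb.mycollection" namespace is just a placeholder:
{code:java}
import com.mongodb.client.model.Filters;
import org.apache.beam.sdk.io.mongodb.FindQuery;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;

p.apply("ReadOplogForOneCollection", MongoDbIO.read()
    .withUri(options.getMongoDBUri())
    .withDatabase("local")
    .withCollection("oplog.rs")
    // Restrict the read to oplog entries of one namespace (placeholder value).
    .withQueryFn(FindQuery.create()
        .withFilters(Filters.eq("ns", "mydb.mycollection"))));
{code}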
One thing that comes to mind that would help debug such cases in the future is
adding to the IO the ability to explain the query or aggregation.
Example
{code:java}
p.apply("ReadFromMongoDB", MongoDbIO.read()
.withUri(options.getMongoDBUri())
.withDatabase("local")
.withCollection("oplog.rs")
.withBucketAuto(true)
.withExplain(true);{code}
This would then log the explain output to standard output when running locally
(i.e. options.getMongoDBUri() returns localhost).
[~jcornejo] On a side note, I'm not sure using Beam to tail the oplog is the
right choice. You need the ability to use the cursor option
CursorType.TailableAwait, which is currently not supported by MongoDbIO. You
might also need to replay the log, which again is not supported.
What I suggest you do here is the following (see the sketch after the list):
* Tail the oplog using vanilla Java code.
* Publish batched oplog messages to Kafka or, preferably, Pub/Sub.
* Consume the messages with a streaming pipeline running on Dataflow or Flink.
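Roughly, the tailing step could look like the sketch below. It uses the plain MongoDB
Java driver with CursorType.TailableAwait; the connection string is a placeholder and
publish() only stands in for a real Kafka/Pub/Sub producer call:
{code:java}
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.bson.BsonTimestamp;
import org.bson.Document;

public class OplogTailer {
  public static void main(String[] args) {
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      MongoCollection<Document> oplog =
          client.getDatabase("local").getCollection("oplog.rs");

      // Start from "now"; persist the last seen ts elsewhere if you need to replay.
      BsonTimestamp lastSeen =
          new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);

      try (MongoCursor<Document> cursor = oplog
          .find(Filters.gt("ts", lastSeen))
          .cursorType(CursorType.TailableAwait)  // the cursor option MongoDbIO doesn't expose
          .noCursorTimeout(true)
          .iterator()) {
        while (cursor.hasNext()) {  // blocks waiting for new oplog entries
          publish(cursor.next());
        }
      }
    }
  }

  // Placeholder for the actual Kafka/Pub/Sub producer call.
  private static void publish(Document entry) {
    System.out.println(entry.toJson());
  }
}
{code}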
Note: If you're not using Atlas, then you can fix this issue by simply omitting
withBucketAuto(true).
> Add support for allowDiskUse (AggregationOptions) in MongoDbIO
> ---------------------------------------------------------------
>
> Key: BEAM-7256
> URL: https://issues.apache.org/jira/browse/BEAM-7256
> Project: Beam
> Issue Type: Improvement
> Components: io-java-mongodb
> Affects Versions: 2.12.0
> Reporter: Javier Cornejo
> Priority: Major
> Attachments: Screen Shot 2019-05-09 at 12.30.51.png
>
>
> When a read is executed over a collection that exceeds the memory limit of
> 104857600 bytes, an exception occurs. This limit is declared by MongoDB, and it
> is possible to avoid the error by passing AggregationOptions with allowDiskUse
> set to true so Mongo can sort using the disk.
> This should happen only when aggregations are added to the read, but it is
> currently happening even without any aggregation at all.
> Please let me know how I can help with this improvement / bug.
>
> !Screen Shot 2019-05-09 at 12.30.51.png!
> {code:java}
> PCollection<KV<String, Document>> updateColls = p.apply("Reading Ops Collection: " + key,
>     MongoDbIO.read()
>         .withUri(options.getMongoDBUri())
>         .withDatabase("local")
>         .withCollection("oplog.rs")
>         .withBucketAuto(true)
>         // .withQueryFn(
>         //     FindQuery.create().withFilters(
>         //         Filters.and(
>         //             Filters.gt("ts", ts.format(dtf)),
>         //             Filters.eq("ns", options.getMongoDBDBName() + "" + key),
>         //             Filters.eq("op", "u")
>         //         )
>         //     )
>         //     AggregationQuery.create().withMongoDbPipeline(updatedDocsOplogAggregation)
>         // )
>     )
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)