[ 
https://issues.apache.org/jira/browse/BEAM-975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008116#comment-16008116
 ] 

Jean-Baptiste Onofré commented on BEAM-975:
-------------------------------------------

I think the problem happens for long running pipeline as the keep alive is 
false by default.

I created the following pull request to set keep alive enabled by default and 
extend the max connection idle time. The pull request also includes methods to 
change the keep alive and the max connection idle time, improve the javadoc, 
and use the same style as the other IOs.

> Issue with MongoDBIO
> --------------------
>
>                 Key: BEAM-975
>                 URL: https://issues.apache.org/jira/browse/BEAM-975
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Reza Nouri
>            Assignee: Jean-Baptiste Onofré
>
> It appears that there is an issue with MongoDBIO. I am using Apache Beam in a 
> REST service that reads data from Mongo. After a number of requests, mongoIO 
> throws the following exception:
> com.mongodb.MongoSocketReadException: Prematurely reached end of stream
>       at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
>       at 
> com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:491)
>       at 
> com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:221)
>       at 
> com.mongodb.connection.CommandHelper.receiveReply(CommandHelper.java:134)
>       at 
> com.mongodb.connection.CommandHelper.receiveCommandResult(CommandHelper.java:121)
>       at 
> com.mongodb.connection.CommandHelper.executeCommand(CommandHelper.java:32)
>       at 
> com.mongodb.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:83)
>       at 
> com.mongodb.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:43)
>       at 
> com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:115)
>       at 
> com.mongodb.connection.UsageTrackingInternalConnection.open(UsageTrackingInternalConnection.java:46)
>       at 
> com.mongodb.connection.DefaultConnectionPool$PooledConnection.open(DefaultConnectionPool.java:381)
>       at 
> com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:96)
>       at 
> com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:82)
>       at 
> com.mongodb.connection.DefaultServer.getConnection(DefaultServer.java:72)
>       at 
> com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:86)
>       at 
> com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:237)
>       at 
> com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:212)
>       at com.mongodb.operation.FindOperation.execute(FindOperation.java:482)
>       at com.mongodb.operation.FindOperation.execute(FindOperation.java:79)
>       at com.mongodb.Mongo.execute(Mongo.java:772)
>       at com.mongodb.Mongo$2.execute(Mongo.java:759)
>       at com.mongodb.OperationIterable.iterator(OperationIterable.java:47)
>       at com.mongodb.FindIterableImpl.iterator(FindIterableImpl.java:143)
>       at 
> org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbReader.start(MongoDbIO.java:359)
>       at 
> org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:99)
>       at 
> org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:154)
>       at 
> org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:121)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> I suppose there must be a problem with Mongo connection which causes this 
> issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to