[
https://issues.apache.org/jira/browse/BEAM-5650?focusedWorklogId=162778&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162778
]
ASF GitHub Bot logged work on BEAM-5650:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Nov/18 23:16
Start Date: 05/Nov/18 23:16
Worklog Time Spent: 10m
Work Description: jhalaria opened a new pull request #6952: [BEAM-5650]:
Modify BoundedToUnboundedSourceAdapter to configure its reader to read more
than 1 bounded source
URL: https://github.com/apache/beam/pull/6952
@iemejia - Please review. Thank you.
- Noticed an issue while reading around 1000 files from S3.
- Started getting connection timeouts because the maximum number of open
connections is set to 50.
- Inside
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L250
, we open all the readers simultaneously. That's the intended behavior for an
unbounded source. But when reading a bounded source (e.g. reads from S3),
opening all connections at the same time leaves no connections available once
the number of files being opened exceeds the maximum number of HTTP
connections, which defaults to 50
(`ClientConfiguration.DEFAULT_MAX_CONNECTIONS`), and there isn't currently a
way to override it. [We should have the ability to override this anyway; I
will create a separate PR for that.]
- The change essentially gives `BoundedToUnboundedSourceAdapter` an
`ArrayDeque` so that one reader can read more than one `BoundedSource`. Each
reader finishes reading from a `BoundedSource` and then moves on to the next one.
- When checkpointing, if a reader has elements that aren't read yet
from a `BoundedSource`, we create a checkpoint with the remaining elements of
that `BoundedSource` and the sources remaining in the `ArrayDeque`.
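To make the failure mode concrete, here is a toy, JDK-only model of the pool exhaustion: a semaphore stands in for the HTTP connection pool, sized to match `ClientConfiguration.DEFAULT_MAX_CONNECTIONS` (50). The class and method names are hypothetical illustrations, not Beam or AWS SDK code.

```java
import java.util.concurrent.Semaphore;

// Toy model: the semaphore's permits play the role of the 50-connection
// HTTP pool (ClientConfiguration.DEFAULT_MAX_CONNECTIONS).
public class PoolModel {
    // Open `simultaneousReaders` readers at once, as UnboundedSourceWrapper
    // does; count how many cannot obtain a connection from the pool.
    static int failedOpens(int poolSize, int simultaneousReaders) {
        Semaphore pool = new Semaphore(poolSize);
        int failures = 0;
        for (int i = 0; i < simultaneousReaders; i++) {
            if (!pool.tryAcquire()) {
                failures++; // would surface as "Timeout waiting for connection from pool"
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // 1000 files against a pool of 50: 950 readers cannot get a connection.
        System.out.println(failedOpens(50, 1000)); // prints 950
    }
}
```

With 50 or fewer simultaneous readers the count is 0, which is why the problem only shows up on large file globs.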
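The queue-based fix can be sketched in miniature as follows. This is a simplified stand-in, not the actual `BoundedToUnboundedSourceAdapter` code: each bounded source is modeled as a plain list of records, and a single reader drains the `ArrayDeque` one source at a time, so at most one source is ever open.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;

// Simplified model: each List<String> stands in for one BoundedSource.
public class QueuedReaderSketch {
    // One reader consumes sources sequentially: it fully reads the current
    // source before polling the next, so only one source is open at a time.
    static int drain(ArrayDeque<List<String>> sources) {
        int read = 0;
        while (!sources.isEmpty()) {
            List<String> current = sources.poll(); // "open" the next source
            for (String record : current) {
                read++; // process one record
            }
            // `current` is exhausted here; its connection would be closed
        }
        return read;
    }

    public static void main(String[] args) {
        ArrayDeque<List<String>> queue = new ArrayDeque<>();
        queue.add(Arrays.asList("a", "b"));
        queue.add(Arrays.asList("c"));
        queue.add(Arrays.asList("d", "e", "f"));
        System.out.println(drain(queue)); // prints 6
    }
}
```

A checkpoint in this model would simply record the unread tail of `current` plus whatever is still queued, which is what the change above does with the real `BoundedSource`s.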
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 162778)
Time Spent: 10m
Remaining Estimate: 0h
> Timeout exceptions while reading a lot of files from a bounded source like S3
> when using TextIO
> -----------------------------------------------------------------------------------------------
>
> Key: BEAM-5650
> URL: https://issues.apache.org/jira/browse/BEAM-5650
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Reporter: Ankit Jhalaria
> Assignee: Ankit Jhalaria
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> * Using TextIO, I was trying to read around 850 files.
> * Got this exception while using FlinkRunner:
>
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:175)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:115)
>     ... 28 more
> Caused by: java.io.IOException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:91)
>     at org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:382)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.io.FilterInputStream.read(FilterInputStream.java:133)
>     at java.io.PushbackInputStream.read(PushbackInputStream.java:186)
>     at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.read(ByteStreams.java:859)
>     at org.apache.beam.sdk.io.Compression$3.readDecompressed(Compression.java:81)
>     at org.apache.beam.sdk.io.CompressedSource$CompressionMode.createDecompressingChannel(CompressedSource.java:110)
>     at org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:417)
>     at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
>     at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:456)
>     at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:434)
>     at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:286)
>     at org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:279)
>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:250)
>     at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedSourceWrapperNoValueWithRecordId.run(FlinkStreamingTransformTranslators.java:1299)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1116)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1066)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
>     at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1416)
>     at org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:89)
>     ... 26 more
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>     at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:313)
>     at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:279)
>     at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>     at com.amazonaws.http.conn.$Proxy65.get(Unknown Source)
>     at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
>     at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
>     at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>     at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1238)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>     ... 36 more
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)