[ 
https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386874#comment-16386874
 ] 

Eugene Kirpichov commented on BEAM-3778:
----------------------------------------

Meanwhile, BigQueryIO itself can work around this issue by reshuffling the input 
that goes into ReifyAsIterable.

> Very poor performance of side inputs when input is finely sharded
> -----------------------------------------------------------------
>
>                 Key: BEAM-3778
>                 URL: https://issues.apache.org/jira/browse/BEAM-3778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Eugene Kirpichov
>            Assignee: Luke Cwik
>            Priority: Major
>
> This thread:
> https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
> The user has a job that reads a few hundred thousand files and then writes 
> them to BigQuery. This generates one temp file per input file. We then gather 
> the temp files into a View.asList() side input, and this side input ends up 
> consisting of a few hundred thousand tiny ISM files with one element per file, 
> which performs terribly (reading the side input takes hours).
> I think we need to reshuffle things onto a reasonable number of shards before 
> writing them to ISM.
> A side issue: this line
> https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
>  also triggers the coder's size-estimation logic, which wrongly assumes that 
> size estimation is cheap in this case and therefore does the work twice, as 
> shown by the following stack trace:
> Processing lull for PT30900.015S in state process of 
> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
>  java.net.SocketInputStream.socketRead0(Native Method)
>  java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>  java.net.SocketInputStream.read(SocketInputStream.java:170)
>  java.net.SocketInputStream.read(SocketInputStream.java:141)
>  sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>  sun.security.ssl.InputRecord.read(InputRecord.java:503)
>  sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
>  sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
>  sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
>  java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>  java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>  java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>  sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
>  sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
>  
> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
>  
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>  java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>  
> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>  
> com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
>  
> com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
>  com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
>  
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
>  
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>  
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
>  
> com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
>  
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
>  
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
>  
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>  java.io.InputStream.read(InputStream.java:101)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
>  org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
>  org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
>  
> org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
>  com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
>  com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
>  
> com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
>  
> com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
>  
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
>  
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
>  
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>  
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
>  com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
>  com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
>  com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
>  
> com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
>  
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
>  
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
>  
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
>  java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
>  
> org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
>  
> org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
>  
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
>  
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
>  
> com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
>  
> com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
>  
> com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
>  
> com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
>  
> com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to