[
https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386874#comment-16386874
]
Eugene Kirpichov commented on BEAM-3778:
----------------------------------------
Meanwhile, BigQueryIO itself can work around this issue by reshuffling the input
that goes into ReifyAsIterable.
> Very poor performance of side inputs when input is finely sharded
> -----------------------------------------------------------------
>
> Key: BEAM-3778
> URL: https://issues.apache.org/jira/browse/BEAM-3778
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Eugene Kirpichov
> Assignee: Luke Cwik
> Priority: Major
>
> This thread:
> https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
> The user has a job that reads a few hundred thousand files and then writes
> them to BigQuery. This generates 1 temp file per input file. Then we gather
> the temp files into a View.asList() side input - and this side input ends up
> containing a few hundred thousand tiny ISM files, with 1 element per file,
> which performs horribly (taking hours to read the side input).
> I think we need to reshuffle the temp-file results onto a reasonable number of
> shards before writing them to ISM.
> A side issue: this
> https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
> also triggers the coder size estimation logic. That logic wrongly assumes that
> estimating the size is cheap in this case, so the side input is effectively
> read twice, as evidenced by the following stack trace:
> Processing lull for PT30900.015S in state process of
> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> java.net.SocketInputStream.read(SocketInputStream.java:170)
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
> sun.security.ssl.InputRecord.read(InputRecord.java:503)
> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
> sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
> java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
> sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
> com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
> com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
> com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
> com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> java.io.InputStream.read(InputStream.java:101)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
> org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
> org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
> org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
> com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
> com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
> com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
> com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
> com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
> com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
> com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
> com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
> com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
> com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
> org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
> org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
> com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
> com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
> com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
> com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
> com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)