Eugene Kirpichov created BEAM-3778:
--------------------------------------

             Summary: Very poor performance of side inputs when input is finely sharded
                 Key: BEAM-3778
                 URL: https://issues.apache.org/jira/browse/BEAM-3778
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: Eugene Kirpichov
            Assignee: Luke Cwik


This thread:
https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E

The user has a job that reads a few hundred thousand files and then writes them 
to BigQuery. This generates 1 temp file per input file. Then we gather the temp 
files into a View.asList() side input - and this side input ends up containing 
a few hundred thousand tiny ISM files, with 1 element per file, which performs 
horribly (taking hours to read the side input).

I think we need to reshuffle things onto a reasonable number of shards before 
writing them to ISM.

A side issue: this line
https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
also triggers the coder size estimation logic, which wrongly assumes that size 
estimation is cheap in this case and so does the work twice, as evidenced by the 
following stack trace:

Processing lull for PT30900.015S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
 java.net.SocketInputStream.socketRead0(Native Method)
 java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
 java.net.SocketInputStream.read(SocketInputStream.java:170)
 java.net.SocketInputStream.read(SocketInputStream.java:141)
 sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
 sun.security.ssl.InputRecord.read(InputRecord.java:503)
 sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
 sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
 sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
 java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
 java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
 java.io.BufferedInputStream.read(BufferedInputStream.java:345)
 sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
 sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
 sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
 sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
 java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
 sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
 com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
 com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
 com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
 com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
 java.io.InputStream.read(InputStream.java:101)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
 org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
 org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
 org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
 com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
 com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
 com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
 com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
 com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
 com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
 com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
 com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
 java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
 org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
 org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
 org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
 org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
 com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
 com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
 com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
 com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
 com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to