Eugene Kirpichov created BEAM-3778:
--------------------------------------
Summary: Very poor performance of side inputs when input is finely
sharded
Key: BEAM-3778
URL: https://issues.apache.org/jira/browse/BEAM-3778
Project: Beam
Issue Type: Bug
Components: runner-dataflow
Reporter: Eugene Kirpichov
Assignee: Luke Cwik
This thread:
https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
The user has a job that reads a few hundred thousand files and then writes them
to BigQuery. This generates 1 temp file per input file. Then we gather the temp
files into a View.asList() side input - and this side input ends up containing
a few hundred thousand tiny ISM files, with 1 element per file, which performs
horribly (taking hours to read the side input).
I think we need to reshuffle things onto a reasonable number of shards before
writing them to ISM.
A side issue: this
https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
triggers also the coder size estimation logic, which falsely thinks that size
estimation in this case is cheap, and does double the work, as evidenced by the
following stack trace:
Processing lull for PT30900.015S in state process of
WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.io.InputStream.read(InputStream.java:101)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)