[
https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236835&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236835
]
ASF GitHub Bot logged work on BEAM-5775:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/19 12:43
Start Date: 03/May/19 12:43
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #8371: [BEAM-5775] Move
(most) of the batch spark pipelines' transformations to using lazy
serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489081256
Did you notice any performance improvement with this PR? I like it for
consistency, but I have found both improvements and regressions depending on
the pipeline.
I also hit a weird issue with this one, @mikekap. I was running Nexmark to
see if I could find measurable improvements from this PR, but when I invoke
it multiple times it fails; curiously, this does not happen with current
master. Would you mind taking a look to see whether it is perhaps a
configuration issue? It is strange.
```bash
./gradlew :beam-sdks-java-nexmark:run \
-Pnexmark.runner=":beam-runners-spark" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true"
```
The exception log:
```
==========================================================================================
Run started 2019-05-03T12:36:02.235Z and ran for PT42.696S
Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}
Configurations:
Conf Description
0000 query:PASSTHROUGH; streamTimeout:60
0001 query:CURRENCY_CONVERSION; streamTimeout:60
0002 query:SELECTION; streamTimeout:60
0003 query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
0004 query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
0005 query:HOT_ITEMS; streamTimeout:60
0006 query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
0007 query:HIGHEST_BID; streamTimeout:60
0008 query:MONITOR_NEW_USERS; streamTimeout:60
0009 query:WINNING_BIDS; numEvents:10000; streamTimeout:60
0010 query:LOG_TO_SHARDED_FILES; streamTimeout:60
0011 query:USER_SESSIONS; streamTimeout:60
0012 query:PROCESSING_TIME_WINDOWS; streamTimeout:60
0013 query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
0014 query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60
Performance:
Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
0000 1.5 66093.9 100000
0001 0.8 130378.1 92000
0002 0.3 325732.9 351
0003 2.3 43327.6 580
0004 0.9 11402.5 40
0005 2.0 48947.6 12
0006 0.5 19267.8 103
0007 2.4 40833.0 1
0008 1.6 63775.5 6000
0009 0.5 20120.7 298
0010 0.9 114155.3 1
0011 1.0 97371.0 1919
0012 0.9 109769.5 1919
0013 0.4 269541.8 92000
0014 *** not run ***
==========================================================================================
2019-05-03T12:36:44.932Z Generating 100000 events in batch mode
19/05/03 14:36:47 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: failed to create a child event loop
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
    at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
    at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
    at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
    at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
    at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
    at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    ... 36 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
    ... 40 more
19/05/03 14:36:47 WARN org.apache.spark.metrics.MetricsSystem: Stopping a MetricsSystem that is not running
==========================================================================================
Run started 2019-05-03T12:36:02.235Z and ran for PT45.018S
Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":600000},"numEvents":100000,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":10000,"nextEventRate":10000,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":100000,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}
Configurations:
Conf Description
0000 query:PASSTHROUGH; streamTimeout:60
0001 query:CURRENCY_CONVERSION; streamTimeout:60
0002 query:SELECTION; streamTimeout:60
0003 query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
0004 query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:10000; streamTimeout:60
0005 query:HOT_ITEMS; streamTimeout:60
0006 query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:10000; streamTimeout:60
0007 query:HIGHEST_BID; streamTimeout:60
0008 query:MONITOR_NEW_USERS; streamTimeout:60
0009 query:WINNING_BIDS; numEvents:10000; streamTimeout:60
0010 query:LOG_TO_SHARDED_FILES; streamTimeout:60
0011 query:USER_SESSIONS; streamTimeout:60
0012 query:PROCESSING_TIME_WINDOWS; streamTimeout:60
0013 query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
0014 query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60
Performance:
Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
0000 1.5 66093.9 100000
0001 0.8 130378.1 92000
0002 0.3 325732.9 351
0003 2.3 43327.6 580
0004 0.9 11402.5 40
0005 2.0 48947.6 12
0006 0.5 19267.8 103
0007 2.4 40833.0 1
0008 1.6 63775.5 6000
0009 0.5 20120.7 298
0010 0.9 114155.3 1
0011 1.0 97371.0 1919
0012 0.9 109769.5 1919
0013 0.4 269541.8 92000
0014 *** not run ***
==========================================================================================
Exception in thread "main" java.lang.RuntimeException: java.lang.IllegalStateException: failed to create a child event loop
    at org.apache.beam.sdk.nexmark.Main.runAll(Main.java:128)
    at org.apache.beam.sdk.nexmark.Main.main(Main.java:415)
Caused by: java.lang.IllegalStateException: failed to create a child event loop
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
    at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:77)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:72)
    at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:59)
    at org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:50)
    at org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
    at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:75)
    at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
    at org.apache.spark.network.netty.NettyBlockTransferService.org$apache$spark$network$netty$NettyBlockTransferService$$startService$1(NettyBlockTransferService.scala:83)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anonfun$createServer$1.apply(NettyBlockTransferService.scala:87)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2269)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2261)
    at org.apache.spark.network.netty.NettyBlockTransferService.createServer(NettyBlockTransferService.scala:87)
    at org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:75)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:233)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at org.apache.beam.sdk.nexmark.NexmarkLauncher.run(NexmarkLauncher.java:1177)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:90)
    at org.apache.beam.sdk.nexmark.Main$Run.call(Main.java:79)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
    at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:149)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:127)
    at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:36)
    at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
    ... 36 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
    ... 40 more
> Task :beam-sdks-java-nexmark:run FAILED
FAILURE: Build failed with an exception.
```
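The root cause at the bottom of both traces is `java.io.IOException: Too many open files`, raised while a fresh SparkContext is being created for the next Nexmark configuration. One way to check whether file descriptors accumulate across runs is to watch the JVM's descriptor count between pipeline launches. Below is a minimal, hypothetical diagnostic sketch (`FdWatch` is not part of Beam or Nexmark; it assumes a HotSpot JVM on Unix, where `com.sun.management.UnixOperatingSystemMXBean` exposes FD counts):
```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import com.sun.management.UnixOperatingSystemMXBean;

// Hypothetical diagnostic, not Beam or Nexmark code: print the JVM's open
// file-descriptor usage so a leak across repeated SparkContext creations
// shows up as a count climbing toward the limit.
public class FdWatch {

  public static void report(String label) {
    OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
    if (os instanceof UnixOperatingSystemMXBean) { // HotSpot on Unix only
      UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;
      System.out.printf(
          "%s: %d open FDs (max %d)%n",
          label, unixOs.getOpenFileDescriptorCount(), unixOs.getMaxFileDescriptorCount());
    }
  }

  public static void main(String[] args) {
    // In the Nexmark scenario one would call report(...) before and after
    // each pipeline launch instead of just once here.
    report("baseline");
  }
}
```
If the count grows with each configuration, raising `ulimit -n` would only postpone the failure; something (e.g. Netty event loops or block-transfer services from previous contexts) would not be getting released between runs.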
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 236835)
Time Spent: 10h 50m (was: 10h 40m)
> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
> Key: BEAM-5775
> URL: https://issues.apache.org/jira/browse/BEAM-5775
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Mike Kaplinskiy
> Assignee: Mike Kaplinskiy
> Priority: Minor
> Fix For: 2.13.0
>
> Time Spent: 10h 50m
> Remaining Estimate: 0h
>
> Currently, for storage level MEMORY_ONLY, Beam does not coder-ify the data.
> This lets Spark keep the data in memory, avoiding the serialization round
> trip. Unfortunately the logic is fairly coarse - as soon as you switch to
> MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have
> chosen to keep the data in memory, incurring the serialization overhead.
>
> Ideally, Beam would serialize the data lazily - only as Spark chooses to
> spill to disk. This would be a change in behavior when using Beam, but
> luckily Spark already has a solution for folks who want their data
> serialized in memory - MEMORY_AND_DISK_SER will keep the data serialized.
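To make the proposal concrete, here is a minimal, hypothetical sketch of the lazy idea (illustrative only; not necessarily how PR #8371 implements it): keep the decoded element as a live object and run the Beam coder inside the Java serialization hooks, so the encoding cost is paid only if and when Spark actually serializes the partition, e.g. on spill to disk.
```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.beam.sdk.coders.Coder;

/**
 * Illustrative sketch only: a holder that keeps the decoded value as a live
 * object while the RDD partition stays in memory, and invokes the Beam coder
 * lazily, inside Java serialization, i.e. only when Spark spills or shuffles.
 */
public class LazyCodedValue<T> implements Serializable {

  private final Coder<T> coder; // Beam coders are themselves Serializable
  private transient T value;    // skipped by default Java serialization

  public LazyCodedValue(Coder<T> coder, T value) {
    this.coder = coder;
    this.value = value;
  }

  public T get() {
    return value;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // writes the coder
    coder.encode(value, out); // coder cost paid only on spill/shuffle
  }

  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    in.defaultReadObject();   // reads the coder back
    value = coder.decode(in); // decode when the spilled partition is re-read
  }
}
```
Under a wrapper like this, MEMORY_AND_DISK keeps live objects in memory and pays the coder round trip only on spill, while users who want always-serialized storage can still opt into MEMORY_AND_DISK_SER, as the description notes.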
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)