The intended usage of Spark is a single SparkContext (or SparkSession, in the
case of 2.x) per JVM. An implicit assumption this entails is that the Driver
JVM terminates when that context is finished. `spark.stop()` or `sc.stop()` is
not equivalent to JVM termination, which is what you are assuming when you
start and shut down a SparkContext for each independent application. If you
believe this pattern of multiple SparkContexts is something you cannot do
without, I see two solutions:


  1.  Spawn an independent Driver JVM for each Spark application (a rough
SparkLauncher sketch follows this list).
  2.  Look into IgniteRDD (https://ignite.apache.org/features/igniterdd.html);
it caters to your use case.
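
A minimal sketch of option 1, assuming each chunk is packaged as its own
application (the jar path, main class, master URL and chunk ids below are
placeholders):

  import org.apache.spark.launcher.SparkLauncher

  // One short-lived Driver JVM per chunk: all heap and direct memory is
  // reclaimed by the OS when that JVM exits, instead of accumulating in a
  // long-lived parent JVM.
  val chunkIds = Seq("chunk-0", "chunk-1", "chunk-2")     // placeholder ids
  chunkIds.foreach { chunkId =>
    val driver = new SparkLauncher()
      .setAppResource("/path/to/chunk-job.jar")           // placeholder jar
      .setMainClass("com.example.ChunkJob")               // placeholder class
      .setMaster("yarn")                                   // or your master URL
      .setConf("spark.driver.memory", "4g")
      .addAppArgs(chunkId)
      .launch()                                            // spawns a separate JVM
    driver.waitFor()                                       // run chunks one at a time
  }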

P.S. The very premise on which you base your need for multiple SparkContexts
is questionable. Intermediate failures in your Spark jobs do not mean the
entire DAG will be re-computed; you can always guard against that by caching
intermediate data.
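
For example, a rough sketch (the RDD, the transform and the checkpoint
directory are placeholders):

  import org.apache.spark.storage.StorageLevel

  // Materialize the expensive intermediate result once; a downstream failure
  // then recomputes from here rather than from the start of the lineage.
  val intermediate = rawData.map(expensiveTransform)      // placeholder logic
    .persist(StorageLevel.MEMORY_AND_DISK)

  // For very long lineages, checkpointing truncates the DAG entirely.
  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")    // placeholder path
  intermediate.checkpoint()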

Sujith
________________________________
From: Chawla,Sumit <[email protected]>
Sent: Tuesday, March 6, 2018 11:15 AM
To: dev; User
Subject: OutOfDirectMemoryError for Spark 2.2

Hi All

I have a job which processes a large dataset. All items in the dataset are
unrelated. To save on cluster resources, I process these items in chunks.
Since the chunks are independent of each other, I start and shut down the
Spark context for each chunk. This keeps the DAG small and avoids retrying
the entire DAG in case of failures. This mechanism used to work fine with
Spark 1.6. Now that we have moved to 2.2, the job has started failing with
OutOfDirectMemoryError.
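
In sketch form, each chunk is handled roughly like this (input paths and the
per-chunk logic are illustrative):

  import org.apache.spark.sql.SparkSession

  // One short-lived SparkSession per chunk, all inside the same driver JVM.
  chunkPaths.foreach { path =>                            // placeholder inputs
    val spark = SparkSession.builder()
      .appName(s"chunk-$path")
      .getOrCreate()
    try {
      spark.read.parquet(path)                            // placeholder format
        .transform(processChunk)                          // placeholder logic
        .write.parquet(s"$path-out")
    } finally {
      spark.stop()                                        // stops the context, not the JVM
    }
  }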


2018-03-03 22:00:59,687 WARN  [rpc-server-48-1] server.TransportChannelHandler
(TransportChannelHandler.java:exceptionCaught(78)) - Exception in connection
from /10.66.73.27:60374

io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608
byte(s) of direct memory (used: 1023410176, max: 1029177344)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:506)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:460)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
    at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:564)

I got some clue on what is causing this from
https://github.com/netty/netty/issues/6343, however I am not able to add up
the numbers on what is causing 1 GB of Direct Memory to fill up.
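
One way to add the numbers up would be to log netty's own counter between
chunks, roughly like this (assuming the netty bundled with your Spark build
exposes PlatformDependent.usedDirectMemory(); recent 4.x releases do):

  import io.netty.util.internal.PlatformDependent

  // Netty tracks its direct allocations in its own counter (the "used" value
  // in the error above); logging it between chunks shows whether stopped
  // contexts actually release their buffers.
  def logNettyDirectMemory(tag: String): Unit = {
    val used = PlatformDependent.usedDirectMemory()       // -1 if counter disabled
    val max  = PlatformDependent.maxDirectMemory()
    println(s"[$tag] netty direct memory: used=$used max=$max")
  }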


Output from jmap (histogram entries for the netty classes; columns are rank,
#instances, #bytes, class name):

   7:    22230   1422720  io.netty.buffer.PoolSubpage
  12:     1370    804640  io.netty.buffer.PoolSubpage[]
  41:     3600    144000  io.netty.buffer.PoolChunkList
  98:     1440     46080  io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
 113:      300     40800  io.netty.buffer.PoolArena$HeapArena
 114:      300     40800  io.netty.buffer.PoolArena$DirectArena
 192:      198     15840  io.netty.buffer.PoolChunk
 274:      120      8320  io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
 406:      120      3840  io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
 422:       72      3552  io.netty.buffer.PoolArena[]
 458:       30      2640  io.netty.buffer.PooledUnsafeDirectByteBuf
 500:       36      2016  io.netty.buffer.PooledByteBufAllocator
 529:       32      1792  io.netty.buffer.UnpooledUnsafeHeapByteBuf
 589:       20      1440  io.netty.buffer.PoolThreadCache
 630:       37      1184  io.netty.buffer.EmptyByteBuf
 703:       36       864  io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
 852:       22       528  io.netty.buffer.AdvancedLeakAwareByteBuf
 889:       10       480  io.netty.buffer.SlicedAbstractByteBuf
 917:        8       448  io.netty.buffer.UnpooledHeapByteBuf
1018:       20       320  io.netty.buffer.PoolThreadCache$1
1305:        4       128  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
1404:        1        80  io.netty.buffer.PooledUnsafeHeapByteBuf
1473:        3        72  io.netty.buffer.PoolArena$SizeClass
1529:        1        64  io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
1541:        2        64  io.netty.buffer.CompositeByteBuf$Component
1568:        1        56  io.netty.buffer.CompositeByteBuf
1896:        1        32  io.netty.buffer.PoolArena$SizeClass[]
2042:        1        24  io.netty.buffer.PooledUnsafeDirectByteBuf$1
2046:        1        24  io.netty.buffer.UnpooledByteBufAllocator
2051:        1        24  io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
2078:        1        24  io.netty.buffer.PooledHeapByteBuf$1
2135:        1        24  io.netty.buffer.PooledUnsafeHeapByteBuf$1
2302:        1        16  io.netty.buffer.ByteBufUtil$1
2769:        1        16  io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher



My Driver machine has 32 CPUs, and as of now I have 15 machines in my
cluster. Currently the error happens while processing the 5th or 6th chunk.
I suspect the error depends on the number of Executors and would happen
earlier if we added more executors.


I am trying to come up with an explanation of what is filling up the Direct
Memory and how to quantify it as a factor of the number of Executors. Ours is
a shared cluster, and we need to understand how much Driver memory to allocate
for most of the jobs.
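
One workaround I am considering while we figure that out is to pin the
direct-memory ceiling explicitly at submit time; the 2g value below is only a
guess to be tuned, and spark.shuffle.io.preferDirectBufs is optional:

  spark-submit \
    --driver-memory 4g \
    --conf "spark.driver.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
    --conf spark.shuffle.io.preferDirectBufs=false \
    ...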





Regards
Sumit Chawla
