[ 
https://issues.apache.org/jira/browse/SPARK-21517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhoukang updated SPARK-21517:
-----------------------------
    Description: 
In our production cluster,oom happens when NettyBlockRpcServer receive 
OpenBlocks message.The reason we observed is below:
When BlockManagerManagedBuffer call ChunkedByteBuffer#toNetty, it will use 
Unpooled.wrappedBuffer(ByteBuffer... buffers) which use default 
maxNumComponents=16 in low-level CompositeByteBuf.When our component's number 
is bigger than 16, it will execute during buffer copy.

{code:java}
private void consolidateIfNeeded() {
        int numComponents = this.components.size();
        if(numComponents > this.maxNumComponents) {
            int capacity = 
((CompositeByteBuf.Component)this.components.get(numComponents - 1)).endOffset;
            ByteBuf consolidated = this.allocBuffer(capacity);

            for(int c = 0; c < numComponents; ++c) {
                CompositeByteBuf.Component c1 = 
(CompositeByteBuf.Component)this.components.get(c);
                ByteBuf b = c1.buf;
                consolidated.writeBytes(b);
                c1.freeIfNecessary();
            }

            CompositeByteBuf.Component var7 = new 
CompositeByteBuf.Component(consolidated);
            var7.endOffset = var7.length;
            this.components.clear();
            this.components.add(var7);
        }

    }
{code}
in CompositeByteBuf which will consume some memory.


  was:
In our production cluster,oom happens when NettyBlockRpcServer receive 
OpenBlocks message.The reason we observed is below:
When BlockManagerManagedBuffer call ChunkedByteBuffer#toNetty, it will use 
Unpooled.wrappedBuffer(ByteBuffer... buffers) which use default 
maxNumComponents=16 in low-level CompositeByteBuf.When our component's number 
is bigger than 16, it will execute 

{code:java}
private void consolidateIfNeeded() {
        int numComponents = this.components.size();
        if(numComponents > this.maxNumComponents) {
            int capacity = 
((CompositeByteBuf.Component)this.components.get(numComponents - 1)).endOffset;
            ByteBuf consolidated = this.allocBuffer(capacity);

            for(int c = 0; c < numComponents; ++c) {
                CompositeByteBuf.Component c1 = 
(CompositeByteBuf.Component)this.components.get(c);
                ByteBuf b = c1.buf;
                consolidated.writeBytes(b);
                c1.freeIfNecessary();
            }

            CompositeByteBuf.Component var7 = new 
CompositeByteBuf.Component(consolidated);
            var7.endOffset = var7.length;
            this.components.clear();
            this.components.add(var7);
        }

    }
{code}
in CompositeByteBuf which will consume some memory.



> Fetch local data via block manager cause oom
> --------------------------------------------
>
>                 Key: SPARK-21517
>                 URL: https://issues.apache.org/jira/browse/SPARK-21517
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager, Spark Core
>    Affects Versions: 1.6.1, 2.1.0
>            Reporter: zhoukang
>
> In our production cluster,oom happens when NettyBlockRpcServer receive 
> OpenBlocks message.The reason we observed is below:
> When BlockManagerManagedBuffer call ChunkedByteBuffer#toNetty, it will use 
> Unpooled.wrappedBuffer(ByteBuffer... buffers) which use default 
> maxNumComponents=16 in low-level CompositeByteBuf.When our component's number 
> is bigger than 16, it will execute during buffer copy.
> {code:java}
> private void consolidateIfNeeded() {
>         int numComponents = this.components.size();
>         if(numComponents > this.maxNumComponents) {
>             int capacity = 
> ((CompositeByteBuf.Component)this.components.get(numComponents - 
> 1)).endOffset;
>             ByteBuf consolidated = this.allocBuffer(capacity);
>             for(int c = 0; c < numComponents; ++c) {
>                 CompositeByteBuf.Component c1 = 
> (CompositeByteBuf.Component)this.components.get(c);
>                 ByteBuf b = c1.buf;
>                 consolidated.writeBytes(b);
>                 c1.freeIfNecessary();
>             }
>             CompositeByteBuf.Component var7 = new 
> CompositeByteBuf.Component(consolidated);
>             var7.endOffset = var7.length;
>             this.components.clear();
>             this.components.add(var7);
>         }
>     }
> {code}
> in CompositeByteBuf which will consume some memory.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to