[ 
https://issues.apache.org/jira/browse/FLINK-12912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869254#comment-16869254
 ] 

Piotr Nowojski commented on FLINK-12912:
----------------------------------------

After implementing https://issues.apache.org/jira/browse/FLINK-12777, the buggy 
code has moved entirely into the {{BufferStorage}} class and is visible through 
the {{BufferStorage#isFull()}} method. For example, the following sequence does 
not work:

{code:java}
// BufferStorageXYZ is a placeholder for a concrete BufferStorage implementation
bufferStorage = new BufferStorageXYZ(maxSizeLimit = PAGE_SIZE * 7);

// first sequence: 3 pages
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.rollOver();
// second sequence: 2 pages
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.rollOver();
// third sequence: 2 pages, 7 pages in total
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));

// exactly at the limit, so not expected to be full yet
assertFalse(bufferStorage.isFull());

// the 8th page exceeds the limit
bufferStorage.add(generateRandomBuffer(PAGE_SIZE));

assertTrue(bufferStorage.isFull());
{code}
 
Take a look at 
{{org.apache.flink.streaming.runtime.io.LinkedBufferStorageTest#testRolledIsFull}}.
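
To make the expectation behind that sequence concrete, here is a minimal, 
self-contained sketch. It is a toy model only: {{ToyBufferStorage}}, its fields 
and the page size of 4096 are hypothetical and are not taken from the Flink code 
base. It assumes that {{isFull()}} should compare the limit against all bytes 
held, including those in sequences that were already rolled over.

```java
/** Toy model only (hypothetical names, not the Flink implementation). */
final class ToyBufferStorage {
    private final long maxSizeLimit;
    private long rolledBytes;   // bytes in sequences that were already rolled over
    private long currentBytes;  // bytes added since the last rollOver()

    ToyBufferStorage(long maxSizeLimit) {
        this.maxSizeLimit = maxSizeLimit;
    }

    void add(int numBytes) {
        currentBytes += numBytes;
    }

    /** Starts a new sequence; the bytes of the old one still count. */
    void rollOver() {
        rolledBytes += currentBytes;
        currentBytes = 0;
    }

    /** Full once the total, including rolled-over bytes, exceeds the limit. */
    boolean isFull() {
        return rolledBytes + currentBytes > maxSizeLimit;
    }

    public static void main(String[] args) {
        final int pageSize = 4096; // arbitrary value chosen for this sketch
        ToyBufferStorage storage = new ToyBufferStorage(7L * pageSize);

        // Same shape as the sequence above: 3 pages, roll over, 2 pages, roll over, 2 pages.
        int[] pagesPerSequence = {3, 2, 2};
        for (int i = 0; i < pagesPerSequence.length; i++) {
            if (i > 0) {
                storage.rollOver();
            }
            for (int p = 0; p < pagesPerSequence[i]; p++) {
                storage.add(pageSize);
            }
        }

        System.out.println(storage.isFull()); // 7 pages held, exactly at the limit
        storage.add(pageSize);
        System.out.println(storage.isFull()); // 8 pages held, over the limit
    }
}
```

In this model the first check prints {{false}} and the second prints {{true}}, 
which is what the two assertions in the sequence expect.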

> Incorrect handling of task.checkpoint.alignment.max-size when one checkpoint 
> subsumes another one
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12912
>                 URL: https://issues.apache.org/jira/browse/FLINK-12912
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>
> {{BarrierBuffer#numQueuedBytes}}, which is used to evaluate the 
> {{task.checkpoint.alignment.max-size}} limit, is not handled correctly if one 
> checkpoint subsumes another one.
> The max size limit is checked against the sum of {{numQueuedBytes}} and 
> {{bufferBlocker.getBytesBlocked()}}. {{getBytesBlocked}} keeps track of 
> the alignment size of only the latest checkpoint. The bug is in the 
> {{BarrierBuffer#releaseBlocksAndResetBarriers()}} method, where, while 
> handling the first subsumed checkpoint in the branch:
> {code:java}
>               if (currentBuffered == null) {
>                       // common case: no more buffered data
>                       currentBuffered = bufferBlocker.rollOverReusingResources();
>                       if (currentBuffered != null) {
>                               currentBuffered.open();
>                       }
>               }
> {code}
> we clear the {{bufferBlocker.getBytesBlocked()}} counter but do not 
> update the {{numQueuedBytes}} counter. 
> For example, if the first checkpoint had reached 99.9% of the max alignment size 
> when it was subsumed, this bug causes the calculated alignment size to drop to 0 
> bytes. For subsequent subsumed checkpoints, {{numQueuedBytes}} is updated 
> correctly.
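
The arithmetic in the description above can be sketched with two plain 
counters. This is a toy model under stated assumptions, not the 
{{BarrierBuffer}} code: the class {{AlignmentCounters}}, the {{carryOver}} flag 
and the limit of 1000 bytes are hypothetical and exist only to contrast the 
buggy and the intended handling of the first subsumed checkpoint.

```java
/** Toy model of the two counters (hypothetical, not Flink code). */
final class AlignmentCounters {
    private long numQueuedBytes; // bytes of sequences that were rolled over earlier
    private long bytesBlocked;   // bytes blocked for the latest checkpoint only

    void block(long numBytes) {
        bytesBlocked += numBytes;
    }

    /**
     * Rolls over the blocked data. With carryOver == false this mimics the
     * described bug: bytesBlocked is cleared but numQueuedBytes is not updated.
     */
    void rollOver(boolean carryOver) {
        if (carryOver) {
            numQueuedBytes += bytesBlocked;
        }
        bytesBlocked = 0;
    }

    /** The value that is compared against the max alignment size. */
    long alignmentSize() {
        return numQueuedBytes + bytesBlocked;
    }

    public static void main(String[] args) {
        final long maxSize = 1000; // arbitrary limit chosen for this sketch

        AlignmentCounters buggy = new AlignmentCounters();
        buggy.block(999);          // 99.9% of maxSize
        buggy.rollOver(false);     // first subsumed checkpoint, buggy handling
        System.out.println(buggy.alignmentSize() + " of " + maxSize);

        AlignmentCounters intended = new AlignmentCounters();
        intended.block(999);
        intended.rollOver(true);   // intended handling: bytes stay accounted for
        System.out.println(intended.alignmentSize() + " of " + maxSize);
    }
}
```

With the buggy handling the tracked size falls from 999 to 0, so the limit can 
be exceeded without being noticed; with the intended handling it stays at 999.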



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
