[ 
https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renxiang Zhou updated FLINK-37685:
----------------------------------
    Description: 
Currently, asyncProcessWithKey creates a new record context on every call.
When the processing key equals the key of the current record context, we can
run the callback directly in that context and skip building and switching
contexts, which improves processing efficiency.

 
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();

    // -----------
    // If the given key equals the key of the current context, process the
    // callback directly with a sync point.
    if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }
    // -----------

    // Build a context and switch to the new context.
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when
    // the call passes the same key in.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();

    // Switch back to the original context.
    asyncExecutionController.setCurrentContext(oldContext);
}
{code}
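
The effect of the same-key check can be illustrated with a small standalone
sketch. It does not use Flink's classes: ToyController, Context and
processWithKey are hypothetical stand-ins invented for this example, and a
plain Runnable replaces ThrowingRunnable<Exception>. It only shows that the
fast path avoids building a context when the key matches, and that the slow
path restores the original context afterwards.

```java
// Hypothetical, simplified stand-ins for illustration only; these are not
// Flink's AsyncExecutionController or RecordContext.
final class SameKeyFastPathSketch {

    /** Stand-in for a record context that only carries a key. */
    static final class Context {
        final Object key;

        Context(Object key) {
            this.key = key;
        }
    }

    /** Toy controller that counts how many contexts it has built. */
    static final class ToyController {
        Context current;
        int contextsBuilt = 0;

        Context buildContext(Object key) {
            contextsBuilt++;
            return new Context(key);
        }
    }

    /** Runs the callback under the given key, reusing the current context if the key matches. */
    static void processWithKey(ToyController controller, Object key, Runnable processing) {
        Context oldContext = controller.current;

        // Fast path: the current context already has this key, so run directly.
        if (oldContext != null && oldContext.key != null && oldContext.key.equals(key)) {
            processing.run();
            return;
        }

        // Slow path: build a context, switch to it, run, then switch back.
        controller.current = controller.buildContext(key);
        try {
            processing.run();
        } finally {
            controller.current = oldContext;
        }
    }

    public static void main(String[] args) {
        ToyController controller = new ToyController();
        // Context of the record that is currently being processed.
        controller.current = controller.buildContext("a");

        int[] calls = {0};
        processWithKey(controller, "a", () -> calls[0]++); // same key: no new context
        processWithKey(controller, "b", () -> calls[0]++); // other key: one new context

        System.out.println(controller.contextsBuilt + " contexts built, " + calls[0] + " callbacks run");
    }
}
```

In this toy model only the call with key "b" builds a context; the call with
key "a" reuses the one that is already current.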

  was:
Currently, asyncProcessWithKey creates a new record context on every call.
When the processing key equals the key of the current record context, we can
run the callback directly in that context and skip building and switching
contexts, which improves processing efficiency.

 
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();

    // If the given key equals the key of the current context, process the
    // callback directly with a sync point.
    if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }

    // Build a context and switch to the new context.
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when
    // the call passes the same key in.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();

    // Switch back to the original context.
    asyncExecutionController.setCurrentContext(oldContext);
}
{code}


> process the callback immediately when the processing key equals to the key in 
> record context
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37685
>                 URL: https://issues.apache.org/jira/browse/FLINK-37685
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Async State Processing
>    Affects Versions: 2.0.0
>            Reporter: Renxiang Zhou
>            Priority: Major


