[ https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Renxiang Zhou updated FLINK-37685:
----------------------------------
    Description: 
Currently, asyncProcessWithKey creates a new record context on every call. When the processing key equals the key of the current record context, we can process the callback directly and skip building a new context, which improves processing efficiency.

{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
    // -----------
    // If the given key equals the key of the current (old) context, process the callback
    // directly with a sync point request.
    if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }
    // -----------
    // build a context and switch to the new context
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call
    // passes the same key in.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();
    // switch to original context
    asyncExecutionController.setCurrentContext(oldContext);
}
{code}

  was:
Currently, asyncProcessWithKey creates a new record context on every call. When the processing key equals the key of the current record context, we can process the callback directly and skip building a new context, which improves processing efficiency.

{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
    RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
    // If the given key equals the key of the current (old) context, process the callback
    // directly with a sync point request.
    if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
        asyncExecutionController.syncPointRequestWithCallback(processing, true);
        return;
    }
    // build a context and switch to the new context
    RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
    newContext.retain();
    asyncExecutionController.setCurrentContext(newContext);
    // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call
    // passes the same key in.
    asyncExecutionController.syncPointRequestWithCallback(processing, true);
    newContext.release();
    // switch to original context
    asyncExecutionController.setCurrentContext(oldContext);
}
{code}

> process the callback immediately when the processing key equals to the key in
> record context
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37685
>                 URL: https://issues.apache.org/jira/browse/FLINK-37685
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Async State Processing
>    Affects Versions: 2.0.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>
> Currently, asyncProcessWithKey creates a new record context on every call.
> When the processing key equals the key of the current record context, we can
> process the callback directly and skip building a new context, which improves
> processing efficiency.
>
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
>     RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
>     // -----------
>     // If the given key equals the key of the current (old) context, process the callback
>     // directly with a sync point request.
>     if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
>         asyncExecutionController.syncPointRequestWithCallback(processing, true);
>         return;
>     }
>     // -----------
>     // build a context and switch to the new context
>     RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
>     newContext.retain();
>     asyncExecutionController.setCurrentContext(newContext);
>     // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call
>     // passes the same key in.
>     asyncExecutionController.syncPointRequestWithCallback(processing, true);
>     newContext.release();
>     // switch to original context
>     asyncExecutionController.setCurrentContext(oldContext);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
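
The effect of the proposed same-key check can be illustrated outside Flink with a small stand-alone sketch. Everything in it is hypothetical: `Context`, `Controller`, `ThrowingRunnable`, and `processWithKey` are toy stand-ins rather than Flink's `RecordContext`, `AsyncExecutionController`, or `asyncProcessWithKey`, and the toy sync point simply runs the callback immediately, which the real controller may not do. It only shows that a call with the current key builds no new context, while a call with a different key does.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the same-key fast path. None of these classes are Flink's real ones. */
public class SameKeyFastPathSketch {

    /** Hypothetical stand-in for a callback that may throw. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for a record context: a key plus a reference count. */
    static final class Context {
        final Object key;
        int refCount = 0;

        Context(Object key) {
            this.key = key;
        }

        void retain() {
            refCount++;
        }

        void release() {
            refCount--;
        }
    }

    /** Hypothetical stand-in for the controller; it counts the contexts it builds. */
    static final class Controller {
        Context current;
        int contextsBuilt = 0;

        Context buildContext(Object key) {
            contextsBuilt++;
            return new Context(key);
        }

        // Assumption of this sketch: the sync point runs the callback right away.
        void syncPoint(ThrowingRunnable callback) throws Exception {
            callback.run();
        }
    }

    /** Mirrors the control flow proposed in the issue, against the toy classes above. */
    static void processWithKey(Controller c, Object key, ThrowingRunnable processing)
            throws Exception {
        Context oldContext = c.current;
        // Fast path: the key matches the current context, so no new context is built.
        if (oldContext != null && oldContext.key != null && oldContext.key.equals(key)) {
            c.syncPoint(processing);
            return;
        }
        // Slow path: build a context, switch to it, run the callback, switch back.
        Context newContext = c.buildContext(key);
        newContext.retain();
        c.current = newContext;
        c.syncPoint(processing);
        newContext.release();
        c.current = oldContext;
    }

    public static void main(String[] args) throws Exception {
        Controller c = new Controller();
        c.current = c.buildContext("a"); // context of the record being processed
        List<String> seen = new ArrayList<>();
        processWithKey(c, "a", () -> seen.add("same:" + c.current.key));
        processWithKey(c, "b", () -> seen.add("other:" + c.current.key));
        // Prints: [same:a, other:b] contextsBuilt=2
        System.out.println(seen + " contextsBuilt=" + c.contextsBuilt);
    }
}
```

In this toy run only the initial context and the one for key "b" are built; the call with key "a" reuses the current context, which is the saving the issue describes.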