[
https://issues.apache.org/jira/browse/FLINK-12086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808661#comment-16808661
]
Felix Wollschläger commented on FLINK-12086:
--------------------------------------------
Thanks for your point, [~maguowei].
I've implemented a simple AbstractAsyncFunction for my needs:
{code}
package dev.codeflush;

import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public abstract class AbstractAsyncFunction<IN, OUT, T> extends RichAsyncFunction<IN, OUT> {

    // Maps each in-flight ResultFuture (by identity) to the object returned by submit().
    private final Map<ResultFuture<OUT>, T> values;

    public AbstractAsyncFunction() {
        this.values = Collections.synchronizedMap(new IdentityHashMap<>());
    }

    @Override
    public final void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        T value = submit(input);
        if (value != null) {
            this.values.put(resultFuture, value);
        }
        // The wrapper removes the map entry once the result is completed.
        collect(input, value, new ResultFutureWrapper(resultFuture));
    }

    @Override
    public final void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        // value is null if submit() returned null or the entry was already removed.
        T value = this.values.remove(resultFuture);
        timeout(input, value, resultFuture);
    }

    public abstract T submit(IN input) throws Exception;

    public abstract void collect(IN input, T obj, ResultFuture<OUT> resultFuture) throws Exception;

    public abstract void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws Exception;

    private class ResultFutureWrapper implements ResultFuture<OUT> {

        private final ResultFuture<OUT> parent;

        public ResultFutureWrapper(ResultFuture<OUT> parent) {
            this.parent = parent;
        }

        @Override
        public void complete(Collection<OUT> collection) {
            this.parent.complete(collection);
            AbstractAsyncFunction.this.values.remove(this.parent);
        }

        @Override
        public void completeExceptionally(Throwable throwable) {
            this.parent.completeExceptionally(throwable);
            AbstractAsyncFunction.this.values.remove(this.parent);
        }
    }
}
{code}
I would still like to see something similar in the core API.
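To illustrate the bookkeeping in isolation, here is a minimal stand-alone sketch that does not depend on Flink. The names InFlightTracker and InFlightTrackerDemo are hypothetical, and a plain Object stands in for Flink's ResultFuture; it only shows the idea of keeping one cancellable handle per in-flight request and cancelling it on the timeout path.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: tracks one user-defined handle per in-flight request. */
class InFlightTracker<K, T> {

    // Identity semantics: distinct key objects never collide, regardless of equals().
    private final Map<K, T> handles =
            Collections.synchronizedMap(new IdentityHashMap<>());

    void register(K key, T handle) {
        handles.put(key, handle);
    }

    /** Removes and returns the handle, or null if the request already finished. */
    T release(K key) {
        return handles.remove(key);
    }

    int size() {
        return handles.size();
    }
}

class InFlightTrackerDemo {

    /** Returns true if the timeout path cancelled the pending work and cleaned up. */
    static boolean timeoutCancels() {
        InFlightTracker<Object, CompletableFuture<String>> tracker = new InFlightTracker<>();

        Object resultFuture = new Object(); // assumption: stand-in for a ResultFuture
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes by itself
        tracker.register(resultFuture, pending);

        // Timeout path: look up the handle and cancel the outstanding work.
        CompletableFuture<String> handle = tracker.release(resultFuture);
        if (handle != null) {
            handle.cancel(true);
        }
        return pending.isCancelled() && tracker.size() == 0;
    }

    public static void main(String[] args) {
        System.out.println(timeoutCancels());
    }
}
```

The null check on the timeout path matters because the entry may already have been removed by a normal completion that raced with the timeout.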
> AsyncFunction - Add access to a user defined Object for cleanup on timeout
> --------------------------------------------------------------------------
>
> Key: FLINK-12086
> URL: https://issues.apache.org/jira/browse/FLINK-12086
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Felix Wollschläger
> Priority: Major
>
> When executing async requests, it would be nice to have access to a
> user-defined object to perform cleanup when the process times out.
> For example, when executing Cassandra queries I'm using the driver's
> thread pool to submit Statements, which returns a
> com.datastax.driver.core.ResultSetFuture (
> [https://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ResultSetFuture.html]
> ). When I run into a timeout, I could cancel the Future, because waiting for
> it to complete is unnecessary in that case.
>
> The API could be extended to something like this:
>
> Adding a Type-Parameter to the AsyncFunction Interface:
> {code:java}
> AsyncFunction<IN, OUT, T>{code}
> Updating the asyncInvoke-Method to return the user-defined object:
> {code:java}
> T asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception;{code}
> Updating the timeout-Method to accept the user-defined object:
> {code:java}
> void timeout(IN input, T obj, ResultFuture<OUT> resultFuture) throws
> Exception{code}
>
> An example Implementation could look like this:
> {code:java}
> package dev.codeflush;
>
> import org.apache.flink.streaming.api.functions.async.AsyncFunction;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
>
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.Future;
>
> public class SomeAsyncFunction implements AsyncFunction<Integer, String, Future<String>> {
>
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public Future<String> asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws Exception {
>         Future<String> future = null; // submit something in a library thread-pool
>         CompletableFuture.runAsync(() -> {
>             try {
>                 resultFuture.complete(Collections.singleton(future.get()));
>             } catch (ExecutionException e) {
>                 // handle this
>             } catch (InterruptedException e) {
>                 // handle that
>             }
>         });
>
>         return future;
>     }
>
>     @Override
>     public void timeout(Integer input, Future<String> future, ResultFuture<String> resultFuture) throws Exception {
>         future.cancel(true);
>         resultFuture.complete(Collections.emptySet());
>     }
> }
> {code}
> As it currently is, tasks submitted in the asyncInvoke-Method will use
> resources (threads, IO) even if the application is no longer in a state where
> it could do something meaningful with the result.