Repository: flink Updated Branches: refs/heads/release-1.4 42e24413b -> 3753ae251
[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0324e34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0324e34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0324e34 Branch: refs/heads/release-1.4 Commit: d0324e34a06e7374179d1627a4a3653d07f1c614 Parents: 42e2441 Author: kkloudas <kklou...@gmail.com> Authored: Tue Nov 14 15:05:45 2017 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Nov 17 10:37:18 2017 +0100 ---------------------------------------------------------------------- .../network/AbstractServerHandler.java | 2 +- .../client/proxy/KvStateClientProxyHandler.java | 11 +- .../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++------- 3 files changed, 150 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index 9e02291..7e71a11 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend try { stats.reportFailedRequest(); - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); + final String errMsg = "Failed request " + requestId + "." + System.lineSeparator() + " Caused by: " + ExceptionUtils.stringifyException(t); final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); ctx.writeAndFlush(err); } catch (IOException io) { http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java index 73ef7f3..af33701 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; @@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || - throwable.getCause() instanceof UnknownKvStateLocation || - throwable.getCause() instanceof ConnectException) { + throwable.getCause() instanceof ConnectException + ) { // These failures are likely to be caused by out-of-sync // KvStateLocation. Therefore we retry this query and http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index b4bae9c..c1cbb61 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import java.util.function.Supplier; import scala.concurrent.Await; import scala.concurrent.duration.Deadline; @@ -103,15 +100,14 @@ import scala.reflect.ClassTag$; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Base class for queryable state integration tests with a configurable state backend. */ public abstract class AbstractQueryableStateTestBase extends TestLogger { - private static final int NO_OF_RETRIES = 100; private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS); - private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L); private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); @@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { allNonZero = false; } - CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries( + CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState( + deadline, client, jobId, queryName, key, BasicTypeInfo.INT_TYPE_INFO, reducingState, - QUERY_RETRY_DELAY, false, executor); @@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { * * <b>NOTE: </b> This test is only in the non-HA variant of the tests because * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. * + * {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } /** + * Tests that the correct exception is thrown if the query + * contains a wrong queryable state name. + */ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor<Tuple2<Integer, Long>> valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = FutureUtils.toJava( + cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); + + cluster.submitJobDetached(jobGraph); + + // expect for the job to be running + TestingJobManagerMessages.JobStatusIs jobStatus = + runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertEquals(JobStatus.RUNNING, jobStatus.state()); + + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + try { + future.get(); + fail(); // by now the job must have failed. + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hankuna'.")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + } + } + + /** * Similar tests as {@link #testValueState()} but before submitting the * job, we already issue one request which fails. */ @@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { // Now query int key = 0; - CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState( + deadline, client, jobId, queryableState.getQueryableStateName(), key, BasicTypeInfo.INT_TYPE_INFO, valueState, - QUERY_RETRY_DELAY, true, executor); @@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( + CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvState( + deadline, client, jobId, "pumba", key, BasicTypeInfo.INT_TYPE_INFO, foldingState, - QUERY_RETRY_DELAY, false, executor); @@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( + CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState( + deadline, client, jobId, "jungle", key, BasicTypeInfo.INT_TYPE_INFO, reducingState, - QUERY_RETRY_DELAY, false, executor); @@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries( + CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState( + deadline, client, jobId, "timon-queryable", key, BasicTypeInfo.INT_TYPE_INFO, mapStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ListState<Long>> future = getKvStateWithRetries( + final CompletableFuture<ListState<Long>> future = getKvState( + deadline, client, jobId, "list-queryable", key, BasicTypeInfo.INT_TYPE_INFO, listStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( + CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState( + deadline, client, jobId, "aggr-queryable", key, BasicTypeInfo.INT_TYPE_INFO, aggrStateDescriptor, - QUERY_RETRY_DELAY, false, executor); @@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { ///// General Utility Methods ////// - private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries( + private static <K, S extends State, V> CompletableFuture<S> getKvState( + final Deadline deadline, final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation<K> keyTypeInfo, final StateDescriptor<S, V> stateDescriptor, - final Time retryDelay, final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static <T> CompletableFuture<T> retryWithDelay( - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - final CompletableFuture<T> resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); + final ScheduledExecutor executor) throws InterruptedException { + final CompletableFuture<S> resultFuture = new CompletableFuture<>(); + getKvStateIgnoringCertainExceptions( + deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); return resultFuture; } - public static <T> void retryWithDelay( - final CompletableFuture<T> resultFuture, - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { + private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions( + final Deadline deadline, + final CompletableFuture<S> resultFuture, + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation<K> keyTypeInfo, + final StateDescriptor<S, V> stateDescriptor, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) throws InterruptedException { if (!resultFuture.isDone()) { - final CompletableFuture<T> operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { - resultFuture.completeExceptionally(throwable.getCause()); - } else { - if (retries > 0) { - final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( - () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace), - retryDelay.toMilliseconds(), - TimeUnit.MILLISECONDS); - - resultFuture.whenComplete( - (innerT, innerThrowable) -> scheduledFuture.cancel(false)); - } else { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " + - "has been exhausted.", throwable)); - } - } - } else { - resultFuture.complete(t); + Thread.sleep(100L); + CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + expected.whenCompleteAsync((result, throwable) -> { + if (throwable != null) { + if ( + throwable.getCause() instanceof CancellationException || + throwable.getCause() instanceof AssertionError || + (failForUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException) + ) { + resultFuture.completeExceptionally(throwable.getCause()); + } else if (deadline.hasTimeLeft()) { + try { + getKvStateIgnoringCertainExceptions( + deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); + } catch (InterruptedException e) { + e.printStackTrace(); } - }, - scheduledExecutor); + } + } else { + resultFuture.complete(result); + } + }, executor); - resultFuture.whenComplete( - (t, throwable) -> operationResultFuture.cancel(false)); + resultFuture.whenComplete((result, throwable) -> expected.cancel(false)); } } @@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState( + deadline, client, jobId, queryableStateName, key, BasicTypeInfo.INT_TYPE_INFO, stateDescriptor, - QUERY_RETRY_DELAY, false, executor);