http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java deleted file mode 100644 index 4d27da2..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ /dev/null @@ -1,1496 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.state.AggregatingState; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; -import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.QueryableStateStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.function.Supplier; - -import scala.concurrent.Await; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Base class for queryable state integration tests with a configurable state backend. - */ -public abstract class AbstractQueryableStateTestBase extends TestLogger { - - private static final int NO_OF_RETRIES = 100; - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS); - private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L); - - private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); - private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); - - /** - * State backend to use. - */ - protected AbstractStateBackend stateBackend; - - /** - * Shared between all the test. Make sure to have at least NUM_SLOTS - * available after your test finishes, e.g. cancel the job you submitted. - */ - protected static FlinkMiniCluster cluster; - - /** - * Client shared between all the test. - */ - protected static QueryableStateClient client; - - protected static int maxParallelism; - - @Before - public void setUp() throws Exception { - // NOTE: do not use a shared instance for all tests as the tests may brake - this.stateBackend = createStateBackend(); - - Assert.assertNotNull(cluster); - - maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) * - cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - } - - /** - * Creates a state backend instance which is used in the {@link #setUp()} method before each - * test case. - * - * @return a state backend instance for each unit test - */ - protected abstract AbstractStateBackend createStateBackend() throws Exception; - - /** - * Runs a simple topology producing random (key, 1) pairs at the sources (where - * number of keys is in fixed in range 0...numKeys). The records are keyed and - * a reducing queryable state instance is created, which sums up the records. - * - * <p>After submitting the job in detached mode, the QueryableStateCLient is used - * to query the counts of each key in rounds until all keys have non-zero counts. - */ - @Test - @SuppressWarnings("unchecked") - public void testQueryableState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 256; - - JobID jobId = null; - - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "hakuna-matata"; - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7143749578983540352L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - cluster.submitJobDetached(jobGraph); - - // - // Start querying - // - jobId = jobGraph.getJobID(); - - final AtomicLongArray counts = new AtomicLongArray(numKeys); - - boolean allNonZero = false; - while (!allNonZero && deadline.hasTimeLeft()) { - allNonZero = true; - - final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys); - - for (int i = 0; i < numKeys; i++) { - final int key = i; - - if (counts.get(key) > 0L) { - // Skip this one - continue; - } else { - allNonZero = false; - } - - CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries( - client, - jobId, - queryName, - key, - BasicTypeInfo.INT_TYPE_INFO, - reducingState, - QUERY_RETRY_DELAY, - false, - executor); - - result.thenAccept(response -> { - try { - Tuple2<Integer, Long> res = response.get(); - counts.set(key, res.f1); - assertEquals("Key mismatch", key, res.f0.intValue()); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } - }); - - futures.add(result); - } - - // wait for all the futures to complete - CompletableFuture - .allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - - assertTrue("Not all keys are non-zero", allNonZero); - - // All should be non-zero - for (int i = 0; i < numKeys; i++) { - long count = counts.get(i); - assertTrue("Count at position " + i + " is " + count, count > 0); - } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests that duplicate query registrations fail the job at the JobManager. - * - * <b>NOTE: </b> This test is only in the non-HA variant of the tests because - * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. * - */ - @Test - public void testDuplicateRegistrationFailsJob() throws Exception { - final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 256; - - JobID jobId = null; - - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "duplicate-me"; - - final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -4126824763829132959L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); - - final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -6265024000462809436L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); - - cluster.submitJobDetached(jobGraph); - - TestingJobManagerMessages.JobStatusIs jobStatus = - failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertEquals(JobStatus.FAILED, jobStatus.state()); - - // Get the job and check the cause - JobManagerMessages.JobFound jobFound = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class))) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); - - assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException")); - int causedByIndex = failureCause.indexOf("Caused by: "); - String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length()); - assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException")); - assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName)); - } finally { - // Free cluster resources - if (jobId != null) { - scala.concurrent.Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - } - } - - /** - * Tests simple value state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The tests succeeds after each subtask index is queried with - * value numElements (the latest element updated the state). - */ - @Test - public void testValueState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>( - "any", - source.getType()); - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7662520075515707428L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Similar tests as {@link #testValueState()} but before submitting the - * job, we already issue one request which fails. - */ - @Test - public void testQueryNonStartedJobState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>( - "any", - source.getType(), - null); - - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7480503339992214681L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - // Now query - long expected = numElements; - - // query once - client.getKvState( - jobId, - queryableState.getQueryableStateName(), - 0, - VoidNamespace.INSTANCE, - BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, - valueState); - - cluster.submitJobDetached(jobGraph); - - executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected); - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple value state queryable state instance with a default value - * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) - * tuples, the key is mapped to 1 but key 0 is queried which should throw - * a {@link UnknownKeyOrNamespaceException} exception. - * - * @throws UnknownKeyOrNamespaceException thrown due querying a non-existent key - */ - @Test(expected = UnknownKeyOrNamespaceException.class) - public void testValueStateDefault() throws Throwable { - - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies - .fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = - new ValueStateDescriptor<>( - "any", - source.getType(), - Tuple2.of(0, 1337L)); - - // only expose key "1" - QueryableStateStream<Integer, Tuple2<Integer, Long>> - queryableState = - source.keyBy( - new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 4509274556892655887L; - - @Override - public Integer getKey( - Tuple2<Integer, Long> value) throws - Exception { - return 1; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - int key = 0; - CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( - client, - jobId, - queryableState.getQueryableStateName(), - key, - BasicTypeInfo.INT_TYPE_INFO, - valueState, - QUERY_RETRY_DELAY, - true, - executor); - - try { - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } catch (ExecutionException | CompletionException e) { - // get() on a completedExceptionally future wraps the - // exception in an ExecutionException. - throw e.getCause(); - } - } finally { - - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple value state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The tests succeeds after each subtask index is queried with - * value numElements (the latest element updated the state). - * - * <p>This is the same as the simple value state test, but uses the API shortcut. - */ - @Test - public void testValueStateShortcut() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state shortcut - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 9168901838808830068L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("matata"); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc = - (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor(); - executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); - } finally { - - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple folding state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The folding state sums these up and maps them to Strings. The - * test succeeds after each subtask index is queried with result n*(n+1)/2 - * (as a String). - */ - @Test - public void testFoldingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Folding state - FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = - new FoldingStateDescriptor<>( - "any", - "0", - new SumFold(), - StringSerializer.INSTANCE); - - QueryableStateStream<Integer, String> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -842809958106747539L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("pumba", foldingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - String expected = Integer.toString(numElements * (numElements + 1) / 2); - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( - client, - jobId, - "pumba", - key, - BasicTypeInfo.INT_TYPE_INFO, - foldingState, - QUERY_RETRY_DELAY, - false, - executor); - - String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); - - //assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected.equals(value)) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple reducing state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The reducing state instance sums these up. The test succeeds - * after each subtask index is queried with result n*(n+1)/2. - */ - @Test - public void testReducingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = - new ReducingStateDescriptor<>( - "any", - new SumReduce(), - source.getType()); - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("jungle", reducingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - long expected = numElements * (numElements + 1L) / 2L; - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( - client, - jobId, - "jungle", - key, - BasicTypeInfo.INT_TYPE_INFO, - reducingState, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple map state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The map state instance sums the values up. The test succeeds - * after each subtask index is queried with result n*(n+1)/2. - */ - @Test - public void testMapState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>( - "timon", - BasicTypeInfo.INT_TYPE_INFO, - source.getType()); - mapStateDescriptor.setQueryable("timon-queryable"); - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { - private static final long serialVersionUID = -805125545438296619L; - - private transient MapState<Integer, Tuple2<Integer, Long>> mapState; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - mapState = getRuntimeContext().getMapState(mapStateDescriptor); - } - - @Override - public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { - Tuple2<Integer, Long> v = mapState.get(value.f0); - if (v == null) { - v = new Tuple2<>(value.f0, 0L); - } - mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); - } - }); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - long expected = numElements * (numElements + 1L) / 2L; - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries( - client, - jobId, - "timon-queryable", - key, - BasicTypeInfo.INT_TYPE_INFO, - mapStateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key); - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - /** - * Tests simple list state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The list state instance add the values to the list. The test - * succeeds after each subtask index is queried and the list contains - * the correct number of distinct elements. - */ - @Test - public void testListState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>( - "list", - BasicTypeInfo.LONG_TYPE_INFO); - listStateDescriptor.setQueryable("list-queryable"); - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { - private static final long serialVersionUID = -805125545438296619L; - - private transient ListState<Long> listState; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - listState = getRuntimeContext().getListState(listStateDescriptor); - } - - @Override - public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { - listState.add(value.f1); - } - }); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - - Map<Integer, Set<Long>> results = new HashMap<>(); - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ListState<Long>> future = getKvStateWithRetries( - client, - jobId, - "list-queryable", - key, - BasicTypeInfo.INT_TYPE_INFO, - listStateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - Iterable<Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); - - Set<Long> res = new HashSet<>(); - for (Long v: value) { - res.add(v); - } - - // the source starts at 0, so +1 - if (res.size() == numElements + 1L) { - success = true; - results.put(key, res); - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - - for (int key = 0; key < maxParallelism; key++) { - Set<Long> values = results.get(key); - for (long i = 0L; i <= numElements; i++) { - assertTrue(values.contains(i)); - } - } - - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - @Test - public void testAggregatingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final long numElements = 1024L; - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor = - new AggregatingStateDescriptor<>( - "aggregates", - new SumAggr(), - String.class); - aggrStateDescriptor.setQueryable("aggr-queryable"); - - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).transform( - "TestAggregatingOperator", - BasicTypeInfo.STRING_TYPE_INFO, - new AggregatingTestOperator(aggrStateDescriptor) - ); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( - client, - jobId, - "aggr-queryable", - key, - BasicTypeInfo.INT_TYPE_INFO, - aggrStateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); - - if (Long.parseLong(value) == numElements * (numElements + 1L) / 2L) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } finally { - // Free cluster resources - if (jobId != null) { - CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); - - cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - } - } - - ///// Sources/UDFs Used in the Tests ////// - - /** - * Test source producing (key, 0)..(key, maxValue) with key being the sub - * task index. - * - * <p>After all tuples have been emitted, the source waits to be cancelled - * and does not immediately finish. - */ - private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> { - - private static final long serialVersionUID = 1459935229498173245L; - - private final long maxValue; - private volatile boolean isRunning = true; - - TestAscendingValueSource(long maxValue) { - Preconditions.checkArgument(maxValue >= 0); - this.maxValue = maxValue; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - } - - @Override - public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception { - // f0 => key - int key = getRuntimeContext().getIndexOfThisSubtask(); - Tuple2<Integer, Long> record = new Tuple2<>(key, 0L); - - long currentValue = 0; - while (isRunning && currentValue <= maxValue) { - synchronized (ctx.getCheckpointLock()) { - record.f1 = currentValue; - ctx.collect(record); - } - - currentValue++; - } - - while (isRunning) { - synchronized (this) { - wait(); - } - } - } - - @Override - public void cancel() { - isRunning = false; - - synchronized (this) { - notifyAll(); - } - } - - } - - /** - * Test source producing (key, 1) tuples with random key in key range (numKeys). - */ - private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener { - - private static final long serialVersionUID = -5744725196953582710L; - - private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong(); - private final int numKeys; - private final ThreadLocalRandom random = ThreadLocalRandom.current(); - private volatile boolean isRunning = true; - - TestKeyRangeSource(int numKeys) { - this.numKeys = numKeys; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - LATEST_CHECKPOINT_ID.set(0L); - } - } - - @Override - public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception { - // f0 => key - Tuple2<Integer, Long> record = new Tuple2<>(0, 1L); - - while (isRunning) { - synchronized (ctx.getCheckpointLock()) { - record.f0 = random.nextInt(numKeys); - ctx.collect(record); - } - // mild slow down - Thread.sleep(1L); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - LATEST_CHECKPOINT_ID.set(checkpointId); - } - } - } - - /** - * An operator that uses {@link AggregatingState}. - * - * <p>The operator exists for lack of possibility to get an - * {@link AggregatingState} from the {@link org.apache.flink.api.common.functions.RuntimeContext}. - * If this were not the case, we could have a {@link ProcessFunction}. - */ - private static class AggregatingTestOperator - extends AbstractStreamOperator<String> - implements OneInputStreamOperator<Tuple2<Integer, Long>, String> { - - private static final long serialVersionUID = 1L; - - private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor; - private transient AggregatingState<Tuple2<Integer, Long>, String> state; - - AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDesc) { - this.stateDescriptor = stateDesc; - } - - @Override - public void open() throws Exception { - super.open(); - this.state = getKeyedStateBackend().getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - stateDescriptor); - } - - @Override - public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception { - state.add(element.getValue()); - } - } - - /** - * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. - */ - private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, String, String> { - - private static final long serialVersionUID = -6249227626701264599L; - - @Override - public String createAccumulator() { - return "0"; - } - - @Override - public String add(Tuple2<Integer, Long> value, String accumulator) { - long acc = Long.valueOf(accumulator); - acc += value.f1; - return Long.toString(acc); - } - - @Override - public String getResult(String accumulator) { - return accumulator; - } - - @Override - public String merge(String a, String b) { - return Long.toString(Long.valueOf(a) + Long.valueOf(b)); - } - } - - /** - * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. - */ - private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> { - private static final long serialVersionUID = -6249227626701264599L; - - @Override - public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception { - long acc = Long.valueOf(accumulator); - acc += value.f1; - return Long.toString(acc); - } - } - - /** - * Test {@link ReduceFunction} summing up its two arguments. - */ - protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> { - private static final long serialVersionUID = -8651235077342052336L; - - @Override - public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { - value1.f1 += value2.f1; - return value1; - } - } - - ///// General Utility Methods ////// - - private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final StateDescriptor<S, V> stateDescriptor, - final Time retryDelay, - final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static <T> CompletableFuture<T> retryWithDelay( - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - final CompletableFuture<T> resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); - - return resultFuture; - } - - public static <T> void retryWithDelay( - final CompletableFuture<T> resultFuture, - final Supplier<CompletableFuture<T>> operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - if (!resultFuture.isDone()) { - final CompletableFuture<T> operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof UnknownKeyOrNamespaceException)) { - resultFuture.completeExceptionally(throwable.getCause()); - } else { - if (retries > 0) { - final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( - () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor, failIfUnknownKeyOrNamespace), - retryDelay.toMilliseconds(), - TimeUnit.MILLISECONDS); - - resultFuture.whenComplete( - (innerT, innerThrowable) -> scheduledFuture.cancel(false)); - } else { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not complete the operation. Number of retries " + - "has been exhausted.", throwable)); - } - } - } else { - resultFuture.complete(t); - } - }, - scheduledExecutor); - - resultFuture.whenComplete( - (t, throwable) -> operationResultFuture.cancel(false)); - } - } - - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * <tt>expected</tt> equals the value of the result tuple's second field. - */ - private void executeValueQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( - client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - stateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value(); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java deleted file mode 100644 index ab75cf4..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.apache.curator.test.TestingServer; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the NON-HA mode. - */ -public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase { - - private static final int NUM_JMS = 2; - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - - private static TestingServer zkServer; - private static TemporaryFolder temporaryFolder; - - public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { - try { - zkServer = new TestingServer(); - temporaryFolder = new TemporaryFolder(); - temporaryFolder.create(); - - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); - config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); - config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - - cluster = new TestingCluster(config, false); - cluster.start(); - - client = new QueryableStateClient("localhost", proxyPortRangeStart); - - // verify that we are in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() { - if (cluster != null) { - cluster.stop(); - cluster.awaitTermination(); - } - - try { - zkServer.stop(); - zkServer.close(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - client.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java deleted file mode 100644 index 6f31e76..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link FsStateBackend}. - */ -public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - setup(9064, 9069); - } - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java deleted file mode 100644 index 18b167f..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; - -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link RocksDBStateBackend}. - */ -@Ignore -public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - setup(9074, 9079); - } - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java deleted file mode 100644 index 907e8a3..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.contrib.streaming.state.PredefinedOptions; -import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalListState; -import org.apache.flink.runtime.state.internal.InternalMapState; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; - -import java.io.File; - -import static org.mockito.Mockito.mock; - -/** - * Additional tests for the serialization and deserialization using - * the KvStateSerializer with a RocksDB state back-end. - */ -public final class KVStateRequestSerializerRocksDBTest { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - /** - * Extension of {@link RocksDBKeyedStateBackend} to make {@link - * #createListState(TypeSerializer, ListStateDescriptor)} public for use in - * the tests. - * - * @param <K> key type - */ - static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> { - - RocksDBKeyedStateBackend2( - final String operatorIdentifier, - final ClassLoader userCodeClassLoader, - final File instanceBasePath, - final DBOptions dbOptions, - final ColumnFamilyOptions columnFamilyOptions, - final TaskKvStateRegistry kvStateRegistry, - final TypeSerializer<K> keySerializer, - final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange, - final ExecutionConfig executionConfig) throws Exception { - - super(operatorIdentifier, userCodeClassLoader, - instanceBasePath, - dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange, executionConfig, false); - } - - @Override - public <N, T> InternalListState<N, T> createListState( - final TypeSerializer<N> namespaceSerializer, - final ListStateDescriptor<T> stateDesc) throws Exception { - - return super.createListState(namespaceSerializer, stateDesc); - } - } - - /** - * Tests list serialization and deserialization match. - * - * @see KvStateRequestSerializerTest#testListSerialization() - * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end - * test - */ - @Test - public void testListSerialization() throws Exception { - final long key = 0L; - - // objects for RocksDB state list serialisation - DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); - dbOptions.setCreateIfMissing(true); - ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); - final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend = - new RocksDBKeyedStateBackend2<>( - "no-op", - ClassLoader.getSystemClassLoader(), - temporaryFolder.getRoot(), - dbOptions, - columnFamilyOptions, - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), - new ExecutionConfig() - ); - longHeapKeyedStateBackend.restore(null); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend - .createListState(VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); - - KvStateRequestSerializerTest.testListSerialization(key, listState); - } - - /** - * Tests map serialization and deserialization match. - * - * @see KvStateRequestSerializerTest#testMapSerialization() - * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end - * test - */ - @Test - public void testMapSerialization() throws Exception { - final long key = 0L; - - // objects for RocksDB state list serialisation - DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); - dbOptions.setCreateIfMissing(true); - ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); - final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = - new RocksDBKeyedStateBackend<>( - "no-op", - ClassLoader.getSystemClassLoader(), - temporaryFolder.getRoot(), - dbOptions, - columnFamilyOptions, - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), - new ExecutionConfig(), - false); - longHeapKeyedStateBackend.restore(null); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) - longHeapKeyedStateBackend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); - - KvStateRequestSerializerTest.testMapSerialization(key, mapState); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java deleted file mode 100644 index 2937a51..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.junit.AfterClass; -import org.junit.Assert; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the HA mode. - */ -public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase { - - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - - public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { - try { - Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); - config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); - config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); - - cluster = new TestingCluster(config, false); - cluster.start(true); - - client = new QueryableStateClient("localhost", proxyPortRangeStart); - - // verify that we are not in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() { - try { - cluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - client.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java deleted file mode 100644 index 9457e0f..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link FsStateBackend}. - */ -public class NonHAQueryableStateFsBackendITCase extends NonHAAbstractQueryableStateTestBase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - setup(9084, 9089); - } - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java deleted file mode 100644 index 39fbe9e..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; - -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link RocksDBStateBackend}. - */ -@Ignore -public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - setup(9094, 9099); - } - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java deleted file mode 100644 index 1fd7012..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageDeserializer; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -/** - * Tests general behavior of the {@link AbstractServerBase}. - */ -public class AbstractServerTest { - - @Rule - public ExpectedException expectedEx = ExpectedException.none(); - - /** - * Tests that in case of port collision, a FlinkRuntimeException is thrown - * with a specific message. - */ - @Test - public void testServerInitializationFailure() throws Throwable { - - // the expected exception along with the adequate message - expectedEx.expect(FlinkRuntimeException.class); - expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied."); - - TestServer server1 = null; - TestServer server2 = null; - try { - - server1 = startServer("Test Server 1", 7777); - Assert.assertEquals(7777L, server1.getServerAddress().getPort()); - - server2 = startServer("Test Server 2", 7777); - } finally { - - if (server1 != null) { - server1.shutdown(); - } - - if (server2 != null) { - server2.shutdown(); - } - } - } - - /** - * Tests that in case of port collision and big enough port range, - * the server will try to bind to the next port in the range. - */ - @Test - public void testPortRangeSuccess() throws Throwable { - TestServer server1 = null; - TestServer server2 = null; - Client<TestMessage, TestMessage> client = null; - - try { - server1 = startServer("Test Server 1", 7777, 7778, 7779); - Assert.assertEquals(7777L, server1.getServerAddress().getPort()); - - server2 = startServer("Test Server 2", 7777, 7778, 7779); - Assert.assertEquals(7778L, server2.getServerAddress().getPort()); - - client = new Client<>( - "Test Client", - 1, - new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), - new DisabledKvStateRequestStats()); - - TestMessage response1 = client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join(); - Assert.assertEquals(server1.getServerName() + "-ping", response1.getMessage()); - - TestMessage response2 = client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join(); - Assert.assertEquals(server2.getServerName() + "-pong", response2.getMessage()); - } finally { - - if (server1 != null) { - server1.shutdown(); - } - - if (server2 != null) { - server2.shutdown(); - } - - if (client != null) { - client.shutdown(); - } - } - } - - /** - * Initializes a {@link TestServer} with the given port range. - * @param serverName the name of the server. - * @param ports a range of ports. - * @return A test server with the given name. - */ - private TestServer startServer(String serverName, int... ports) throws Throwable { - List<Integer> portList = new ArrayList<>(ports.length); - for (int p : ports) { - portList.add(p); - } - - final TestServer server = new TestServer(serverName, portList.iterator()); - server.start(); - return server; - } - - /** - * A server that receives a {@link TestMessage test message} and returns another test - * message containing the same string as the request with the name of the server prepended. - */ - private class TestServer extends AbstractServerBase<TestMessage, TestMessage> { - - protected TestServer(String name, Iterator<Integer> bindPort) throws UnknownHostException { - super(name, InetAddress.getLocalHost(), bindPort, 1, 1); - } - - @Override - public AbstractServerHandler<TestMessage, TestMessage> initializeHandler() { - return new AbstractServerHandler<TestMessage, TestMessage>( - this, - new MessageSerializer<>(new TestMessage.TestMessageDeserializer(), new TestMessage.TestMessageDeserializer()), - new DisabledKvStateRequestStats()) { - - @Override - public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) { - TestMessage response = new TestMessage(getServerName() + '-' + request.getMessage()); - return CompletableFuture.completedFuture(response); - } - - @Override - public void shutdown() { - // do nothing - } - }; - } - } - - /** - * Message with a string as payload. - */ - private static class TestMessage extends MessageBody { - - private final String message; - - TestMessage(String message) { - this.message = Preconditions.checkNotNull(message); - } - - public String getMessage() { - return message; - } - - @Override - public byte[] serialize() { - byte[] content = message.getBytes(ConfigConstants.DEFAULT_CHARSET); - - // message size + 4 for the length itself - return ByteBuffer.allocate(content.length + Integer.BYTES) - .putInt(content.length) - .put(content) - .array(); - } - - /** - * The deserializer for our {@link TestMessage test messages}. - */ - public static class TestMessageDeserializer implements MessageDeserializer<TestMessage> { - - @Override - public TestMessage deserializeMessage(ByteBuf buf) { - int length = buf.readInt(); - String message = ""; - if (length > 0) { - byte[] name = new byte[length]; - buf.readBytes(name); - message = new String(name, ConfigConstants.DEFAULT_CHARSET); - } - return new TestMessage(message); - } - } - } -}
