http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java index 837b643..cd40c82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java @@ -26,10 +26,10 @@ import org.apache.flink.runtime.io.network.api.reader.RecordReader; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus; import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.types.IntValue; import org.junit.Test; import scala.concurrent.Await; @@ -48,7 +49,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; 
import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus; -import static org.junit.Assert.fail; public class TaskCancelTest { @@ -109,30 +109,21 @@ public class TaskCancelTest { // Wait for the job to make some progress and then cancel awaitRunning( - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - TestingUtils.TESTING_DURATION()); + flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), + jobGraph.getJobID(), + TestingUtils.TESTING_DURATION()); Thread.sleep(5000); cancelJob( - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - TestingUtils.TESTING_DURATION()); + flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), + jobGraph.getJobID(), + TestingUtils.TESTING_DURATION()); // Wait for the job to be cancelled - JobStatus status = awaitTermination( - flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), - jobGraph.getJobID(), - TestingUtils.TESTING_DURATION()); - - if (status == JobStatus.CANCELED) { - // Expected :-) All is swell. - } - else { - fail("The job finished with unexpected terminal state " + status + ". 
" - + "This indicates that there is a bug in the task cancellation."); - } + JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, + flink.getLeaderGateway(TestingUtils.TESTING_DURATION()), + TestingUtils.TESTING_DURATION()); } finally { if (flink != null) { @@ -224,42 +215,6 @@ public class TaskCancelTest { } - private JobStatus awaitTermination(ActorGateway jobManager, JobID jobId, FiniteDuration timeout) - throws Exception { - - checkNotNull(jobManager); - checkNotNull(jobId); - checkNotNull(timeout); - - while (true) { - Future<Object> ask = jobManager.ask( - new RequestJobStatus(jobId), - timeout); - - Object result = Await.result(ask, timeout); - - if (result instanceof CurrentJobStatus) { - // Success - CurrentJobStatus status = (CurrentJobStatus) result; - - if (!status.jobID().equals(jobId)) { - throw new Exception("JobManager responded for wrong job ID. Request: " - + jobId + ", response: " + status.jobID() + "."); - } - - if (status.status().isTerminalState()) { - return status.status(); - } - } - else if (result instanceof JobNotFound) { - throw new Exception("Cannot find job with ID " + jobId + "."); - } - else { - throw new Exception("Unexpected response to cancel request: " + result); - } - } - } - // --------------------------------------------------------------------------------------------- public static class InfiniteSource extends AbstractInvokable {
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index 61b1f7a..069b6af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -18,26 +18,17 @@ package org.apache.flink.runtime.testutils; -import static org.junit.Assert.fail; +import org.apache.flink.runtime.util.FileUtils; -import java.io.BufferedWriter; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.InputStream; import java.io.PrintWriter; +import java.io.StringWriter; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; -import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; +import java.util.UUID; /** * This class contains auxiliary methods for unit tests. @@ -77,6 +68,18 @@ public class CommonTestUtils { } /** + * Create a temporary log4j configuration for the test. 
+ */ + public static File createTemporaryLog4JProperties() throws IOException { + File log4jProps = File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" + + ".properties"); + log4jProps.deleteOnExit(); + CommonTestUtils.printLog4jDebugConfig(log4jProps); + + return log4jProps; + } + + /** * Tries to get the java executable command with which the current JVM was started. * Returns null, if the command could not be found. * @@ -152,4 +155,50 @@ public class CommonTestUtils { fw.close(); } } + + public static File createTempDirectory() throws IOException { + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + for (int i = 0; i < 10; i++) { + File dir = new File(tempDir, UUID.randomUUID().toString()); + if (!dir.exists() && dir.mkdirs()) { + return dir; + } + System.err.println("Could not use temporary directory " + dir.getAbsolutePath()); + } + + throw new IOException("Could not create temporary file directory"); + } + + /** + * Utility class to read the output of a process stream and forward it into a StringWriter. 
+ */ + public static class PipeForwarder extends Thread { + + private final StringWriter target; + private final InputStream source; + + public PipeForwarder(InputStream source, StringWriter target) { + super("Pipe Forwarder"); + setDaemon(true); + + this.source = source; + this.target = target; + + start(); + } + + @Override + public void run() { + try { + int next; + while ((next = source.read()) != -1) { + target.write(next); + } + } + catch (IOException e) { + // terminate + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java new file mode 100644 index 0000000..66e1d9b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound; +import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus; +import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestJobStatus; +import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestNumberRegisteredTaskManager; + +/** + * JobManager actor test utilities. + * + * <p>If you are using a {@link TestingJobManager} most of these are not needed. + */ +public class JobManagerActorTestUtils { + + /** + * Waits for the expected {@link JobStatus}. + * + * <p>Repeatedly queries the JobManager via {@link RequestJobStatus} messages. + * + * @param jobId Job ID of the job to wait for + * @param expectedJobStatus Expected job status + * @param jobManager Job manager actor to ask + * @param timeout Timeout after which the operation fails + * @throws Exception If the job is not found within the timeout or the job is in another state. 
+ */ + public static void waitForJobStatus( + JobID jobId, + JobStatus expectedJobStatus, + ActorGateway jobManager, + FiniteDuration timeout) throws Exception { + + checkNotNull(jobId, "Job ID"); + checkNotNull(expectedJobStatus, "Expected job status"); + checkNotNull(jobManager, "Job manager"); + checkNotNull(timeout, "Timeout"); + + final Deadline deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft()) { + // Request the job status + JobStatusResponse response = requestJobStatus(jobId, jobManager, deadline.timeLeft()); + + // Found the job + if (response instanceof CurrentJobStatus) { + JobStatus jobStatus = ((CurrentJobStatus) response).status(); + + // OK, that's what we were waiting for + if (jobStatus == expectedJobStatus) { + return; + } + else if (jobStatus.isTerminalState()) { + throw new IllegalStateException("Job is in terminal state " + jobStatus + ", " + + "but was waiting for " + expectedJobStatus + "."); + } + } + // Did not find the job... retry + else if (response instanceof JobNotFound) { + Thread.sleep(Math.min(100, deadline.timeLeft().toMillis())); + } + else { + throw new IllegalStateException("Unexpected response."); + } + } + + throw new IllegalStateException("Job not found within deadline."); + } + + /** + * Request a {@link JobStatusResponse}. + * + * @param jobId Job ID of the job to request the status of + * @param jobManager Job manager actor to ask + * @param timeout Timeout after which the operation fails + * @return The {@link JobStatusResponse} from the job manager + * @throws Exception If there is no answer within the timeout. 
+ */ + public static JobStatusResponse requestJobStatus( + JobID jobId, + ActorGateway jobManager, + FiniteDuration timeout) throws Exception { + + checkNotNull(jobId, "Job ID"); + checkNotNull(jobManager, "Job manager"); + checkNotNull(timeout, "Timeout"); + + // Ask the JobManager + RequestJobStatus request = (RequestJobStatus) getRequestJobStatus(jobId); + Future<Object> ask = jobManager.ask(request, timeout); + Object response = Await.result(ask, timeout); + + if (response instanceof JobStatusResponse) { + return (JobStatusResponse) response; + } + + throw new IllegalStateException("Unexpected response."); + } + + /** + * Waits for a minimum number of task managers to connect to the job manager. + * + * @param minimumNumberOfTaskManagers Minimum number of task managers to wait for + * @param jobManager Job manager actor to ask + * @param timeout Timeout after which the operation fails + * @throws Exception If the task managers don't connect within the timeout. + */ + public static void waitForTaskManagers( + int minimumNumberOfTaskManagers, + ActorGateway jobManager, + FiniteDuration timeout) throws Exception { + + checkArgument(minimumNumberOfTaskManagers >= 1); + checkNotNull(jobManager, "Job manager"); + checkNotNull(timeout, "Timeout"); + + final Deadline deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft()) { + Future<Object> ask = jobManager.ask(getRequestNumberRegisteredTaskManager(), + deadline.timeLeft()); + + Integer response = (Integer) Await.result(ask, deadline.timeLeft()); + + // All are connected. We are done. + if (response >= minimumNumberOfTaskManagers) { + return; + } + // Waiting for more... 
retry + else { + Thread.sleep(Math.min(100, deadline.timeLeft().toMillis())); + } + } + + throw new IllegalStateException("Task managers not connected within deadline."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java new file mode 100644 index 0000000..85b768d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testutils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.JobManagerMode; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link JobManager} instance running in a separate JVM. + */ +public class JobManagerProcess extends TestJvmProcess { + + private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class); + + /** ID for this JobManager */ + private final int id; + + /** The port the JobManager listens on */ + private final int jobManagerPort; + + /** The configuration for the JobManager */ + private final Configuration config; + + /** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint} */ + private final String[] jvmArgs; + + private ActorRef jobManagerRef; + + /** + * Creates a {@link JobManager} running in a separate JVM. + * + * <p>See {@link #JobManagerProcess(int, Configuration, int)} for a more + * detailed + * description. + * + * @param config Configuration for the job manager process + * @throws Exception + */ + public JobManagerProcess(int id, Configuration config) throws Exception { + this(id, config, 0); + } + + /** + * Creates a {@link JobManager} running in a separate JVM. 
+ * + * @param id ID for the JobManager + * @param config Configuration for the job manager process + * @param jobManagerPort Job manager port (if <code>0</code>, pick any available port) + * @throws Exception + */ + public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception { + checkArgument(id >= 0, "Negative ID"); + this.id = id; + this.config = checkNotNull(config, "Configuration"); + this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort; + + ArrayList<String> args = new ArrayList<>(); + args.add("--port"); + args.add(String.valueOf(this.jobManagerPort)); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + args.add("--" + entry.getKey()); + args.add(entry.getValue()); + } + + this.jvmArgs = new String[args.size()]; + args.toArray(jvmArgs); + } + + @Override + public String getName() { + return "JobManager " + id; + } + + @Override + public String[] getJvmArgs() { + return jvmArgs; + } + + @Override + public String getEntryPointClassName() { + return JobManagerProcessEntryPoint.class.getName(); + } + + public int getJobManagerPort() { + return jobManagerPort; + } + + public Configuration getConfig() { + return config; + } + + /** + * Returns the Akka URL of this JobManager. + */ + public String getJobManagerAkkaURL() { + return JobManager.getRemoteJobManagerAkkaURL( + new InetSocketAddress("localhost", jobManagerPort), + Option.<String>empty()); + } + + @Override + public String toString() { + return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort); + } + + /** + * Waits for the job manager to be reachable. + * + * <p><strong>Important:</strong> Make sure to set the timeout larger than Akka's gating + * time. Otherwise, this will effectively not wait for the JobManager to startup, because the + * time out will fire immediately. + * + * @param actorSystem Actor system to be used to resolve JobManager address. 
+ * @param timeout Timeout (make sure to set larger than Akka's gating time). + */ + public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout) + throws Exception { + + if (jobManagerRef != null) { + return jobManagerRef; + } + + checkNotNull(actorSystem, "Actor system"); + + // Deadline passes timeout ms + Deadline deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft()) { + try { + // If the Actor is not reachable yet, this throws an Exception. Retry until the + // deadline passes. + this.jobManagerRef = AkkaUtils.getActorRef( + getJobManagerAkkaURL(), + actorSystem, + deadline.timeLeft()); + + return jobManagerRef; + } + catch (Throwable ignored) { + // Retry + Thread.sleep(Math.min(100, deadline.timeLeft().toMillis())); + } + } + + throw new IllegalStateException("JobManager did not start up within " + timeout + "."); + } + + /** + * Entry point for the JobManager process. + */ + public static class JobManagerProcessEntryPoint { + + private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class); + + /** + * Runs the JobManager process in {@link JobManagerMode#CLUSTER} and {@link + * StreamingMode#STREAMING} (can handle both batch and streaming jobs). + * + * <p><strong>Required argument</strong>: <code>port</code>. Start the process with + * <code>--port PORT</code>. + * + * <p>Other arguments are parsed to a {@link Configuration} and passed to the + * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum + * "xyz:123:456"</code>. 
+ */ + public static void main(String[] args) { + try { + ParameterTool params = ParameterTool.fromArgs(args); + final int port = Integer.valueOf(params.getRequired("port")); + LOG.info("Running on port {}.", port); + + Configuration config = params.getConfiguration(); + LOG.info("Configuration: {}.", config); + + // Run the JobManager + JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.STREAMING, + "localhost", port); + + // Run forever. Forever, ever? Forever, ever! + new CountDownLatch(1).await(); + } + catch (Throwable t) { + LOG.error("Failed to start JobManager process", t); + System.exit(1); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java new file mode 100644 index 0000000..f683c55 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link TaskManager} instance running in a separate JVM. + */ +public class TaskManagerProcess extends TestJvmProcess { + + /** ID for this TaskManager */ + private final int id; + + /** The configuration for the TaskManager */ + private final Configuration config; + + /** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */ + private final String[] jvmArgs; + + public TaskManagerProcess(int id, Configuration config) throws Exception { + checkArgument(id >= 0, "Negative ID"); + this.id = id; + this.config = checkNotNull(config, "Configuration"); + + ArrayList<String> args = new ArrayList<>(); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + args.add("--" + entry.getKey()); + args.add(entry.getValue()); + } + + this.jvmArgs = new String[args.size()]; + args.toArray(jvmArgs); + } + + @Override + public String getName() { + return "TaskManager " + id; + } + + @Override + public String[] getJvmArgs() { + return jvmArgs; + } + + @Override + public String getEntryPointClassName() { + return TaskManagerProcessEntryPoint.class.getName(); + } + + public int getId() { + return id; + } + + @Override + public String toString() { + return String.format("TaskManagerProcess(id=%d)", id); + } + 
+ /** + * Entry point for the TaskManager process. + */ + public static class TaskManagerProcessEntryPoint { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class); + + /** + * Runs the TaskManager process in {@link StreamingMode#STREAMING} (can handle both batch + * and streaming jobs). + * + * <p>All arguments are parsed to a {@link Configuration} and passed to the TaskManager, + * for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>. + */ + public static void main(String[] args) throws Exception { + try { + Configuration config = ParameterTool.fromArgs(args).getConfiguration(); + + if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) { + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4); + } + + if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) { + config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100); + } + + + LOG.info("Configuration: {}.", config); + + // Run the TaskManager + TaskManager.selectNetworkInterfaceAndRunTaskManager( + config, StreamingMode.STREAMING, TaskManager.class); + + // Run forever + new CountDownLatch(1).await(); + } + catch (Throwable t) { + LOG.error("Failed to start TaskManager process", t); + System.exit(1); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java new file mode 100644 index 0000000..0920b5c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.commons.lang3.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.junit.Assert.fail; + +/** + * A {@link Process} running a separate JVM. + */ +public abstract class TestJvmProcess { + + private static final Logger LOG = LoggerFactory.getLogger(TestJvmProcess.class); + + /** Lock to guard {@link #createAndStart()} and {@link #destroy()} calls. */ + private final Object createDestroyLock = new Object(); + + /** The java command path */ + private final String javaCommandPath; + + /** The log4j configuration path. 
*/ + private final String log4jConfigFilePath; + + /** Shutdown hook for resource cleanup */ + private final Thread shutdownHook; + + /** JVM process memory (set for both '-Xms' and '-Xmx'). */ + private int jvmMemoryInMb = 80; + + /** The JVM process */ + private Process process; + + /** Writer for the process output */ + private volatile StringWriter processOutput; + + public TestJvmProcess() throws Exception { + this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath()); + } + + public TestJvmProcess(String javaCommandPath, String log4jConfigFilePath) { + this.javaCommandPath = checkNotNull(javaCommandPath, "Java command path"); + this.log4jConfigFilePath = checkNotNull(log4jConfigFilePath, "log4j config file path"); + + this.shutdownHook = new Thread(new Runnable() { + @Override + public void run() { + try { + destroy(); + } + catch (Throwable t) { + LOG.error("Error during process cleanup shutdown hook.", t); + } + } + }); + } + + /** + * Returns the name of the process. + */ + public abstract String getName(); + + /** + * Returns the arguments to the JVM. + * + * <p>These can be parsed by the main method of the entry point class. + */ + public abstract String[] getJvmArgs(); + + /** + * Returns the name of the class to run. + * + * <p>Arguments to the main method can be specified via {@link #getJvmArgs()}. + */ + public abstract String getEntryPointClassName(); + + // --------------------------------------------------------------------------------------------- + + /** + * Sets the memory for the process (<code>-Xms</code> and <code>-Xmx</code> flags) (>= 80). + * + * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80). + */ + public void setJVMMemory(int jvmMemoryInMb) { + checkArgument(jvmMemoryInMb >= 80, "JobManager JVM Requires at least 80 MBs of memory."); + this.jvmMemoryInMb = jvmMemoryInMb; + } + + /** + * Creates and starts the {@link Process}. 
+ * + * <strong>Important:</strong> Don't forget to call {@link #destroy()} to prevent + * resource leaks. The created process will be child process and is not guaranteed to + * terminate when the parent process terminates. + */ + public void createAndStart() throws IOException { + String[] cmd = new String[] { + javaCommandPath, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + log4jConfigFilePath, + "-Xms" + jvmMemoryInMb + "m", + "-Xmx" + jvmMemoryInMb + "m", + "-classpath", getCurrentClasspath(), + getEntryPointClassName() }; + + String[] jvmArgs = getJvmArgs(); + + if (jvmArgs != null && jvmArgs.length > 0) { + cmd = ArrayUtils.addAll(cmd, jvmArgs); + } + + synchronized (createDestroyLock) { + if (process == null) { + LOG.debug("Running command '{}'.", Arrays.toString(cmd)); + this.process = new ProcessBuilder(cmd).start(); + + // Forward output + this.processOutput = new StringWriter(); + new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput); + + try { + // Add JVM shutdown hook to call shutdown of service + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + catch (IllegalStateException ignored) { + // JVM is already shutting down. No need to do this. 
+ } + catch (Throwable t) { + LOG.error("Cannot register process cleanup shutdown hook.", t); + } + } + else { + throw new IllegalStateException("Already running."); + } + } + } + + public void printProcessLog() { + if (processOutput == null) { + throw new IllegalStateException("Not started"); + } + + System.out.println("-----------------------------------------"); + System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName()); + System.out.println("-----------------------------------------"); + + String out = processOutput.toString(); + if (out == null || out.length() == 0) { + System.out.println("(EMPTY)"); + } + else { + System.out.println(out); + } + + System.out.println("-----------------------------------------"); + System.out.println(" END SPAWNED PROCESS LOG " + getName()); + System.out.println("-----------------------------------------"); + } + + public void destroy() { + synchronized (createDestroyLock) { + if (process != null) { + LOG.debug("Destroying " + getName() + " process."); + + try { + process.destroy(); + } + catch (Throwable t) { + LOG.error("Error while trying to destroy process.", t); + } + finally { + process = null; + + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException ignored) { + // JVM is in shutdown already, we can safely ignore this. 
+ } + catch (Throwable t) { + LOG.warn("Exception while unregistering prcess cleanup shutdown hook."); + } + } + } + } + } + } + + // --------------------------------------------------------------------------------------------- + // File based synchronization utilities + // --------------------------------------------------------------------------------------------- + + public static void touchFile(File file) throws IOException { + if (!file.exists()) { + new FileOutputStream(file).close(); + } + if (!file.setLastModified(System.currentTimeMillis())) { + throw new IOException("Could not touch the file."); + } + } + + public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) { + long now = System.currentTimeMillis(); + final long deadline = now + timeout; + + + while (now < deadline) { + boolean allFound = true; + + for (int i = 0; i < num; i++) { + File nextToCheck = new File(basedir, prefix + i); + if (!nextToCheck.exists()) { + allFound = false; + break; + } + } + + if (allFound) { + return; + } + else { + // not all found, wait for a bit + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + now = System.currentTimeMillis(); + } + } + + fail("The tasks were not started within time (" + timeout + "msecs)"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java new file mode 100644 index 0000000..d2e5b6a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * ZooKeeper test utilities. + */ +public class ZooKeeperTestUtils { + + /** + * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + * + * @param zooKeeperQuorum ZooKeeper quorum to connect to + * @param fsStateHandlePath Base path for file system state backend (for checkpoints and + * recovery) + * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + */ + public static Configuration createZooKeeperRecoveryModeConfig( + String zooKeeperQuorum, String fsStateHandlePath) { + + return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath); + } + + /** + * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}. 
+ * + * @param config Configuration to use + * @param zooKeeperQuorum ZooKeeper quorum to connect to + * @param fsStateHandlePath Base path for file system state backend (for checkpoints and + * recovery) + * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + */ + public static Configuration setZooKeeperRecoveryMode( + Configuration config, + String zooKeeperQuorum, + String fsStateHandlePath) { + + checkNotNull(config, "Configuration"); + checkNotNull(zooKeeperQuorum, "ZooKeeper quorum"); + checkNotNull(fsStateHandlePath, "File state handle backend path"); + + // Web frontend, you have been dismissed. Sorry. + config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1); + + // ZooKeeper recovery mode + config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum); + + int connTimeout = 5000; + if (System.getenv().get("CI") != null) { + // The regular timeout is too aggressive for Travis and connections are often lost. 
+ connTimeout = 20000; + } + + config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); + config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout); + + // File system state backend + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints"); + config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery"); + + // Akka failure detection and execution retries + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s"); + + return config; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java new file mode 100644 index 0000000..f0130ec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java @@ -0,0 +1,591 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for basic {@link ZooKeeperStateHandleStore} behaviour. 
+ * + * <p> Tests include: + * <ul> + * <li>Expected usage of operations</li> + * <li>Correct ordering of ZooKeeper and state handle operations</li> + * </ul> + */ +public class ZooKeeperStateHandleStoreITCase extends TestLogger { + + private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + + @AfterClass + public static void tearDown() throws Exception { + if (ZooKeeper != null) { + ZooKeeper.shutdown(); + } + } + + @Before + public void cleanUp() throws Exception { + ZooKeeper.deleteAll(); + } + + /** + * Tests add operation with default {@link CreateMode}. + */ + @Test + public void testAdd() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testAdd"; + final Long state = 1239712317L; + + // Test + store.add(pathInZooKeeper, state); + + // Verify + // State handle created + assertEquals(1, stateHandleProvider.getStateHandles().size()); + assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null)); + + // Path created and is persistent + Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); + assertNotNull(stat); + assertEquals(0, stat.getEphemeralOwner()); + + // Data is equal + @SuppressWarnings("unchecked") + Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())).getState(null); + + assertEquals(state, actual); + } + + /** + * Tests that {@link CreateMode} is respected. 
+ */ + @Test + public void testAddWithCreateMode() throws Exception { + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + Long state = 3457347234L; + + CreateMode[] modes = CreateMode.values(); + for (int i = 0; i < modes.length; i++) { + CreateMode mode = modes[i]; + state += i; + + String pathInZooKeeper = "/testAddWithCreateMode" + mode.name(); + + // Test + store.add(pathInZooKeeper, state, mode); + + if (mode.isSequential()) { + // Figure out the sequential ID + List<String> paths = ZooKeeper.getClient().getChildren().forPath("/"); + for (String p : paths) { + if (p.startsWith("testAddWithCreateMode" + mode.name())) { + pathInZooKeeper = "/" + p; + break; + } + } + } + + // Verify + // State handle created + assertEquals(i + 1, stateHandleProvider.getStateHandles().size()); + assertEquals(state, stateHandleProvider.getStateHandles().get(i).getState(null)); + + // Path created + Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); + + assertNotNull(stat); + + // Is ephemeral or persistent + if (mode.isEphemeral()) { + assertTrue(stat.getEphemeralOwner() != 0); + } + else { + assertEquals(0, stat.getEphemeralOwner()); + } + + // Data is equal + @SuppressWarnings("unchecked") + Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())).getState(null); + + assertEquals(state, actual); + } + } + + /** + * Tests that an existing path throws an Exception. 
+ */ + @Test(expected = Exception.class) + public void testAddAlreadyExistingPath() throws Exception { + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath"); + + store.add("/testAddAlreadyExistingPath", 1L); + } + + /** + * Tests that the created state handle is discarded if ZooKeeper create fails. + */ + @Test + public void testAddDiscardStateHandleAfterFailure() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + CuratorFramework client = spy(ZooKeeper.getClient()); + when(client.create()).thenThrow(new RuntimeException("Expected test Exception.")); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + client, stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure"; + final Long state = 81282227L; + + try { + // Test + store.add(pathInZooKeeper, state); + fail("Did not throw expected exception"); + } + catch (Exception ignored) { + } + + // Verify + // State handle created and discarded + assertEquals(1, stateHandleProvider.getStateHandles().size()); + assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null)); + assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); + } + + /** + * Tests that a state handle is replaced. 
+ */ + @Test + public void testReplace() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testReplace"; + final Long initialState = 30968470898L; + final Long replaceState = 88383776661L; + + // Test + store.add(pathInZooKeeper, initialState); + store.replace(pathInZooKeeper, 0, replaceState); + + // Verify + // State handles created + assertEquals(2, stateHandleProvider.getStateHandles().size()); + assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null)); + assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null)); + + // Path created and is persistent + Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); + assertNotNull(stat); + assertEquals(0, stat.getEphemeralOwner()); + + // Data is equal + @SuppressWarnings("unchecked") + Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())).getState(null); + + assertEquals(replaceState, actual); + } + + /** + * Tests that a non existing path throws an Exception. + */ + @Test(expected = Exception.class) + public void testReplaceNonExistingPath() throws Exception { + StateHandleProvider<Long> stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + store.replace("/testReplaceNonExistingPath", 0, 1L); + } + + /** + * Tests that the replace state handle is discarded if ZooKeeper setData fails. 
+ */ + @Test + public void testReplaceDiscardStateHandleAfterFailure() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + CuratorFramework client = spy(ZooKeeper.getClient()); + when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + client, stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure"; + final Long initialState = 30968470898L; + final Long replaceState = 88383776661L; + + // Test + store.add(pathInZooKeeper, initialState); + + try { + store.replace(pathInZooKeeper, 0, replaceState); + fail("Did not throw expected exception"); + } + catch (Exception ignored) { + } + + // Verify + // State handle created and discarded + assertEquals(2, stateHandleProvider.getStateHandles().size()); + assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null)); + assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null)); + assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls()); + + // Initial value + @SuppressWarnings("unchecked") + Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ClassLoader.getSystemClassLoader())).getState(null); + + assertEquals(initialState, actual); + } + + /** + * Tests get operation. 
+ */ + @Test + public void testGetAndExists() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testGetAndExists"; + final Long state = 311222268470898L; + + // Test + assertEquals(-1, store.exists(pathInZooKeeper)); + + store.add(pathInZooKeeper, state); + StateHandle<Long> actual = store.get(pathInZooKeeper); + + // Verify + assertEquals(state, actual.getState(null)); + assertTrue(store.exists(pathInZooKeeper) >= 0); + } + + /** + * Tests that a non existing path throws an Exception. + */ + @Test(expected = Exception.class) + public void testGetNonExistingPath() throws Exception { + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + store.get("/testGetNonExistingPath"); + } + + /** + * Tests that all added state is returned. + */ + @Test + public void testGetAll() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testGetAll"; + + final Set<Long> expected = new HashSet<>(); + expected.add(311222268470898L); + expected.add(132812888L); + expected.add(27255442L); + expected.add(11122233124L); + + // Test + for (long val : expected) { + store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + } + + for (Tuple2<StateHandle<Long>, String> val : store.getAll()) { + assertTrue(expected.remove(val.f0.getState(null))); + } + assertEquals(0, expected.size()); + } + + /** + * Tests that the state is returned sorted. 
+ */ + @Test + public void testGetAllSortedByName() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testGetAllSortedByName"; + + final Long[] expected = new Long[] { + 311222268470898L, 132812888L, 27255442L, 11122233124L }; + + // Test + for (long val : expected) { + store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + } + + List<Tuple2<StateHandle<Long>, String>> actual = store.getAllSortedByName(); + assertEquals(expected.length, actual.size()); + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], actual.get(i).f0.getState(null)); + } + } + + /** + * Tests that state handles are correctly removed. + */ + @Test + public void testRemove() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testRemove"; + final Long state = 27255442L; + + store.add(pathInZooKeeper, state); + + // Test + store.remove(pathInZooKeeper); + + // Verify discarded + assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + } + + /** + * Tests that state handles are correctly removed with a callback. 
+ */ + @Test + public void testRemoveWithCallback() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testRemoveWithCallback"; + final Long state = 27255442L; + + store.add(pathInZooKeeper, state); + + final CountDownLatch sync = new CountDownLatch(1); + BackgroundCallback callback = mock(BackgroundCallback.class); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + sync.countDown(); + return null; + } + }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); + + // Test + store.remove(pathInZooKeeper, callback); + + // Verify discarded and callback called + assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + + sync.await(); + + verify(callback, times(1)) + .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); + } + + /** + * Tests that state handles are correctly discarded. + */ + @Test + public void testRemoveAndDiscardState() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testDiscard"; + final Long state = 27255442L; + + store.add(pathInZooKeeper, state); + + // Test + store.removeAndDiscardState(pathInZooKeeper); + + // Verify discarded + assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + } + + /** Tests that all state handles are correctly discarded. 
*/ + @Test + public void testRemoveAndDiscardAllState() throws Exception { + // Setup + LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider(); + + ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( + ZooKeeper.getClient(), stateHandleProvider); + + // Config + final String pathInZooKeeper = "/testDiscardAll"; + + final Set<Long> expected = new HashSet<>(); + expected.add(311222268470898L); + expected.add(132812888L); + expected.add(27255442L); + expected.add(11122233124L); + + // Test + for (long val : expected) { + store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + } + + store.removeAndDiscardAllState(); + + // Verify all discarded + assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + } + + // --------------------------------------------------------------------------------------------- + // Simple test helpers + // --------------------------------------------------------------------------------------------- + + private static class LongStateHandleProvider implements StateHandleProvider<Long> { + + private static final long serialVersionUID = 4572084854499402276L; + + private final List<LongStateHandle> stateHandles = new ArrayList<>(); + + @Override + public StateHandle<Long> createStateHandle(Long state) { + LongStateHandle stateHandle = new LongStateHandle(state); + stateHandles.add(stateHandle); + + return stateHandle; + } + + public List<LongStateHandle> getStateHandles() { + return stateHandles; + } + } + + private static class LongStateHandle implements StateHandle<Long> { + + private static final long serialVersionUID = -3555329254423838912L; + + private final Long state; + + private int numberOfDiscardCalls; + + public LongStateHandle(Long state) { + this.state = state; + } + + @Override + public Long getState(ClassLoader ignored) throws Exception { + return state; + } + + @Override + public void discardState() throws Exception { + numberOfDiscardCalls++; + } + + public 
int getNumberOfDiscardCalls() { + return numberOfDiscardCalls; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java new file mode 100644 index 0000000..7ae89d1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +/** + * Simple ZooKeeper and CuratorFramework setup for tests. 
+ */ +public class ZooKeeperTestEnvironment { + + private final TestingServer zooKeeperServer; + + private final TestingCluster zooKeeperCluster; + + private final CuratorFramework client; + + /** + * Starts a ZooKeeper cluster with the number of quorum peers and a client. + * + * @param numberOfZooKeeperQuorumPeers Starts a {@link TestingServer}, if <code>1</code>. + * Starts a {@link TestingCluster}, if <code>> 1</code>. + */ + public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) { + if (numberOfZooKeeperQuorumPeers <= 0) { + throw new IllegalArgumentException("Number of peers needs to be >= 1."); + } + + final Configuration conf = new Configuration(); + + try { + if (numberOfZooKeeperQuorumPeers == 1) { + zooKeeperServer = new TestingServer(true); + zooKeeperCluster = null; + + conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, + zooKeeperServer.getConnectString()); + } + else { + zooKeeperServer = null; + zooKeeperCluster = new TestingCluster(numberOfZooKeeperQuorumPeers); + + zooKeeperCluster.start(); + + conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, + zooKeeperCluster.getConnectString()); + } + + client = ZooKeeperUtils.startCuratorFramework(conf); + + client.newNamespaceAwareEnsurePath("/") + .ensure(client.getZookeeperClient()); + } + catch (Exception e) { + throw new RuntimeException("Error setting up ZooKeeperTestEnvironment", e); + } + } + + /** + * Shutdown the client and ZooKeeper server/cluster. + */ + public void shutdown() throws Exception { + if (client != null) { + client.close(); + } + + if (zooKeeperServer != null) { + zooKeeperServer.close(); + } + + if (zooKeeperCluster != null) { + zooKeeperCluster.close(); + } + } + + public String getConnectString() { + if (zooKeeperServer != null) { + return zooKeeperServer.getConnectString(); + } + else { + return zooKeeperCluster.getConnectString(); + } + } + + /** + * Returns a client for the started ZooKeeper server/cluster. 
+ */ + public CuratorFramework getClient() { + return client; + } + + /** + * Creates a new client for the started ZooKeeper server/cluster. + */ + public CuratorFramework createClient() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString()); + return ZooKeeperUtils.startCuratorFramework(config); + } + + /** + * Deletes all ZNodes under the root node. + * + * @throws Exception If the ZooKeeper operation fails + */ + public void deleteAll() throws Exception { + final String path = "/" + client.getNamespace(); + ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, false); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index d1b8fac..9a1cde0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -18,9 +18,12 @@ package org.apache.flink.runtime.executiongraph +import java.util.concurrent.TimeUnit + import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks @@ -32,6 +35,7 @@ import org.scalatest.junit.JUnitRunner import org.scalatest.{Matchers, WordSpecLike} import 
scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration @RunWith(classOf[JUnitRunner]) class ExecutionGraphRestartTest extends WordSpecLike with Matchers { @@ -126,8 +130,23 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { for (vertex <- eg.getAllExecutionVertices.asScala) { vertex.getCurrentExecutionAttempt().cancelingComplete() } - + + val timeout = new FiniteDuration(2, TimeUnit.MINUTES) + + // Wait for async restart + var deadline = timeout.fromNow + while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) { + Thread.sleep(100) + } + eg.getState should equal(JobStatus.RUNNING) + + // Wait for deploying after async restart + deadline = timeout.fromNow + while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists( + _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) { + Thread.sleep(100) + } for (vertex <- eg.getAllExecutionVertices.asScala) { vertex.getCurrentExecutionAttempt().markFinished() http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index c9ae1e4..703d7bf 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -43,28 +43,28 @@ import scala.concurrent.{Await, Future} * otherwise false */ class TestingCluster( - userConfiguration: Configuration, - singleActorSystem: Boolean, - synchronousDispatcher: Boolean, - streamingMode: StreamingMode) + userConfiguration: Configuration, + singleActorSystem: Boolean, + synchronousDispatcher: Boolean, + streamingMode: StreamingMode) 
extends FlinkMiniCluster( userConfiguration, singleActorSystem, streamingMode) { - + def this(userConfiguration: Configuration, singleActorSystem: Boolean, synchronousDispatcher: Boolean) - = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY) + = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY) def this(userConfiguration: Configuration, singleActorSystem: Boolean) - = this(userConfiguration, singleActorSystem, false) + = this(userConfiguration, singleActorSystem, false) def this(userConfiguration: Configuration) = this(userConfiguration, true, false) - + // -------------------------------------------------------------------------- - + override def generateConfiguration(userConfig: Configuration): Configuration = { val cfg = new Configuration() cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost") @@ -100,16 +100,18 @@ class TestingCluster( } val (executionContext, - instanceManager, - scheduler, - libraryCacheManager, - executionRetries, - delayBetweenRetries, - timeout, - archiveCount, - leaderElectionService) = JobManager.createJobManagerComponents( - config, - createLeaderElectionService()) + instanceManager, + scheduler, + libraryCacheManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount, + leaderElectionService, + submittedJobsGraphs, + checkpointRecoveryFactory) = JobManager.createJobManagerComponents( + config, + createLeaderElectionService()) val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount)) val archive = actorSystem.actorOf(testArchiveProps, archiveName) @@ -126,7 +128,9 @@ class TestingCluster( delayBetweenRetries, timeout, streamingMode, - leaderElectionService)) + leaderElectionService, + submittedJobsGraphs, + checkpointRecoveryFactory)) val dispatcherJobManagerProps = if (synchronousDispatcher) { // disable asynchronous futures (e.g. 
accumulator update in Heartbeat) http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 312a1e5..be72003 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -18,32 +18,18 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.{Cancellable, Terminated, ActorRef} -import akka.pattern.pipe -import akka.pattern.ask -import org.apache.flink.api.common.JobID +import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.{StreamingMode, FlinkActor} import org.apache.flink.runtime.StreamingMode -import org.apache.flink.runtime.execution.ExecutionState +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership -import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} -import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import 
org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive, -DisableDisconnect} -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged +import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} +import org.apache.flink.runtime.leaderelection.LeaderElectionService -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ - import scala.language.postfixOps /** JobManager implementation extended by testing messages @@ -70,7 +56,9 @@ class TestingJobManager( delayBetweenRetries: Long, timeout: FiniteDuration, mode: StreamingMode, - leaderElectionService: LeaderElectionService) + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory) extends JobManager( flinkConfiguration, executionContext, @@ -82,5 +70,7 @@ class TestingJobManager( delayBetweenRetries, timeout, mode, - leaderElectionService) + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index b607433..72a8c25 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -541,7 +542,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } } - } /** * Registers a timer. http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index c7f7698..11eb174 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -20,22 +20,20 @@ package org.apache.flink.test.util import java.util.concurrent.TimeoutException -import akka.pattern.ask -import akka.actor.{Props, ActorRef, ActorSystem} +import akka.actor.{ActorRef, ActorSystem} import akka.pattern.Patterns._ +import akka.pattern.ask import org.apache.curator.test.TestingCluster import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.jobmanager.{RecoveryMode, JobManager} +import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode} import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import 
org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages -.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager, -TestingJobManager, TestingMemoryArchivist} +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager +import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager, TestingUtils} -import scala.concurrent.{Future, Await} +import scala.concurrent.{Await, Future} /** * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution @@ -47,20 +45,20 @@ import scala.concurrent.{Future, Await} * same [[ActorSystem]], otherwise false. */ class ForkableFlinkMiniCluster( - userConfiguration: Configuration, - singleActorSystem: Boolean, - streamingMode: StreamingMode) + userConfiguration: Configuration, + singleActorSystem: Boolean, + streamingMode: StreamingMode) extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) { - def this(userConfiguration: Configuration, singleActorSystem: Boolean) - = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) + def this(userConfiguration: Configuration, singleActorSystem: Boolean) + = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) def this(userConfiguration: Configuration) = this(userConfiguration, true) - + // -------------------------------------------------------------------------- var zookeeperCluster: Option[TestingCluster] = None - + override def generateConfiguration(userConfiguration: Configuration): Configuration = { val forNumberString = System.getProperty("forkNumber") @@ -264,10 +262,10 @@ object ForkableFlinkMiniCluster { import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT def startCluster( - numSlots: Int, - numTaskManagers: Int, - timeout: String = 
DEFAULT_AKKA_ASK_TIMEOUT) - : ForkableFlinkMiniCluster = { + numSlots: Int, + numTaskManagers: Int, + timeout: String = DEFAULT_AKKA_ASK_TIMEOUT) + : ForkableFlinkMiniCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
