[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927872#comment-15927872 ]
Razvan edited comment on FLINK-6063 at 3/16/17 11:42 AM:
---------------------------------------------------------

Below is the code for the job (main class):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.test.event.reader.SimpleEventReader;

// KafkaConfiguration, EventTuple, JsonToTupleFlatMap and AccuracyTestEvents are
// application classes from the same package (not shown here).
public class StreamingAccuracyJob {

    private static final String JOB_NAME = "Prototype CEP";

    private static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    private static void configureEnvironment() {
        // start a checkpoint every 10000 ms
        env.enableCheckpointing(10000);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // make sure 5000 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                50,                            // number of restart attempts
                Time.of(10, TimeUnit.SECONDS)  // delay between attempts
        ));

        env.setParallelism(1);
        // env.setMaxParallelism(1); // https://issues.apache.org/jira/browse/FLINK-5773

        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    }

    public static void main(String[] args) throws Exception {
        // configure first: each operator takes the environment's parallelism
        // at the moment it is added to the topology
        configureEnvironment();

        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
                KafkaConfiguration.TOPIC_NAME,
                new SimpleStringSchema(),
                KafkaConfiguration.getConnectionProperties()));

        DataStream<EventTuple> streamTuples = stream.flatMap(new JsonToTupleFlatMap());
        streamTuples
                .keyBy(SimpleEventReader.FIELD_USERID_TUPLE_POSITION)
                .flatMap(new AccuracyTestEvents());

        env.execute(JOB_NAME);
    }
}

> HA Configuration doesn't work with Flink 1.2
> --------------------------------------------
>
>                 Key: FLINK-6063
>                 URL: https://issues.apache.org/jira/browse/FLINK-6063
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.2.0
>            Reporter: Razvan
>            Priority: Critical
>
> I have a Flink 1.2 cluster made up of 3 JobManagers and 2 TaskManagers. I
> start the ZooKeeper quorum from JobManager1, get confirmation that ZooKeeper
> has started on the other 2 JobManagers, and then start a Flink job on
> JobManager1.
>
> The flink-conf.yaml is the same on all 5 VMs (as is everything else related
> to Flink, because I copied the folder to all VMs as suggested in tutorials),
> which means jobmanager.rpc.address points to JobManager1 everywhere.
> If I turn off the VM running JobManager1, I would expect ZooKeeper to elect
> one of the remaining JobManagers as the new leader and the TaskManagers to
> reconnect to it.
> Instead, a new leader is elected, but the slaves keep connecting to the old
> master:
>
> 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
> 2017-03-15 10:29:10,490 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
>     at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>     at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-03-15 10:29:10,512 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,515 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
> 2017-03-15 10:29:10,515 INFO org.apache.flink.runtime.taskmanager.Task - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
>     at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>     at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-03-15 10:29:10,516 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
> 2017-03-15 10:29:10,516 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
> 2017-03-15 10:29:10,525 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
> 2017-03-15 10:29:10,542 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,546 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,548 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
> 2017-03-15 10:29:10,551 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Flat Map (1/1)
> 2017-03-15 10:29:10,552 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@1.2.3.5:43893/user/jobmanager (attempt 1, timeout: 500 milliseconds)
> 2017-03-15 10:29:10,567 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:29:10,632 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), starting network stack and library cache.
> 2017-03-15 10:29:10,633 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
> 2017-03-15 10:29:10,633 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
> 2017-03-15 10:29:15,551 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:20,571 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:25,582 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:30,592 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
>
> For confidentiality, I replaced the original IPs with 1.2.3.4 for JobManager1
> and 1.2.3.5 for JobManager2.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
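The checkpoint settings in the job above interact: with at most one checkpoint in flight, any checkpoint that takes longer than the interval minus the minimum pause (10000 - 5000 = 5000 ms) delays the next one. Below is a minimal sketch of that pacing in plain Java under simplifying assumptions (first checkpoint at t = 0, instantaneous triggering, no checkpoint reaching the 60000 ms timeout). The class `CheckpointPacingModel` and its `startTimes` method are hypothetical illustrations, not Flink's `CheckpointCoordinator`.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of checkpoint pacing (not Flink code).
 * Assumes: at most one checkpoint in flight, the first checkpoint starts at t = 0,
 * triggering is instantaneous, and no checkpoint hits the timeout.
 */
final class CheckpointPacingModel {

    private CheckpointPacingModel() {}

    /**
     * Returns the start time (ms) of each checkpoint, given how long each one takes.
     * The next checkpoint starts at the later of the next periodic trigger
     * (previous start + interval) and the end of the minimum pause
     * (previous completion + minPause).
     */
    static long[] startTimes(long intervalMs, long minPauseMs, long[] durationsMs) {
        long[] starts = new long[durationsMs.length];
        long start = 0;
        for (int i = 0; i < durationsMs.length; i++) {
            starts[i] = start;
            long completion = start + durationsMs[i];
            start = Math.max(start + intervalMs, completion + minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Interval and minimum pause taken from the job above.
        long[] starts = startTimes(10_000, 5_000, new long[] {2_000, 8_000, 1_000});
        System.out.println(Arrays.toString(starts)); // prints [0, 10000, 23000]
    }
}
```

With these values, a 2000 ms checkpoint leaves the 10000 ms schedule intact, while an 8000 ms checkpoint started at 10000 ms completes at 18000 ms, so under this model the minimum pause pushes the next start from 20000 ms to 23000 ms.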