[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927872#comment-15927872
 ] 

Razvan edited comment on FLINK-6063 at 3/16/17 11:42 AM:
---------------------------------------------------------

Below is the code for the Job (Main Class):


import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.test.event.reader.SimpleEventReader;

public class StreamingAccuracyJob {

        private final static String JOB_NAME = "Prototype CEP";

        final static StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        private static void configureEnvironment() {
                // start a checkpoint every 10000 ms
                env.enableCheckpointing(10000);

                // advanced options:

                // set mode to exactly-once (this is the default)
                
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

                // make sure 500 ms of progress happen between checkpoints
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

                // checkpoints have to complete within one minute, or are 
discarded
                env.getCheckpointConfig().setCheckpointTimeout(60000);

                // allow only one checkpoint to be in progress at the same time
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 
// number
                                                                                
                                                                // of
                                                                                
                                                                // restart
                                                                                
                                                                // attempts
                                Time.of(10, TimeUnit.SECONDS) // delay
                ));

                env.setParallelism(1);
                // env.setMaxParallelism(1);
                // https://issues.apache.org/jira/browse/FLINK-5773

                CheckpointConfig config = env.getCheckpointConfig();
                
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        }

        public static void main(String[] args) throws Exception {

                DataStream<String> stream = env.addSource(new 
FlinkKafkaConsumer010<>(KafkaConfiguration.TOPIC_NAME,
                                new SimpleStringSchema(), 
KafkaConfiguration.getConnectionProperties()));

                configureEnvironment();

                DataStream<EventTuple> streamTuples = stream.flatMap(new 
JsonToTupleFlatMap());

                
streamTuples.keyBy(SimpleEventReader.FIELD_USERID_TUPLE_POSITION).flatMap(new 
AccuracyTestEvents());

                env.execute(JOB_NAME);
        }

}



was (Author: razvan):
Below is the code for the Job (Main Class):


import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.booxware.event.reader.SimpleEventReader;

public class StreamingAccuracyJob {

        private final static String JOB_NAME = "Prototype CEP";

        final static StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        private static void configureEnvironment() {
                // start a checkpoint every 10000 ms
                env.enableCheckpointing(10000);

                // advanced options:

                // set mode to exactly-once (this is the default)
                
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

                // make sure 500 ms of progress happen between checkpoints
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

                // checkpoints have to complete within one minute, or are 
discarded
                env.getCheckpointConfig().setCheckpointTimeout(60000);

                // allow only one checkpoint to be in progress at the same time
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 
// number
                                                                                
                                                                // of
                                                                                
                                                                // restart
                                                                                
                                                                // attempts
                                Time.of(10, TimeUnit.SECONDS) // delay
                ));

                env.setParallelism(1);
                // env.setMaxParallelism(1);
                // https://issues.apache.org/jira/browse/FLINK-5773

                CheckpointConfig config = env.getCheckpointConfig();
                
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        }

        public static void main(String[] args) throws Exception {

                DataStream<String> stream = env.addSource(new 
FlinkKafkaConsumer010<>(KafkaConfiguration.TOPIC_NAME,
                                new SimpleStringSchema(), 
KafkaConfiguration.getConnectionProperties()));

                configureEnvironment();

                DataStream<EventTuple> streamTuples = stream.flatMap(new 
JsonToTupleFlatMap());

                
streamTuples.keyBy(SimpleEventReader.FIELD_USERID_TUPLE_POSITION).flatMap(new 
AccuracyTestEvents());

                env.execute(JOB_NAME);
        }

}


> HA Configuration doesn't work with Flink 1.2
> --------------------------------------------
>
>                 Key: FLINK-6063
>                 URL: https://issues.apache.org/jira/browse/FLINK-6063
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.2.0
>            Reporter: Razvan
>            Priority: Critical
>
>  I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 
> TaskManagers. I start the Zookeeper Quorum from JobManager1, I get 
> confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink 
> job on this JobManager1.   
>  
>  The flink-conf.yaml is the same on all 5 VMs (also everything else related 
> to flink because I copied the folder across all VMs as suggested in 
> tutorials) this means jobmanager.rpc.address: points to JobManager1 
> everywhere.
> If I turn off the VM running JobManager1, I would expect Zookeeper to say one 
> of the remaining JobManagers is the leader, and the TaskManagers should 
> reconnect to it. Instead, a new leader is elected but the slaves keep 
> connecting to the old master:
>     2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem         
>                   - Ensuring all FileSystem streams are closed for Async 
> calls on Source: Custom Source -> Flat Map (1/1)
>     2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Disassociated] 
>     2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:10,489 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager 
> akka://flink/user/taskmanager disconnects from JobManager 
> akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its 
> leadership.
>     2017-03-15 10:29:10,490 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Cancelling 
> all computations and discarding all cached data.
>     2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Attempting to fail task externally Source: Custom Source 
> -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
>     2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Source: Custom Source -> Flat Map (1/1) 
> (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
>     java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>     2017-03-15 10:29:10,512 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Triggering cancellation of task code Source: Custom 
> Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
>     2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Attempting to fail task externally Flat Map (1/1) 
> (dd555e0437867c3180a1ecaf0a9f4d04).
>     2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) 
> switched from RUNNING to FAILED.
>     java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>     2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Triggering cancellation of task code Flat Map (1/1) 
> (dd555e0437867c3180a1ecaf0a9f4d04).
>     2017-03-15 10:29:10,516 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - 
> Disassociating from JobManager
>     2017-03-15 10:29:10,525 INFO  org.apache.flink.runtime.blob.BlobCache     
>                   - Shutting down BlobCache
>     2017-03-15 10:29:10,542 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:10,546 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Freeing task resources for Source: Custom Source -> Flat 
> Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
>     2017-03-15 10:29:10,548 INFO  org.apache.flink.runtime.taskmanager.Task   
>                   - Freeing task resources for Flat Map (1/1) 
> (dd555e0437867c3180a1ecaf0a9f4d04).
>     2017-03-15 10:29:10,551 INFO  org.apache.flink.core.fs.FileSystem         
>                   - Ensuring all FileSystem streams are closed for Flat Map 
> (1/1)
>     2017-03-15 10:29:10,552 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Trying to 
> register at JobManager akka.tcp://flink@1.2.3.5:43893/user/jobmanager 
> (attempt 1, timeout: 500 milliseconds)
>     2017-03-15 10:29:10,567 INFO  org.apache.flink.core.fs.FileSystem         
>                   - Ensuring all FileSystem streams are closed for Source: 
> Custom Source -> Flat Map (1/1)
>     2017-03-15 10:29:10,632 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Successful 
> registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), 
> starting network stack and library cache.
>     2017-03-15 10:29:10,633 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Determined 
> BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
>     2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.blob.BlobCache     
>                   - Created BLOB cache storage directory 
> /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
>     2017-03-15 10:29:15,551 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:20,571 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:25,582 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>     2017-03-15 10:29:30,592 WARN  akka.remote.ReliableDeliverySupervisor      
>                   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
>   I modified the original IPs to 1.2.3.4 for JobManager1 and 1.2.3.5 for 
> JobManager2 for confidentiality.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to