Hello,

I am about to deploy my first Flink project to production, but I am
running into a big hurdle: I am unable to launch my project so that it can
write to an S3 bucket.  My project runs on an EMR cluster where I have
installed Flink 1.3.2.  I am using YARN to launch the application, and it
seems to run fine unless I enable checkpointing (with an S3 target).  I am
looking to use RocksDB as my checkpointing backend.  I have asked in a few
places and am still unable to find a solution to this problem.  Here are my
steps for creating the cluster and launching the application; perhaps I am
missing a step.  I'd be happy to provide any additional information if
needed.

AWS Portal:

    1) EMR -> Create Cluster
    2) Advanced Options
    3) Release = emr-5.8.0
    4) Only select Hadoop 2.7.3
    5) Next -> Next -> Next -> Create Cluster (I do fill out names/keys/etc.)
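
For reference, the portal steps above correspond roughly to the following
AWS CLI call; this is only a sketch, and the instance type, count, and key
name are placeholders rather than my actual values:

    aws emr create-cluster \
      --name "flink-test-cluster" \
      --release-label emr-5.8.0 \
      --applications Name=Hadoop \
      --instance-type m4.large \
      --instance-count 3 \
      --use-default-roles \
      --ec2-attributes KeyName=my-key-pair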

Once the cluster is up, I SSH into the master node and do the following:

    1  wget
http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
    2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
    3  cd flink-1.3.2
    4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
    5  Change conf/flink-conf.yaml
    6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar
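
If it helps, I can also run the following from the master and share the
output, to check whether the bucket is visible at all through the
Hadoop/EMRFS client (bucket/location is the same placeholder path as in my
config below):

    hadoop fs -ls s3://bucket/location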

In my conf/flink-conf.yaml I add the following fields:

    state.backend: rocksdb
    state.backend.fs.checkpointdir: s3://bucket/location
    state.checkpoints.dir: s3://bucket/location

My program's checkpointing setup:


    env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
    env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))
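
In case the surrounding context matters, this is roughly how those calls sit
inside the job.  It is only a minimal sketch: the checkpoint
rate/pause/timeout values and the trivial pipeline are placeholders (my real
values come from application config), and it assumes
flink-statebackend-rocksdb is bundled into flink-consumer.jar:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala._

    object FlinkConsumer {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Placeholder values; the real ones are read from my application config.
        val getCheckpointRate     = 60000L   // checkpoint every 60 s
        val getCheckpointMinPause = 30000L   // at least 30 s between checkpoints
        val getCheckpointTimeout  = 120000L  // give up on a checkpoint after 2 min

        env.enableCheckpointing(getCheckpointRate, CheckpointingMode.EXACTLY_ONCE)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
        env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

        // RocksDB state backend writing checkpoints to S3; the second argument
        // enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

        // Trivial placeholder pipeline so the sketch is self-contained.
        env.fromElements(1, 2, 3).map(_ * 2).print()

        env.execute("flink-consumer")
      }
    }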
