[ https://issues.apache.org/jira/browse/FLINK-10311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632263#comment-16632263 ]
ASF GitHub Bot commented on FLINK-10311:
----------------------------------------
asfgit closed pull request #6712: [FLINK-10311][tests] HA end-to-end/Jepsen
tests for standby Dispatchers
URL: https://github.com/apache/flink/pull/6712
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index 934324607f0..a3e2668c26b 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -5,10 +5,11 @@ distributed coordination of Apache Flink®.
## Test Coverage
Jepsen is a framework built to test the behavior of distributed systems
-under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+under faults. The tests in this particular project deploy Flink on YARN, Mesos, or as a standalone cluster, submit a
job, and examine the availability of the job after injecting faults.
A job is said to be available if all the tasks of the job are running.
The faults that can be currently introduced to the Flink cluster include:
+
* Killing of TaskManager/JobManager processes
* Stopping HDFS NameNode
* Network partitions
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
index 78935d71187..8c3e8451536 100644
--- a/flink-jepsen/project.clj
+++ b/flink-jepsen/project.clj
@@ -18,6 +18,7 @@
:license {:name "Apache License"
:url "http://www.apache.org/licenses/LICENSE-2.0"}
:main jepsen.flink.flink
+ :aot [jepsen.flink.flink]
:dependencies [[org.clojure/clojure "1.9.0"],
[cheshire "5.8.0"]
[clj-http "3.8.0"]
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
index e44812402f6..a2b256b6f6a 100755
--- a/flink-jepsen/scripts/run-tests.sh
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -36,8 +36,15 @@ do
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
+
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
echo
done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 02cc863bef5..7e437e9d628 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -48,59 +48,126 @@
[[_ v]]
(zero? v))
+(defn- set-job-not-running
+ [model] (assoc model :healthy-count 0))
+
+(defn- track-job-running
+ [model]
+ (update model :healthy-count inc))
+
+(defn- elapsed-seconds
+ [start end]
+ (ju/nanos->secs (- end start)))
+
+(defn- should-cluster-be-healthy?
+ [model op]
+ (let [{:keys [active-nemeses last-failure job-recovery-grace-period]} model]
+ (and
+ (not (nemeses-active? active-nemeses))
+ (> (elapsed-seconds last-failure (:time op)) job-recovery-grace-period))))
+
+(defn- start-fault
+ [model op]
+ (let [{:keys [active-nemeses]} model]
+ (assoc
+ model
+ :active-nemeses (update active-nemeses
+ (strip-op-suffix op)
+ safe-inc))))
+
+(defn- stop-fault
+ [model op]
+ (let [{:keys [active-nemeses]} model]
+ (assoc
+ model
+ :active-nemeses (dissoc-if zero-value?
+ (update active-nemeses (strip-op-suffix op) dec))
+ :last-failure (:time op))))
+
+(defn- job-allowed-to-be-running?
+ [model op]
+ (let [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} model
+ now (:time op)]
+ (cond
+ (not job-canceled?) true
+ :else (> job-cancellation-grace-period (elapsed-seconds job-canceled-time now)))))
+
+(defn- handle-job-running?-op
+ "Returns the new model for an op {:f :job-running? ...}."
+ [model op]
+ (assert (#{:ok :fail :info} (:type op)) "Unexpected type")
+ (let [{:keys [job-canceled?]} model
+ job-running (:value op)
+ request-failed (#{:info :fail} (:type op))]
+ (if (and request-failed
+ (should-cluster-be-healthy? model op))
+ (model/inconsistent "Cluster is not running.")
+ (if job-running ; cluster is running, check if job is running
+ (if (job-allowed-to-be-running? model op) ; job is running but is it supposed to be running?
+ (track-job-running model)
+ (model/inconsistent
+ "Job is running after cancellation."))
+ (if (and ; job is not running
+ (should-cluster-be-healthy? model op)
+ (not job-canceled?))
+ (model/inconsistent "Job is not running.") ; job is not running but it should be running because grace period passed
+ (set-job-not-running model))))))
+
(defrecord
JobRunningWithinGracePeriod
- ^{:doc "A Model which is consistent iff. the Flink job became available within
+ ^{:doc "A Model which is consistent if the Flink job and the Flink cluster became available within
`job-recovery-grace-period` seconds after the last fault injected by the nemesis.
Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
such as network splits, happen during a period of time, and can thus be interleaving. As long as
- there are active faults, the job is allowed not to be available."}
+ there are active faults, the job and the cluster are allowed to be unavailable.
+
+ Note that this model assumes that the client dispatches the operations reliably, i.e., in case of
+ exceptions, the operations are retried or failed fatally."}
[active-nemeses ; stores active failures
healthy-count ; how many consecutive times was the job running?
last-failure ; timestamp when the last failure was injected/ended
healthy-threshold ; after how many times is the job considered healthy
- job-recovery-grace-period] ; after how many seconds should the job be recovered
+ job-recovery-grace-period ; after how many seconds should the job be recovered
+ job-cancellation-grace-period ; after how many seconds should the job be canceled?
+ job-canceled? ; is the job canceled?
+ job-canceled-time] ; timestamp of cancellation
Model
(step [this op]
(case (:process op)
:nemesis (cond
(nil? (:value op)) this
- (stoppable-op? op) (assoc
- this
- :active-nemeses (update active-nemeses
- (strip-op-suffix op)
- safe-inc))
- (stop-op? op) (assoc
- this
- :active-nemeses (dissoc-if zero-value?
- (update active-nemeses (strip-op-suffix op) dec))
- :last-failure (:time op))
+ (stoppable-op? op) (start-fault this op)
+ (stop-op? op) (stop-fault this op)
:else (assoc this :last-failure (:time op)))
- (case (:f op)
- :job-running? (case (:type op)
- :info this ; ignore :info operations
- :fail this ; ignore :fail operations
- :invoke this ; ignore :invoke operations
- :ok (if (:value op) ; check if job is running
- (assoc ; job is running
- this
- :healthy-count
- (inc healthy-count))
- (if (and ; job is not running
- (not (nemeses-active? active-nemeses))
- (< healthy-count healthy-threshold)
- (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
- ; job is not running but it should be running
- ; because grace period passed
- (model/inconsistent "Job is not running.")
- (conj this
- [:healthy-count 0]))))
- ; ignore other client operations
- this))))
+ (if (= :invoke (:type op))
+ this ; ignore :invoke operations
+ (case (:f op)
+ :job-running? (handle-job-running?-op this op)
+ :cancel-job (do
+ (assert (= :ok (:type op)) ":cancel-job must not fail")
+ (assoc this :job-canceled? true :job-canceled-time (:time op)))
+ ; ignore other client operations
+ this)))))
(defn job-running-within-grace-period
- [job-running-healthy-threshold job-recovery-grace-period]
- (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+ ([job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period]
+ (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period false nil))
+ ([job-running-healthy-threshold job-recovery-grace-period]
+ (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
+
+(defn get-job-running-history
+ [history]
+ (->>
+ history
+ (remove #(= (:process %) :nemesis))
+ (remove #(= (:type %) :invoke))
+ (map :value)
+ (map boolean)
+ (remove nil?)))
+
+(defn- healthy?
+ [model]
+ (>= (:healthy-count model) (:healthy-threshold model)))
(defn job-running-checker
[]
@@ -111,18 +178,11 @@
result-map (conj {}
(find test :nemesis-gen)
(find test :deployment-mode))]
- (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
- (into result-map {:valid? false
- :error (:msg final)})
+ (if (or (model/inconsistent? final)
+ (and
+ (not (healthy? final))
+ (not (:job-canceled? final))))
+ (into result-map {:valid? false
+ :final-model final})
(into result-map {:valid? true
:final-model final}))))))
-
-(defn get-job-running-history
- [history]
- (->>
- history
- (remove #(= (:process %) :nemesis))
- (remove #(= (:type %) :invoke))
- (map :value)
- (map boolean)
- (remove nil?)))
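The decision table in handle-job-running?-op is spread across nested ifs. The following dependency-free sketch restates the same rules to make them easier to follow. It is an illustration and not code from the PR. The namespace example.grace-period and the function step are hypothetical. A plain {:inconsistent msg} map stands in for model/inconsistent, and times are seconds rather than nanoseconds. The sketch also assumes that active-nemeses holds only non-zero counts, which stop-fault maintains with dissoc-if, so an empty map means no fault is active.

```clojure
(ns example.grace-period)

(defn should-cluster-be-healthy?
  "True if no fault is active and more than job-recovery-grace-period
  seconds have passed since the last fault ended."
  [{:keys [active-nemeses last-failure job-recovery-grace-period]} now]
  (and (empty? active-nemeses)
       (> (- now last-failure) job-recovery-grace-period)))

(defn job-allowed-to-be-running?
  "A canceled job may still be observed running, but only within
  job-cancellation-grace-period seconds of the cancellation."
  [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} now]
  (or (not job-canceled?)
      (> job-cancellation-grace-period (- now job-canceled-time))))

(defn step
  "Hypothetical: returns the next model for a completed :job-running? op,
  or {:inconsistent msg}. op is {:type :ok|:fail|:info, :value bool, :time secs}."
  [model {op-type :type running? :value now :time}]
  (let [request-failed? (contains? #{:fail :info} op-type)
        healthy-expected? (should-cluster-be-healthy? model now)]
    (cond
      ;; The REST API did not answer although all faults are over.
      (and request-failed? healthy-expected?)
      {:inconsistent "Cluster is not running."}

      ;; Job reported RUNNING: extend the healthy streak unless it was canceled too long ago.
      running?
      (if (job-allowed-to-be-running? model now)
        (update model :healthy-count inc)
        {:inconsistent "Job is running after cancellation."})

      ;; Job not running after the grace period, and nobody canceled it.
      (and healthy-expected? (not (:job-canceled? model)))
      {:inconsistent "Job is not running."}

      ;; Otherwise the job may legitimately be down: reset the streak.
      :else
      (assoc model :healthy-count 0))))
```

Under the new rules, a failed request after the grace period counts as a violation. For that reason the client wraps job-running? in fu/retry with :retries 3 before it reports :fail.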
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 905dc48911f..1ab987bd704 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -47,6 +47,12 @@
(info "Waiting for path" path "in ZK.")
(wait-for-zk-operation zk-client zk/exists path))
+(defn get-only-application-id
+ [coll]
+ (assert (= 1 (count coll)) (str "Expected 1 application id, got " coll ". "
+ "Failed to deploy the Flink cluster, or there are lingering Flink clusters."))
+ (first coll))
+
(defn wait-for-children-to-exist
[zk-client path]
(wait-for-zk-operation zk-client zk/children path))
@@ -60,7 +66,7 @@
(->
(wait-for-children-to-exist zk-client "/flink")
(deref)
- (first))))
+ (get-only-application-id))))
(defn watch-node-bytes
[zk-client path callback]
@@ -97,54 +103,100 @@
:jobs
(map :id)))
-(defn get-job-details!
- [base-url job-id]
- (assert base-url)
- (assert job-id)
- (let [job-details (->
- (http/get (str base-url "/jobs/" job-id) {:as :json})
- :body)]
- (assert (:vertices job-details) "Job does not have vertices")
- job-details))
-
(defn job-running?
[base-url job-id]
- (->>
- (get-job-details! base-url job-id)
- :vertices
- (map :status)
- (every? #(= "RUNNING" %))))
+ (let [response (http/get (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+ body (:body response)
+ error (:errors body)]
+ (cond
+ (http/missing? response) false
+ (not (http/success? response)) (throw (ex-info "Could not determine if job is running" {:job-id job-id :error error}))
+ :else (do
+ (assert (:vertices body) "Job does not have vertices")
+ (->>
+ body
+ :vertices
+ (map :status)
+ (every? #(= "RUNNING" %)))))))
+
+(defn- cancel-job!
+ "Cancels the specified job. Returns true if the job could be canceled.
+ Returns false if the job does not exist. Throws an exception if the HTTP status
+ is not successful."
+ [base-url job-id]
+ (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+ error (-> response :body :errors)]
+ (cond
+ (http/missing? response) false
+ (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
+ :else true)))
+
+(defmacro dispatch-operation
+ [op & body]
+ `(try
+ (assoc ~op :type :ok :value ~@body)
+ (catch Exception e# (do
+ (warn e# "An exception occurred while running" (quote ~@body))
+ (assoc ~op :type :fail :error (.getMessage e#))))))
+
+(defmacro dispatch-operation-or-fatal
+ "Dispatches op by evaluating body, retrying a number of times if needed.
+ Fails fatally if all retries are exhausted."
+ [op & body]
+ `(assoc ~op :type :ok :value (fu/retry (fn [] ~@body) :fallback (fn [e#]
+ (fatal e# "Required operation did not succeed" (quote ~@body))
+ (System/exit 1)))))
+
+(defn- dispatch-rest-operation!
+ [rest-url job-id op]
+ (assert job-id)
+ (if-not rest-url
+ (assoc op :type :fail :error "Have not determined REST URL yet.")
+ (case (:f op)
+ :job-running? (dispatch-operation op (fu/retry
+ (partial job-running? rest-url job-id)
+ :retries 3
+ :fallback #(throw %)))
+ :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
(defrecord Client
- [deploy-cluster! closer rest-url init-future job-id]
+ [deploy-cluster! ; function that starts a non-standalone cluster and submits the job
+ closer ; function that closes the ZK client
+ rest-url ; atom storing the current rest-url
+ init-future ; future that completes if rest-url is set to an initial value
+ job-id ; atom storing the job-id
+ job-submitted?] ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
client/Client
- (open! [this test node]
+ (open! [this test _]
+ (info "Open client.")
(let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
- (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
-
- (setup! [this test] this)
-
- (invoke! [this test op]
- (case (:f op)
- :submit (do
- (deploy-cluster! test)
- (deref init-future)
- (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
- :fallback (fn [e] (do
- (fatal e "Could not get running jobs.")
- (System/exit 1))))
- num-jobs (count jobs)]
- (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
- (reset! job-id (first jobs)))
- (assoc op :type :ok))
- :job-running? (let [base-url @rest-url]
- (if base-url
- (try
- (assoc op :type :ok :value (job-running? base-url @job-id))
- (catch Exception e (do
- (warn e "Get job details from" base-url "failed.")
- (assoc op :type :fail))))
- (assoc op :type :fail :value "Cluster not deployed yet.")))))
-
- (teardown! [this test])
- (close! [this test] (closer)))
+ (assoc this :closer closer
+ :rest-url rest-url-atom
+ :init-future init-future)))
+
+ (setup! [_ test]
+ (info "Setup client.")
+ (when (compare-and-set! job-submitted? false true)
+ (deploy-cluster! test)
+ (deref init-future)
+ (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+ :fallback (fn [e]
+ (fatal e "Could not get running jobs.")
+ (System/exit 1)))
+ num-jobs (count jobs)
+ job (first jobs)]
+ (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+ (info "Submitted job" job)
+ (reset! job-id job))))
+
+ (invoke! [_ _ op]
+ (dispatch-rest-operation! @rest-url @job-id op))
+
+ (teardown! [_ _])
+ (close! [_ _]
+ (info "Closing client.")
+ (closer)))
+
+(defn create-client
+ [deploy-cluster!]
+ (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
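After this change, job-running? and cancel-job! follow one convention for HTTP responses. A 404 means the job is unknown and yields false. Any other non-2xx status throws. dispatch-operation then turns a thrown exception into a :fail operation. The sketch below restates that convention without clj-http so that it runs on its own. It is illustrative only:

* responses are plain {:status :body} maps;
* missing? and success? are local stand-ins that assume clj-http's predicates of the same names test for 404 and a 2xx status;
* dispatch-operation is written as a function that takes a thunk, rather than as the macro from the diff;
* the namespace example.rest-dispatch is hypothetical.

```clojure
(ns example.rest-dispatch)

;; Local stand-ins for clj-http's status predicates (assumed: 404 and 2xx).
(defn missing? [{:keys [status]}] (= 404 status))
(defn success? [{:keys [status]}] (<= 200 status 299))

(defn job-running-from-response
  "false if the job does not exist, throws on any other non-2xx status,
  otherwise true iff every vertex of the job reports RUNNING."
  [job-id {:keys [body] :as response}]
  (cond
    (missing? response) false
    (not (success? response))
    (throw (ex-info "Could not determine if job is running"
                    {:job-id job-id :error (:errors body)}))
    :else
    (do
      ;; Fail loudly if the response carries no :vertices at all.
      (assert (:vertices body) "Job does not have vertices")
      (every? #(= "RUNNING" (:status %)) (:vertices body)))))

(defn dispatch-operation
  "Function form of the dispatch-operation macro: completes op as :ok with
  the thunk's result, or as :fail with the exception message."
  [op thunk]
  (try
    (assoc op :type :ok :value (thunk))
    (catch Exception e
      (assoc op :type :fail :error (.getMessage e)))))
```

The two operations handle failure differently in the diff. :job-running? retries inside fu/retry before dispatch-operation ever sees an exception. :cancel-job goes through dispatch-operation-or-fatal, which exits the process once its retries are exhausted. The checker depends on this, because it asserts that :cancel-job never completes as :fail.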
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 79ed8a45b4d..e0f5ff856d4 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -36,21 +36,22 @@
(def conf-file (str install-dir "/conf/flink-conf.yaml"))
(def masters-file (str install-dir "/conf/masters"))
-(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
(def deb-zookeeper-package "3.4.9-3+deb8u1")
(def deb-mesos-package "1.5.0-2.0.2")
(def deb-marathon-package "1.6.322")
(def taskmanager-slots 1)
-(def master-count 1)
(defn flink-configuration
- [test]
+ [test node]
{:high-availability "zookeeper"
:high-availability.zookeeper.quorum (zookeeper-quorum test)
:high-availability.storageDir (str (:ha-storage-dir test) "/ha")
+ :jobmanager.rpc.address node
:state.savepoints.dir (str (:ha-storage-dir test) "/savepoints")
+ :rest.address node
:rest.port 8081
:rest.bind-address "0.0.0.0"
:taskmanager.numberOfTaskSlots taskmanager-slots
@@ -59,23 +60,17 @@
:state.backend.local-recovery "false"
:taskmanager.registration.timeout "30 s"})
-(defn master-nodes
- [test]
- (take master-count (sort (:nodes test))))
-
(defn write-configuration!
- "Writes the flink-conf.yaml and masters file to the flink conf directory"
- [test]
+ "Writes the flink-conf.yaml to the flink conf directory"
+ [test node]
(let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
- (seq (flink-configuration test))))
- m (clojure.string/join "\n" (master-nodes test))]
+ (seq (flink-configuration test node))))]
(c/exec :echo c :> conf-file)
- (c/exec :echo m :> masters-file)
;; TODO: write log4j.properties properly
(c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
(defn install-flink!
- [test]
+ [test node]
(let [url (:tarball test)]
(info "Installing Flink from" url)
(cu/install-archive! url install-dir)
@@ -83,12 +78,12 @@
(c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
(c/upload (:job-jar test) upload-dir)
(c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
- (write-configuration! test)))
+ (write-configuration! test node)))
(defn teardown-flink!
[]
(info "Tearing down Flink")
- (cu/grepkill! "flink")
+ (meh (cu/grepkill! "flink"))
(meh (c/exec :rm :-rf install-dir))
(meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
@@ -101,7 +96,7 @@
(reify db/DB
(setup! [_ test node]
(c/su
- (install-flink! test)))
+ (install-flink! test node)))
(teardown! [_ test node]
(c/su
@@ -120,26 +115,49 @@
(doall (map #(db/setup! % test node) dbs))))
(teardown! [_ test node]
(c/su
- (doall (map #(db/teardown! % test node) dbs))))
+ (try
+ (doall (map #(db/teardown! % test node) dbs))
+ (finally (fu/stop-all-supervised-services!)))))
db/LogFiles
(log-files [_ test node]
- (flatten (map #(db/log-files % test node) dbs)))))
+ (->>
+ (filter (partial satisfies? db/LogFiles) dbs)
+ (map #(db/log-files % test node))
+ (flatten)))))
-;;; YARN
+(defn- sorted-nodes
+ [test]
+ (-> test :nodes sort))
-(defn flink-yarn-db
+(defn- select-nodes
+ [test selector]
+ (-> (sorted-nodes test)
+ selector))
+
+(defn- first-node
+ [test]
+ (select-nodes test first))
+
+(defn- create-env-vars
+ "Expects a map containing environment variables, and returns a string that can be used to set
+ environment variables for a child process using Bash's quick assignment and inheritance trick.
+ For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
+ [m]
+ (->>
+ (map #(str (name (first %)) "=" (second %)) m)
+ (clojure.string/join " ")
+ (#(str % " "))))
+
+(defn- hadoop-env-vars
[]
- (let [zk (zk/db deb-zookeeper-package)
- hadoop (hadoop/db hadoop-dist-url)
- flink (flink-db)]
- (combined-db [hadoop zk flink])))
+ (create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop classpath`")
+ :HADOOP_CONF_DIR hadoop/hadoop-conf-dir}))
(defn exec-flink!
- [test cmd args]
+ [cmd args]
(c/su
(c/exec (c/lit (str
- "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
- "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+ (hadoop-env-vars)
install-dir "/bin/flink " cmd " " args)))))
(defn flink-run-cli-args
@@ -149,24 +167,84 @@
["-d"]
(if (:main-class test)
[(str "-c " (:main-class test))]
- [])
- (if (= :yarn-job (:deployment-mode test))
- ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
[])))
(defn submit-job!
([test] (submit-job! test []))
([test cli-args]
- (exec-flink! test "run" (clojure.string/join
- " "
- (concat cli-args
- (flink-run-cli-args test)
- [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
- (:job-args test)])))))
-
-(defn first-node
+ (exec-flink! "run" (clojure.string/join
+ " "
+ (concat cli-args
+ (flink-run-cli-args test)
+ [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+ (:job-args test)])))))
+
+;;; Standalone
+
+(def standalone-master-count 2)
+
+(defn- standalone-master-nodes
[test]
- (-> test :nodes sort first))
+ (select-nodes test (partial take standalone-master-count)))
+
+(defn- standalone-taskmanager-nodes
+ [test]
+ (select-nodes test (partial drop standalone-master-count)))
+
+(defn- start-standalone-masters!
+ [test node]
+ (when (some #{node} (standalone-master-nodes test))
+ (fu/create-supervised-service!
+ "flink-master"
+ (str "env " (hadoop-env-vars)
+ install-dir "/bin/jobmanager.sh start-foreground "
+ ">> " log-dir "/jobmanager.log"))))
+
+(defn- start-standalone-taskmanagers!
+ [test node]
+ (when (some #{node} (standalone-taskmanager-nodes test))
+ (fu/create-supervised-service!
+ "flink-taskmanager"
+ (str "env " (hadoop-env-vars)
+ install-dir "/bin/taskmanager.sh start-foreground "
+ ">> " log-dir "/taskmanager.log"))))
+
+(defn- start-flink-db
+ []
+ (reify db/DB
+ (setup! [_ test node]
+ (c/su
+ (start-standalone-masters! test node)
+ (start-standalone-taskmanagers! test node)))
+
+ (teardown! [_ test node]
+ (c/su
+ (when (some #{node} (standalone-master-nodes test))
+ (fu/stop-supervised-service! "flink-master"))
+ (when (some #{node} (standalone-taskmanager-nodes test))
+ (fu/stop-supervised-service! "flink-taskmanager"))))))
+
+(defn flink-standalone-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ flink (flink-db)
+ start-flink (start-flink-db)]
+ (combined-db [hadoop zk flink start-flink])))
+
+(defn submit-job-from-first-node!
+ [test]
+ (c/on (first-node test)
+ (submit-job! test)))
+
+;;; YARN
+
+(defn flink-yarn-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ flink (flink-db)]
+ (combined-db [hadoop zk flink])))
(defn start-yarn-session!
[test]
@@ -174,8 +252,7 @@
(c/on node
(info "Starting YARN session from" node)
(c/su
- (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
- "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+ (c/exec (c/lit (str (hadoop-env-vars)
" " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
(submit-job! test)))))
@@ -183,7 +260,7 @@
[test]
(c/on (first-node test)
(c/su
- (submit-job! test))))
+ (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
;;; Mesos
@@ -203,6 +280,23 @@
(fatal e "Could not submit job.")
(System/exit 1)))))
+(defn mesos-appmaster-cmd
+ "Returns the command used by Marathon to start Flink's Mesos application master."
+ [test]
+ (str (hadoop-env-vars)
+ install-dir "/bin/mesos-appmaster.sh "
+ "-Dmesos.master=" (zookeeper-uri
+ test
+ mesos/zk-namespace) " "
+ "-Djobmanager.rpc.address=$(hostname -f) "
+ "-Djobmanager.heap.mb=2048 "
+ "-Djobmanager.rpc.port=6123 "
+ "-Dmesos.resourcemanager.tasks.mem=2048 "
+ "-Dtaskmanager.heap.mb=2048 "
+ "-Dtaskmanager.numberOfTaskSlots=2 "
+ "-Dmesos.resourcemanager.tasks.cpus=1 "
+ "-Drest.bind-address=$(hostname -f) "))
+
(defn start-mesos-session!
[test]
(c/su
@@ -210,21 +304,7 @@
(http/post
(str (mesos/marathon-base-url test) "/v2/apps")
{:form-params {:id "flink"
- :cmd (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
- "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
- install-dir "/bin/mesos-appmaster.sh "
- "-Dmesos.master=" (zookeeper-uri
- test
- mesos/zk-namespace) " "
- "-Djobmanager.rpc.address=$(hostname -f) "
- "-Djobmanager.heap.mb=2048 "
- "-Djobmanager.rpc.port=6123 "
- "-rest.port=8081 "
- "-Dmesos.resourcemanager.tasks.mem=2048 "
- "-Dtaskmanager.heap.mb=2048 "
- "-Dtaskmanager.numberOfTaskSlots=2 "
- "-Dmesos.resourcemanager.tasks.cpus=1 "
- "-Drest.bind-address=$(hostname -f) ")
+ :cmd (mesos-appmaster-cmd test)
:cpus 1.0
:mem 2048
:maxLaunchDelaySeconds 3}
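The new create-env-vars helper relies on a shell rule: NAME=value assignments placed in front of a command are exported to that command only. The block below copies the function from the diff into a standalone namespace, example.env-vars, which is a hypothetical name, so that its output can be checked. The values are not quoted. A value that contains spaces therefore stays a single word only when the shell already treats it as one token, which is the case for the backtick command substitution used for HADOOP_CLASSPATH.

```clojure
(ns example.env-vars
  (:require [clojure.string :as str]))

(defn create-env-vars
  "Turns {:FOO \"bar\"} into \"FOO=bar \": a prefix that sets FOO only for
  the command that follows it on the same shell line."
  [m]
  (->> m
       (map (fn [[k v]] (str (name k) "=" v)))  ; one NAME=value word per entry
       (str/join " ")
       (#(str % " "))))                         ; trailing space separates the command
```

For example, (str (create-env-vars {:HADOOP_CONF_DIR "/etc/hadoop"}) "bin/flink run") evaluates to "HADOOP_CONF_DIR=/etc/hadoop bin/flink run". The path in this example is a placeholder.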
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index d5d41579c38..c5d0d225932 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -15,6 +15,7 @@
;; limitations under the License.
(ns jepsen.flink.flink
+ (:gen-class)
(:require [clojure.tools.logging :refer :all]
[jepsen
[cli :as cli]
@@ -24,31 +25,48 @@
[jepsen.flink.client :refer :all]
[jepsen.flink.checker :as flink-checker]
[jepsen.flink.db :as fdb]
- [jepsen.flink.nemesis :as fn])
- (:import (jepsen.flink.client Client)))
+ [jepsen.flink.nemesis :as fn]))
(def flink-test-config
- {:yarn-session {:db (fdb/flink-yarn-db)
- :deployment-strategy fdb/start-yarn-session!}
- :yarn-job {:db (fdb/flink-yarn-db)
- :deployment-strategy fdb/start-yarn-job!}
- :mesos-session {:db (fdb/flink-mesos-db)
- :deployment-strategy fdb/start-mesos-session!}})
+ {:yarn-session {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-session!}
+ :yarn-job {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-job!}
+ :mesos-session {:db (fdb/flink-mesos-db)
+ :deployment-strategy fdb/start-mesos-session!}
+ :standalone-session {:db (fdb/flink-standalone-db)
+ :deployment-strategy fdb/submit-job-from-first-node!}})
-(defn client-gen
+(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
+(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
+(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+
+(defn default-client-gen
+ "Client generator that polls for the job running status."
[]
(->
- (cons {:type :invoke, :f :submit, :value nil}
- (cycle [{:type :invoke, :f :job-running?, :value nil}
- (gen/sleep 5)]))
- (gen/seq)
+ poll-job-running-loop
(gen/singlethreaded)))
+(defn cancelling-client-gen
+ "Client generator that polls for the job running status, and cancels the job after 15 seconds."
+ []
+ (->
+ (gen/concat (gen/time-limit 15 (default-client-gen))
+ (gen/once cancel-job)
+ (default-client-gen))
+ (gen/singlethreaded)))
+
+(def client-gens
+ {:poll-job-running default-client-gen
+ :cancel-job cancelling-client-gen})
+
(defn flink-test
[opts]
(merge tests/noop-test
(let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
- {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+ {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts
+ client-gen ((:client-gen opts) client-gens)]
{:name "Apache Flink"
:os debian/os
:db db
@@ -63,7 +81,7 @@
((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
job-running-healthy-threshold
job-recovery-grace-period))))
- :client (Client. deployment-strategy nil nil nil nil)
+ :client (create-client deployment-strategy)
:checker (flink-checker/job-running-checker)})
(assoc opts :concurrency 1)))
@@ -93,6 +111,12 @@
:default :kill-task-managers
:validate [#(fn/nemesis-generator-factories (keyword %))
(keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+ [nil "--client-gen GEN" (str "Which client should be used?"
+ (keys-as-allowed-values-help-text client-gens))
+ :parse-fn keyword
+ :default :poll-job-running
+ :validate [#(client-gens (keyword %))
+ (keys-as-allowed-values-help-text client-gens)]]
[nil "--deployment-mode MODE"
(keys-as-allowed-values-help-text flink-test-config)
:parse-fn keyword
:default :yarn-session
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
index a73f25fd489..aef73598da9 100644
--- a/flink-jepsen/src/jepsen/flink/mesos.clj
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -22,36 +22,9 @@
[util :as util :refer [meh]]]
[jepsen.control.util :as cu]
[jepsen.os.debian :as debian]
+ [jepsen.flink.utils :refer [create-supervised-service! stop-supervised-service!]]
[jepsen.flink.zookeeper :refer [zookeeper-uri]]))
-;;; runit process supervisor (http://smarden.org/runit/)
-;;;
-;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" approach to
-;;; error handling, e.g., the Mesos master will exit when it discovers it has been partitioned away
-;;; from the Zookeeper quorum.
-
-(def runit-version "2.1.2-3")
-
-(defn create-supervised-service!
- "Registers a service with the process supervisor and starts it."
- [service-name cmd]
- (let [service-dir (str "/etc/sv/" service-name)
- run-script (str service-dir "/run")]
- (c/su
- (c/exec :mkdir :-p service-dir)
- (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
- "exec 2>&1"
- (str "exec " cmd)]) :> run-script)
- (c/exec :chmod :+x run-script)
- (c/exec :ln :-sf service-dir (str "/etc/service/" service-name)))))
-
-(defn stop-supervised-service!
- "Stops a service and removes it from supervision."
- [service-name]
- (c/su
- (c/exec :sv :down service-name)
- (c/exec :rm :-f (str "/etc/service/" service-name))))
-
;;; Mesos
(def master-count 1)
@@ -154,8 +127,7 @@
"keyserver.ubuntu.com"
"E56151BF")
(debian/install {:mesos mesos-version
- :marathon marathon-version
- :runit runit-version})
+ :marathon marathon-version})
(c/exec :mkdir :-p "/var/run/mesos")
(c/exec :mkdir :-p master-dir)
(c/exec :mkdir :-p slave-dir)))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 3047eeb9cc1..5335bba874c 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -86,6 +86,11 @@
(take n)
(reverse)))
+(defn- inc-by-factor
+ [n factor]
+ (assert (>= factor 1))
+ (int (* n factor)))
+
(defn stop-generator
[stop source job-running-healthy-threshold job-recovery-grace-period]
(gen/concat source
@@ -105,9 +110,12 @@
(flink-checker/get-job-running-history)
(take-last-with-default
job-running-healthy-threshold false))]
(if (or
- (and
- (every? true? job-running-history))
- (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+ (every? true? job-running-history)
+ (> (ju/relative-time-nanos) (+ @t
+ (ju/secs->nanos
+ (inc-by-factor
+ job-recovery-grace-period
+ 1.1)))))
(do
(reset! stop true)
nil)
@@ -122,14 +130,14 @@
(defn kill-taskmanagers-bursts-gen
[time-limit]
(fgen/time-limit time-limit
- (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
- [(gen/sleep 300)])))))
+ (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+ [(gen/sleep 300)])))))
(defn kill-jobmanagers-gen
[time-limit]
(fgen/time-limit (+ time-limit job-submit-grace-period)
- (gen/seq (cons (gen/sleep job-submit-grace-period)
- (cycle [{:type :info, :f :kill-job-manager}])))))
+ (gen/seq (cons (gen/sleep job-submit-grace-period)
+ (cycle [{:type :info, :f :kill-job-manager}])))))
(defn fail-name-node-during-recovery
[]
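stop-generator sets its stop flag in either of two cases. The first is when the last job-running-healthy-threshold polls all saw the job running. The second is when a timeout expires. This PR stretches that timeout to 1.1 times job-recovery-grace-period using inc-by-factor, which makes it slightly longer than the checker's own grace period. The sketch below restates the condition as a pure function over seconds, with these provenance notes:

* inc-by-factor is copied from the diff.
* take-last-with-default is only partially visible in this excerpt, so the version here is a hypothetical re-implementation consistent with how the diff calls it.
* stop-faults? and the namespace are hypothetical names.

```clojure
(ns example.stop-condition)

(defn inc-by-factor
  "As in the diff: scales n by factor (which must be >= 1) and truncates to an int."
  [n factor]
  (assert (>= factor 1))
  (int (* n factor)))

(defn take-last-with-default
  "Hypothetical re-implementation: the last n items of coll, left-padded
  with default when coll has fewer than n items."
  [n default coll]
  (->> (reverse coll)
       (#(concat % (repeat default)))  ; pad the front of the original order
       (take n)
       (reverse)))

(defn stop-faults?
  "Hypothetical predicate mirroring the condition in stop-generator:
  a long enough healthy streak, or 1.1x the recovery grace period elapsed."
  [job-running-history healthy-threshold grace-period-secs elapsed-secs]
  (or (every? true? (take-last-with-default healthy-threshold false job-running-history))
      (> elapsed-secs (inc-by-factor grace-period-secs 1.1))))
```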
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 3fd9f961e13..50d0075ca35 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -15,7 +15,10 @@
;; limitations under the License.
(ns jepsen.flink.utils
- (:require [clojure.tools.logging :refer :all]))
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]]
+ [jepsen.os.debian :as debian]))
(defn retry
"Runs a function op and retries on exception.
@@ -46,3 +49,41 @@
(Thread/sleep delay)
(recur op (assoc keys :retries (dec retries))))
(success r)))))
+
+;;; runit process supervisor (http://smarden.org/runit/)
+
+(def runit-version "2.1.2-3")
+
+(defn- install-process-supervisor!
+ "Installs the process supervisor."
+ []
+ (debian/install {:runit runit-version}))
+
+(defn create-supervised-service!
+ "Registers a service with the process supervisor and starts it."
+ [service-name cmd]
+ (let [service-dir (str "/etc/sv/" service-name)
+ run-script (str service-dir "/run")]
+ (info "Create supervised service" service-name)
+ (c/su
+ (install-process-supervisor!)
+ (c/exec :mkdir :-p service-dir)
+ (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
+ "exec 2>&1"
+ (str "exec " cmd)]) :> run-script)
+ (c/exec :chmod :+x run-script)
+ (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
+
+(defn stop-supervised-service!
+ "Stops a service and removes it from supervision."
+ [service-name]
+ (info "Stop supervised service" service-name)
+ (c/su
+ (c/exec :rm :-f (str "/etc/service/" service-name))))
+
+(defn stop-all-supervised-services!
+ "Stops and removes all services from supervision if any."
+ []
+ (info "Stop all supervised services.")
+ (c/su
+ (c/exec :rm :-f (c/lit (str "/etc/service/*")))))
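`create-supervised-service!` writes a three-line `run` script into `/etc/sv/<service-name>` and links that directory into `/etc/service`, the directory that runit's `runsvdir` watches. `stop-supervised-service!` only deletes the link. The pure functions below reproduce just the paths and script text, so they can be checked without root access or a Debian host. The function names are hypothetical. This sketch requires `clojure.string` explicitly. The `utils.clj` namespace above calls `clojure.string/join` without listing it in `:require`, so it relies on that namespace having been loaded transitively.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helpers mirroring create-supervised-service! (no shell calls).

(defn service-paths
  "Filesystem locations used for a runit-supervised service."
  [service-name]
  (let [service-dir (str "/etc/sv/" service-name)]
    {:service-dir service-dir
     :run-script  (str service-dir "/run")
     ;; runsvdir picks up services linked into /etc/service;
     ;; deleting this link is how the utils above stop the service again.
     :link        (str "/etc/service/" service-name)}))

(defn run-script-content
  "Text written to the run script: stderr is merged into stdout, and the
  shell is replaced by cmd via exec, so runsv supervises cmd directly."
  [cmd]
  (str/join "\n" ["#!/bin/sh"
                  "exec 2>&1"
                  (str "exec " cmd)]))
```

For example, `(run-script-content "my-daemon --foreground")` returns `"#!/bin/sh\nexec 2>&1\nexec my-daemon --foreground"`, where `my-daemon` is a placeholder command.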
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index 7389bbc15e9..c27d751e69e 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -25,46 +25,91 @@
{:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
{:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
{:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
- (is (= (get-job-running-history history) [false true]))))
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
+ {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
+ (is (= (get-job-running-history history) [false true false]))))
(deftest job-running-checker-test
(let [checker (job-running-checker)
test {}
- model (job-running-within-grace-period 3 60)
+ model (job-running-within-grace-period 3 60 10)
opts {}
check (fn [history] (checker/check checker test model history opts))]
- (testing "Job is not running after grace period."
- (is (= (:valid? (check
- [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
- (testing "Job is running after grace period."
- (is (= (:valid? (check
- [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+ (testing "Model should be inconsistent if job is not running after grace period."
+ (let [result (check
+ [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+ (is (= false (:valid? result)))
+ (is (= "Job is not running." (-> result :final-model :msg)))))
+ (testing "Model should be consistent if job is running after grace period."
+ (is (= true (:valid? (check
+ [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
(testing "Should tolerate non-running job during failures."
- (is (= (:valid? (check
- [{:type :info, :f :partition-start, :process :nemesis, :time -1}
- {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
- {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
- {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
- (testing "Should respect healthy threshold."
- (is (= (:valid? (check
- [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
- (is (= (:valid? (check
- [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
- (testing "Job was not deployed successfully."
- (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
- {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+ (is (= true (:valid? (check
+ [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+ {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+ {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+ {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+ (testing "Should not tolerate non-running job without a cause."
+ (let [result (check
+ [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+ (is (= false (:valid? result)))
+ (is (= "Job is not running." (-> result :final-model :msg)))))
+ (testing "Model should be inconsistent if job submission was unsuccessful."
+ (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
+ {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+ (is (= false (:valid? result)))))
+ (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
+ (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
+ {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+ {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+ (is (= false (:valid? result)))
+ (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (testing "Should tolerate non-running job after cancellation."
+ (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+ {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+ (testing "Model should be inconsistent if job is running after cancellation."
+ (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+ {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+ (is (= false (:valid? result)))
+ (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+ (testing "Model should be inconsistent if Flink cluster is not available at the end."
+ (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+ {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+ (is (= false (:valid? result)))
+ (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
+ (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+ {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
+ {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
+ {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+ (is (= false (:valid? result)))
+ (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (testing "Should throw AssertionError if job cancelling operation failed."
+ (is (thrown-with-msg? AssertionError
+ #":cancel-job must not fail"
+ (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+ (testing "Should tolerate non-running job if grace period has not passed."
+ (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
(deftest safe-inc-test
(is (= (safe-inc nil) 1))
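The updated `get-job-running-history` test expects an `:info` completion (an indeterminate poll) to be recorded as `false`, so the expected result grows from `[false true]` to `[false true false]`. The function body lies outside this excerpt. The following is a hypothetical re-implementation, consistent only with the operations visible here: it drops `:invoke` entries and maps every other `:job-running?` completion to `true` only when it is `:ok` with value `true`. The name `job-running-samples` is invented.

```clojure
;; Hypothetical sketch; not the jepsen.flink.checker implementation.
(defn job-running-samples
  "Maps each completed :job-running? operation in history to a boolean.
  Only an :ok completion whose :value is true counts as running; :info
  (indeterminate) and :fail completions count as not running."
  [history]
  (->> history
       (filter #(= :job-running? (:f %)))
       (remove #(= :invoke (:type %)))
       (mapv #(and (= :ok (:type %)) (true? (:value %))))))

(def sample-history
  [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
   {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
   {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
   {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
   {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}])

(job-running-samples sample-history)
;; => [false true false]
```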
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
index b4373bfb124..a73c936d08d 100644
--- a/flink-jepsen/test/jepsen/flink/client_test.clj
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -17,7 +17,8 @@
(ns jepsen.flink.client-test
(:require [clojure.test :refer :all]
[clj-http.fake :as fake]
- [jepsen.flink.client :refer :all]))
+ [jepsen.flink.client :refer :all])
+ (:import (clojure.lang ExceptionInfo)))
(deftest read-url-test
(is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
@@ -25,13 +26,52 @@
(deftest job-running?-test
(fake/with-fake-routes
{"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
- (fn [request] {:status 200
- :headers {}
- :body
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window
WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source:
Socket Stream -> Flat
Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink:
Print to Std.
Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket
Window
WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) ->
Sink: Print to Std.
Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source:
Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})
- "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
- (fn [request] {:status 200
- :headers {}
- :body
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window
WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source:
Socket Stream -> Flat
Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink:
Print to Std.
Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket
Window
WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) ->
Sink: Print to Std.
Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source:
Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})}
+ (fn [_] {:status 200
+ :body
"{\"jid\":\"a718f168ec6be8eff1345a17bf64196c\",\"name\":\"Socket Window
WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source:
Socket Stream -> Flat
Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink:
Print to Std.
Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket
Window
WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) ->
Sink: Print to Std.
Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source:
Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+ "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+ (fn [_] {:status 200
+ :body
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window
WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source:
Socket Stream -> Flat
Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink:
Print to Std.
Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket
Window
WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000),
ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) ->
Sink: Print to Std.
Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source:
Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+ "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+ (fn [_] {:status 404})}
(is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
- (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
+ (is (= (job-running? "http://localhost:8081" "54ae4d8ec01d85053d7eb5d139492df2") false))
+ (is (= (job-running? "http://localhost:8081" "ec3a61df646e665d31899bb26aba10b7") false))))
+
+(deftest cancel-job!-test
+ (fake/with-fake-routes
+ {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+ {:patch (fn [_] {:status 202})}
+
+ "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+ {:patch (fn [_] {:status 404})}
+
+ "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+ {:patch (fn [_] {:status 503})}}
+
+ (testing "Successful job cancellation."
+ (is (= true (@#'jepsen.flink.client/cancel-job!
+ "http://localhost:8081"
+ "a718f168ec6be8eff1345a17bf64196c"))))
+
+ (testing "Job not found."
+ (is (= false (@#'jepsen.flink.client/cancel-job!
+ "http://localhost:8081"
+ "54ae4d8ec01d85053d7eb5d139492df2"))))
+
+ (testing "Throw if HTTP status code is 503."
+ (is (thrown-with-msg? ExceptionInfo
+ #"Job cancellation unsuccessful"
+ (@#'jepsen.flink.client/cancel-job!
+ "http://localhost:8081"
+ "ec3a61df646e665d31899bb26aba10b7"))))))
+
+(deftest dispatch-operation-test
+ (let [op {:type :invoke, :f :job-running?, :value nil}
+ test-fn (constantly 1)]
+ (testing "Dispatching operation completes normally."
+ (is (= {:type :ok :value 1} (select-keys (dispatch-operation op (test-fn)) [:type :value]))))
+ (testing "Returned operation should be of type :fail and have a nil value on exception."
+ (is (= {:type :fail :value nil :error "expected"} (select-keys (dispatch-operation op (throw (Exception. "expected"))) [:type :value :error]))))))
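The new `client_test.clj` cases pin down two behaviours. First, `cancel-job!` returns `true` for HTTP 202 and `false` for 404, and throws an `ExceptionInfo` mentioning "Job cancellation unsuccessful" otherwise. Second, `dispatch-operation` completes an operation as `:ok` with the body's result, or as `:fail` with a `nil` value and the exception message under `:error`. Because the test passes `(throw ...)` as an argument and still expects an operation back, `dispatch-operation` must be a macro; a function would never see the exception. Neither implementation is in this excerpt. The following is a hypothetical sketch of both behaviours without any HTTP calls, and `cancel-status->result` is an invented name.

```clojure
;; Hypothetical sketches; not the jepsen.flink.client implementation.

(defn cancel-status->result
  "Interprets the HTTP status of a job-cancellation request:
  202 -> true (accepted), 404 -> false (job not found), otherwise throw."
  [job-id status]
  (case status
    202 true
    404 false
    (throw (ex-info "Job cancellation unsuccessful"
                    {:job-id job-id :status status}))))

(defmacro dispatch-operation
  "Evaluates body. On success, completes op as :ok with the result as :value;
  on exception, completes op as :fail with a nil :value and the exception
  message as :error."
  [op & body]
  `(let [op# ~op]
     (try
       (assoc op# :type :ok :value (do ~@body))
       (catch Exception e#
         (assoc op# :type :fail :value nil :error (.getMessage e#))))))
```

Writing `dispatch-operation` as a macro keeps the body unevaluated until it is inside the `try`, which is what lets the test's `(throw (Exception. "expected"))` be turned into a `:fail` operation.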
diff --git a/flink-jepsen/test/jepsen/flink/nemesis_test.clj b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
new file mode 100644
index 00000000000..488631ffea7
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
@@ -0,0 +1,32 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis-test
+ (:require [clojure.test :refer :all])
+ (:require [jepsen.flink.nemesis :refer :all]))
+
+(deftest inc-by-factor-test
+ (testing "Should not increase if factor is 1."
+ (is (= 10 (@#'jepsen.flink.nemesis/inc-by-factor 10 1))))
+
+ (testing "Should increase by factor."
+ (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.5))))
+
+ (testing "Should round down."
+ (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.52))))
+
+ (testing "Should throw if factor < 1."
+ (is (thrown? AssertionError (@#'jepsen.flink.nemesis/inc-by-factor 1 0)))))
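The nemesis change stretches the recovery deadline with `(inc-by-factor job-recovery-grace-period 1.1)`, and `inc-by-factor-test` pins down its contract: a factor of 1 leaves the value unchanged, larger factors scale it, fractional results are rounded down, and a factor below 1 raises an `AssertionError`. The function appears to be private, hence the `@#'` var access in the test, and its definition is not in this excerpt. The following is a sketch that satisfies those test cases, not the actual implementation.

```clojure
;; Hypothetical sketch satisfying inc-by-factor-test.
(defn inc-by-factor
  "Multiplies n by factor, which must be >= 1, and rounds the result down
  to a long. A failing :pre check throws AssertionError."
  [n factor]
  {:pre [(>= factor 1)]}
  (long (Math/floor (double (* n factor)))))
```

For example, `(inc-by-factor 60 1.1)` returns 66, so a 60-second grace period becomes a 66-second deadline.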
----------------------------------------------------------------
> HA end-to-end/Jepsen tests for standby Dispatchers
> --------------------------------------------------
>
> Key: FLINK-10311
> URL: https://issues.apache.org/jira/browse/FLINK-10311
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Gary Yao
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We should add end-to-end or Jepsen tests to verify the HA behaviour when
> using multiple standby Dispatchers. In particular, we should verify that jobs
> get properly cleaned up after they have finished successfully (see FLINK-10255
> and FLINK-10011):
> 1. Test that a standby Dispatcher does not affect job execution and resource
> cleanup.
> 2. Test that a standby Dispatcher recovers all submitted jobs after the
> leader loses leadership.