[ https://issues.apache.org/jira/browse/FLINK-10985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714567#comment-16714567 ]
ASF GitHub Bot commented on FLINK-10985:
----------------------------------------
GJL closed pull request #7166: [FLINK-10985][tests] Enable submission of
multiple jobs.
URL: https://github.com/apache/flink/pull/7166
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index a3e2668c26b..d2678f89302 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -20,7 +20,7 @@ semantics.
## Usage
See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
-for how to set up the environment to run tests. The script under `scripts/run-tests.sh` documents how to invoke
+for how to set up the environment to run tests. The script under `docker/run-tests.sh` documents how to invoke
tests. The Flink job used for testing is located under
`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
diff --git a/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn b/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn
new file mode 100644
index 00000000000..a20903e8897
--- /dev/null
+++ b/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn
@@ -0,0 +1,18 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:jobs [{:job-jar "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+ :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
index 8b2b1e6d18f..a81b873d692 100755
--- a/flink-jepsen/docker/run-tests.sh
+++ b/flink-jepsen/docker/run-tests.sh
@@ -17,6 +17,8 @@
# limitations under the License.
################################################################################
+set -euo pipefail
+
dockerdir=$(dirname $0)
dockerdir=$(cd ${dockerdir}; pwd)
@@ -26,6 +28,27 @@ n2
n3
EOF
-common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--test-spec "${dockerdir}/data-stream-test-program-parallelism-1.edn"
+--tarball ${2}
+--ssh-private-key ~/.ssh/id_rsa
+--nodes-file ${dockerdir}/nodes)
+
+for i in $(seq 1 ${1})
+do
+ echo "Executing run #${i} of ${1}"
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
-. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-jobs --deployment-mode standalone-session
+ echo
+done
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
deleted file mode 100755
index a2b256b6f6a..00000000000
--- a/flink-jepsen/scripts/run-tests.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-set -euo pipefail
-
-scripts=$(dirname $0)
-scripts=$(cd ${scripts}; pwd)
-
-parallelism=${3}
-
-common_jepsen_args+=(--ha-storage-dir hdfs:///flink
---job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
---tarball ${2}
---job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
---ssh-private-key ~/.ssh/id_rsa)
-
-for i in $(seq 1 ${1})
-do
- echo "Executing run #${i} of ${1}"
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
- lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
-
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
- lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
-
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
-
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
- lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
- echo
-done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 7e437e9d628..b036b0a27d6 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -155,34 +155,80 @@
([job-running-healthy-threshold job-recovery-grace-period]
(job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
-(defn get-job-running-history
+(defn- history->jobs-running?-value
[history]
(->>
history
- (remove #(= (:process %) :nemesis))
+ (filter #(= (:f %) :jobs-running?))
(remove #(= (:type %) :invoke))
- (map :value)
- (map boolean)
- (remove nil?)))
+ (map :value)))
+
+(defn- history->job-ids
+ "Extracts all job ids from a history."
+ [history]
+ (set (->> history
+ (history->jobs-running?-value)
+ (map keys)
+ (flatten)
+ (remove nil?))))
+
+(defn all-jobs-running?-history
+ [history]
+ (->>
+ history
+ (history->jobs-running?-value)
+ (map vals)
+ (map #(and
+ (not (empty? %))
+ (every? true? %)))))
(defn- healthy?
[model]
- (>= (:healthy-count model) (:healthy-threshold model)))
+ (or (>= (:healthy-count model) (:healthy-threshold model))
+ (:job-canceled? model)))
+
+(defn- jobs-running?->job-running?
+ "Rewrites history entries of the form {:f :jobs-running? :value {...}}
+
+ Example: {:type ok :f :jobs-running? :value {job-id-1 true}} -> {:type ok :f :job-running? :value true}"
+ [history-entry job-id]
+ (let [job-running?-entry (assoc history-entry :f :job-running?)
+ job-running?-entry-ok (update job-running?-entry :value #(get % job-id))]
+ (if (= (:type history-entry) :ok)
+ job-running?-entry-ok
+ job-running?-entry)))
+
+(defn- history->single-job-history
+ "Rewrites a history to one that appears to run a single Flink job."
+ [history job-id]
+ (let [transform-history-entry (fn [history-entry]
+ (case (:f history-entry)
+ :jobs-running? (jobs-running?->job-running? history-entry job-id)
+ :cancel-jobs (assoc history-entry :f :cancel-job)
+ history-entry))]
+ (map transform-history-entry history)))
+
+(defn- compute-final-model
+ [model history]
+ (let [start-time (-> history first :time)]
+ (reduce model/step
+ (assoc model :last-failure start-time)
+ history)))
(defn job-running-checker
[]
(reify
checker/Checker
(check [_ test model history _]
- (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
- result-map (conj {}
- (find test :nemesis-gen)
- (find test :deployment-mode))]
- (if (or (model/inconsistent? final)
- (and
- (not (healthy? final))
- (not (:job-canceled? final))))
- (into result-map {:valid? false
- :final-model final})
- (into result-map {:valid? true
- :final-model final}))))))
+ (let [job-ids (history->job-ids history)
+ individual-job-histories (map (partial history->single-job-history history) job-ids)
+ final-models (map (partial compute-final-model model) individual-job-histories)
+ inconsistent-or-unhealthy (or (empty? job-ids)
+ (some model/inconsistent? final-models)
+ (some (complement healthy?) final-models))
+ result-map (select-keys test [:nemesis-gen :deployment-mode])]
+ (if inconsistent-or-unhealthy
+ (into result-map {:valid? false
+ :final-models final-models})
+ (into result-map {:valid? true
+ :final-models final-models}))))))
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 1ab987bd704..afbfe56e59b 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -119,6 +119,14 @@
(map :status)
(every? #(= "RUNNING" %)))))))
+(defn jobs-running?
+ "Checks if multiple jobs are running. Returns a map where the keys are job ids and the values are
+ booleans indicating whether the job is running or not."
+ [base-url job-ids]
+ (let [job-running-on-current-master? (partial job-running? base-url)
+ make-job-id-running?-pair (juxt identity job-running-on-current-master?)]
+ (into {} (map make-job-id-running?-pair job-ids))))
+
(defn- cancel-job!
"Cancels the specified job. Returns true if the job could be canceled.
Returns false if the job does not exist. Throws an exception if the HTTP status
@@ -131,6 +139,10 @@
(not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
:else true)))
+(defn- cancel-jobs!
+ [base-url job-ids]
+ (doseq [job-id job-ids] (cancel-job! base-url job-id)))
+
(defmacro dispatch-operation
[op & body]
`(try
@@ -148,24 +160,24 @@
(System/exit 1)))))
(defn- dispatch-rest-operation!
- [rest-url job-id op]
- (assert job-id)
+ [rest-url job-ids op]
+ (assert job-ids)
(if-not rest-url
(assoc op :type :fail :error "Have not determined REST URL yet.")
(case (:f op)
- :job-running? (dispatch-operation op (fu/retry
- (partial job-running? rest-url job-id)
- :retries 3
- :fallback #(throw %)))
- :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
+ :jobs-running? (dispatch-operation op (fu/retry
+ (partial jobs-running? rest-url job-ids)
+ :retries 3
+ :fallback #(throw %)))
+ :cancel-jobs (dispatch-operation-or-fatal op (cancel-jobs! rest-url job-ids)))))
(defrecord Client
- [deploy-cluster! ; function that starts a non-standalone cluster and submits the job
+ [deploy-cluster! ; function that starts a non-standalone cluster and submits the jobs
closer ; function that closes the ZK client
rest-url ; atom storing the current rest-url
init-future ; future that completes if rest-url is set to an initial value
- job-id ; atom storing the job-id
- job-submitted?] ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
+ job-ids ; atom storing the job-ids
+ job-submitted?] ; Have the jobs already been submitted? Used to avoid re-submission if the client is re-opened.
client/Client
(open! [this test _]
(info "Open client.")
@@ -183,14 +195,13 @@
:fallback (fn [e]
(fatal e "Could not get running jobs.")
(System/exit 1)))
- num-jobs (count jobs)
- job (first jobs)]
- (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
- (info "Submitted job" job)
- (reset! job-id job))))
+ num-jobs (count jobs)]
+ (assert (pos? num-jobs) (str "Expected at least 1 job, was " num-jobs))
+ (info "Submitted jobs" jobs)
+ (reset! job-ids jobs))))
(invoke! [_ _ op]
- (dispatch-rest-operation! @rest-url @job-id op))
+ (dispatch-rest-operation! @rest-url @job-ids op))
(teardown! [_ _])
(close! [_ _]
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 971c69e5000..30467767481 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -69,6 +69,20 @@
;; TODO: write log4j.properties properly
(c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
+(defn- file-name
+ [path]
+ (.getName (clojure.java.io/file path)))
+
+(defn upload-job-jar!
+ [job-jar]
+ (c/upload job-jar upload-dir)
+ (c/exec :mv (str upload-dir "/" (file-name job-jar)) install-dir))
+
+(defn upload-job-jars!
+ [job-jars]
+ (doseq [job-jar job-jars]
+ (upload-job-jar! job-jar)))
+
(defn install-flink!
[test node]
(let [url (:tarball test)]
@@ -76,8 +90,7 @@
(cu/install-archive! url install-dir)
(info "Enable S3 FS")
(c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
- (c/upload (:job-jar test) upload-dir)
- (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+ (upload-job-jars! (->> test :test-spec :jobs (map :job-jar)))
(write-configuration! test node)))
(defn teardown-flink!
@@ -145,7 +158,7 @@
[m]
(->>
(map #(str (name (first %)) "=" (second %)) m)
- (clojure.string/join " ")
+ (apply fu/join-space)
(#(str % " "))))
(defn- hadoop-env-vars
@@ -158,26 +171,25 @@
(c/su
(c/exec (c/lit (str
(hadoop-env-vars)
- install-dir "/bin/flink " cmd " " args)))))
+ install-dir "/bin/flink " cmd " " (apply fu/join-space args))))))
(defn flink-run-cli-args
"Returns the CLI args that should be passed to 'flink run'"
- [test]
+ [job-spec]
(concat
["-d"]
- (if (:main-class test)
- [(str "-c " (:main-class test))]
+ (if (:main-class job-spec)
+ [(str "-c " (:main-class job-spec))]
[])))
(defn submit-job!
([test] (submit-job! test []))
([test cli-args]
- (exec-flink! "run" (clojure.string/join
- " "
- (concat cli-args
- (flink-run-cli-args test)
- [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
- (:job-args test)])))))
+ (doseq [{:keys [job-jar job-args] :as job-spec} (-> test :test-spec :jobs)]
+ (exec-flink! "run" (concat cli-args
+ (flink-run-cli-args job-spec)
+ [(str install-dir "/" (file-name job-jar))
+ job-args])))))
;;; Standalone
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index c5d0d225932..b2b7644cd02 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -37,15 +37,15 @@
:standalone-session {:db (fdb/flink-standalone-db)
:deployment-strategy fdb/submit-job-from-first-node!}})
-(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
-(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
-(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+(def poll-jobs-running {:type :invoke, :f :jobs-running?, :value nil})
+(def cancel-jobs {:type :invoke, :f :cancel-jobs, :value nil})
+(def poll-jobs-running-loop (gen/seq (cycle [poll-jobs-running (gen/sleep 5)])))
(defn default-client-gen
"Client generator that polls for the job running status."
[]
(->
- poll-job-running-loop
+ poll-jobs-running-loop
(gen/singlethreaded)))
(defn cancelling-client-gen
@@ -53,13 +53,13 @@
[]
(->
(gen/concat (gen/time-limit 15 (default-client-gen))
- (gen/once cancel-job)
+ (gen/once cancel-jobs)
(default-client-gen))
(gen/singlethreaded)))
(def client-gens
{:poll-job-running default-client-gen
- :cancel-job cancelling-client-gen})
+ :cancel-jobs cancelling-client-gen})
(defn flink-test
[opts]
@@ -94,6 +94,10 @@
(clojure.string/join ", ")
(str "Must be one of: ")))
+(defn read-test-spec
+ [path]
+ (clojure.edn/read-string (slurp path)))
+
(defn -main
[& args]
(cli/run!
@@ -101,10 +105,8 @@
(cli/single-test-cmd
{:test-fn flink-test
:tarball fdb/default-flink-dist-url
- :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
- [nil "--job-jar JAR" "Path to the job jar"]
- [nil "--job-args ARGS" "CLI arguments for the flink job"]
- [nil "--main-class CLASS" "Job main class"]
+ :opt-spec [[nil "--test-spec FILE" "" :parse-fn read-test-spec]
+ [nil "--ha-storage-dir DIR" "high-availability.storageDir"]
[nil "--nemesis-gen GEN" (str "Which nemesis should be used?"
(keys-as-allowed-values-help-text fn/nemesis-generator-factories))
:parse-fn keyword
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 5335bba874c..07d69ead9cf 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -107,7 +107,7 @@
job-running-history (->>
history
(filter (fn [op] (>= (- (:time op) @t) 0)))
- (flink-checker/get-job-running-history)
+ (flink-checker/all-jobs-running?-history)
(take-last-with-default job-running-healthy-threshold false))]
(if (or
(every? true? job-running-history)
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 1aa53efe7ae..6634a7ec83a 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -51,6 +51,10 @@
(recur op (assoc keys :retries (dec retries))))
(success r)))))
+(defn join-space
+ [& tokens]
+ (clojure.string/join " " tokens))
+
(defn find-files!
"Lists files recursively given a directory. If the directory does not exist, an empty collection
is returned."
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index c27d751e69e..d1ade1645b0 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -20,96 +20,99 @@
[checker :as checker]]
[jepsen.flink.checker :refer :all]))
-(deftest get-job-running-history-test
+(deftest all-jobs-running?-history-test
(let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
- {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
- {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
- {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
- (is (= (get-job-running-history history) [false true false]))))
+ {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127443701575}
+ {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" false}, :process 0, :time 127453553462}
+ {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127453553463}
+ {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" true}, :process 0, :time 127453553464}
+ {:type :info, :f :jobs-running?, :value nil, :process 0, :time 127453553465}]]
+ (is (= [false true false] (all-jobs-running?-history history)))))
(deftest job-running-checker-test
(let [checker (job-running-checker)
test {}
model (job-running-within-grace-period 3 60 10)
opts {}
- check (fn [history] (checker/check checker test model history opts))]
+ check (fn [history] (checker/check checker test model history opts))
+ job-running-value {"3886d6b547969c3f15c53896bb496b8f" true}
+ job-not-running-value {"3886d6b547969c3f15c53896bb496b8f" false}]
(testing "Model should be inconsistent if job is not running after grace period."
(let [result (check
[{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+ {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}])]
(is (= false (:valid? result)))
- (is (= "Job is not running." (-> result :final-model :msg)))))
+ (is (= "Job is not running." (-> result :final-models first :msg)))))
(testing "Model should be consistent if job is running after grace period."
(is (= true (:valid? (check
[{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000001}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000003}])))))
(testing "Should tolerate non-running job during failures."
(is (= true (:valid? (check
[{:type :info, :f :partition-start, :process :nemesis, :time -1}
{:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+ {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
{:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
{:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000004}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000005}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000006}])))))
(testing "Should not tolerate non-running job without a cause."
(let [result (check
- [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+ [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+ {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}])]
(is (= false (:valid? result)))
- (is (= "Job is not running." (-> result :final-model :msg)))))
+ (is (= "Job is not running." (-> result :final-models first :msg)))))
(testing "Model should be inconsistent if job submission was unsuccessful."
- (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
- {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+ (let [result (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 239150413307}
+ {:type :info, :f :jobs-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
(is (= false (:valid? result)))))
(testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
- (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
- {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
- {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+ (let [result (check [{:type :fail, :f :jobs-running?, :value job-running-value, :process 0, :time 0 :error "Error"}
+ {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+ {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
(is (= false (:valid? result)))
- (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (is (= "Cluster is not running." (-> result :final-models first :msg)))))
(testing "Should tolerate non-running job after cancellation."
- (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
- {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+ (is (= true (:valid? (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+ {:type :ok, :f :cancel-jobs, :value nil, :process 0, :time 1}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+ {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 3}])))))
(testing "Model should be inconsistent if job is running after cancellation."
- (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
- {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+ (let [result (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+ {:type :ok, :f :cancel-jobs, :value true, :process 0, :time 1}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 10000000002}])]
(is (= false (:valid? result)))
- (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+ (is (= "Job is running after cancellation." (-> result :final-models first :msg)))))
(testing "Model should be inconsistent if Flink cluster is not available at the end."
- (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
- {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+ (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+ {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
(is (= false (:valid? result)))
- (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (is (= "Cluster is not running." (-> result :final-models first :msg)))))
(testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
- (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
- {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
- {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
- {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+ (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+ {:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 1}
+ {:type :ok, :f :cancel-jobs, :value job-running-value, :process 0, :time 2}
+ {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
(is (= false (:valid? result)))
- (is (= "Cluster is not running." (-> result :final-model :msg)))))
+ (is (= "Cluster is not running." (-> result :final-models first :msg)))))
(testing "Should throw AssertionError if job cancelling operation failed."
(is (thrown-with-msg? AssertionError
#":cancel-job must not fail"
- (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+ (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+ {:type :fail, :f :cancel-jobs, :value nil, :process 0, :time 1}]))))
(testing "Should tolerate non-running job if grace period has not passed."
- (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
- {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
- {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
+ (is (= true (:valid? (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 0}
+ {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 1}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 3}
+ {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 4}])))))))
(deftest safe-inc-test
(is (= (safe-inc nil) 1))
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Enable multiple Job Submission in distributed Tests
> ---------------------------------------------------
>
> Key: FLINK-10985
> URL: https://issues.apache.org/jira/browse/FLINK-10985
> Project: Flink
> Issue Type: New Feature
> Components: Tests
> Affects Versions: 1.8.0
> Reporter: Gary Yao
> Assignee: Gary Yao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> *Description*
> Jepsen tests only allow submission of a single job. It should be possible to
> submit multiple jobs to a session cluster and assert that all jobs are
> running at the end of the test.
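In the merged `checker.clj`, the "all jobs are running" check works in three steps. It splits the history into one history per job id (`history->job-ids`, `history->single-job-history`) and reduces the existing single-job model over each of them. The test is then invalid if no job id was observed, or if any per-job model is inconsistent or unhealthy.

Below is a simplified, standalone sketch of the splitting step only. The function names `job-ids` and `single-job-history` and the ids "job-a"/"job-b" are hypothetical; real Flink job ids are 32-character hex strings. Unlike the PR, the sketch does not rewrite `:cancel-jobs` entries.

```clojure
;; Sketch only: hypothetical helpers loosely mirroring history->job-ids and
;; history->single-job-history from the PR; not the PR's actual code.

(defn job-ids
  "Collects every job id reported by a completed :jobs-running? operation."
  [history]
  (->> history
       (filter #(= :jobs-running? (:f %)))
       (remove #(= :invoke (:type %)))
       (mapcat (comp keys :value)) ; :value is a {job-id running?} map, or nil
       (set)))

(defn single-job-history
  "Rewrites :jobs-running? entries into :job-running? entries for one job.
  Only :ok entries carry a map, so only their :value is narrowed to a boolean."
  [history job-id]
  (for [entry history]
    (if (= :jobs-running? (:f entry))
      (cond-> (assoc entry :f :job-running?)
        (= :ok (:type entry)) (update :value get job-id))
      entry)))

;; Example history with two hypothetical job ids.
(def history
  [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 0}
   {:type :ok, :f :jobs-running?, :value {"job-a" true, "job-b" false}, :process 0, :time 1}])

;; One boolean history per job; a checker would then validate each one separately.
(for [id (sort (job-ids history))]
  [id (->> (single-job-history history id)
           (filter #(= :ok (:type %)))
           (map :value))])
;; => (["job-a" (true)] ["job-b" (false)])
```

Splitting the history this way lets the per-job model stay unchanged. Only the history it consumes is rewritten.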
> *Acceptance Criteria*
> * Multiple jobs can be submitted to a session cluster
> * Jobs to be submitted (jar files, job args, main class) should be specified
> in an .edn file
> * Checks at the end of the tests are performed for all jobs
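For illustration, a test spec with two jobs might look like the following. It uses the `:jobs`, `:job-jar`, `:job-args` and `:main-class` keys read by `db.clj` in the PR, and reuses the jar path and arguments of `data-stream-test-program-parallelism-1.edn`.

The second job's parallelism and its `:main-class` value are assumptions, not taken from the PR. When `:main-class` is present, `flink-run-cli-args` passes it to `flink run` as `-c <class>`.

```clojure
;; Hypothetical two-job test spec; the second entry's values are illustrative assumptions.
{:jobs [{:job-jar "/jepsen/bin/DataStreamAllroundTestProgram.jar"
         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}
        {:job-jar "/jepsen/bin/DataStreamAllroundTestProgram.jar"
         ;; Assumed fully qualified main class; forwarded as "-c <class>" to flink run.
         :main-class "org.apache.flink.streaming.tests.DataStreamAllroundTestProgram"
         :job-args "--environment.parallelism 2 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
```

Such a file would be passed with `--test-spec` (parsed by `read-test-spec`), in place of the spec that `docker/run-tests.sh` supplies. Each job is then submitted in turn by `submit-job!`.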
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)