This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b041d4  HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi.
1b041d4 is described below
commit 1b041d4fd4ec0c8c4cfdcd6fa28711cf7fcd56fe
Author: Márton Elek <[email protected]>
AuthorDate: Thu May 30 16:46:06 2019 +0200
HDDS-1497. Refactor blockade Tests. Contributed by Nilotpal Nandi.
---
.../src/main/blockade/blockadeUtils/blockade.py | 30 +--
.../src/main/blockade/clusterUtils/__init__.py | 2 +-
.../main/blockade/clusterUtils/cluster_utils.py | 11 +-
.../blockade/{clusterUtils => ozone}/__init__.py | 2 +-
.../dist/src/main/blockade/ozone/cluster.py | 295 +++++++++++++++++++++
.../blockade/test_blockade_datanode_isolation.py | 219 ++++++++-------
.../dist/src/main/blockade/test_blockade_flaky.py | 48 ++--
hadoop-ozone/dist/src/main/blockade/util.py | 52 ++++
.../src/main/compose/ozoneblockade/docker-config | 6 +
9 files changed, 508 insertions(+), 157 deletions(-)
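
At a high level, the patch replaces the module-level ClusterUtils helpers with an object-oriented API in ozone/cluster.py. As a minimal sketch of the new test lifecycle, using only names introduced in the diff below (the freon arguments are illustrative):

    from ozone.cluster import Cluster

    cluster = Cluster.create()    # default Configuration: 3 datanodes
    cluster.start()               # docker-compose up + blockade add
    try:
        exit_code, output = cluster.run_freon(1, 1, 1, 10240)
        assert exit_code == 0, output
    finally:
        cluster.stop()            # docker-compose down + blockade destroy
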
diff --git a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
index f371865..7809c70 100644
--- a/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
+++ b/hadoop-ozone/dist/src/main/blockade/blockadeUtils/blockade.py
@@ -18,9 +18,8 @@
"""This module has apis to create and remove a blockade cluster"""
from subprocess import call
-import subprocess
import logging
-import random
+import util
from clusterUtils.cluster_utils import ClusterUtils
logger = logging.getLogger(__name__)
@@ -39,23 +38,13 @@ class Blockade(object):
@classmethod
def blockade_status(cls):
- exit_code, output = ClusterUtils.run_cmd("blockade status")
+ exit_code, output = util.run_cmd("blockade status")
return exit_code, output
@classmethod
- def make_flaky(cls, flaky_node, container_list):
- # make the network flaky
- om, scm, _, datanodes = \
- ClusterUtils.find_om_scm_client_datanodes(container_list)
- node_dict = {
- "all": "--all",
- "scm" : scm[0],
- "om" : om[0],
- "datanode": random.choice(datanodes)
- }[flaky_node]
- logger.info("flaky node: %s", node_dict)
-
- output = call(["blockade", "flaky", node_dict])
+ def make_flaky(cls, flaky_node):
+ logger.info("flaky node: %s", flaky_node)
+ output = call(["blockade", "flaky", flaky_node])
assert output == 0, "flaky command failed with exit code=[%s]" % output
@classmethod
@@ -69,7 +58,7 @@ class Blockade(object):
for node_list in args:
nodes = nodes + ','.join(node_list) + " "
exit_code, output = \
- ClusterUtils.run_cmd("blockade partition %s" % nodes)
+ util.run_cmd("blockade partition %s" % nodes)
assert exit_code == 0, \
"blockade partition command failed with exit code=[%s]" % output
@@ -95,4 +84,9 @@ class Blockade(object):
else:
output = call(["blockade", "start", node])
assert output == 0, "blockade start command failed with " \
- "exit code=[%s]" % output
\ No newline at end of file
+ "exit code=[%s]" % output
+
+ @classmethod
+ def blockade_add(cls, node):
+ output = call(["blockade", "add", node])
+        assert output == 0, "blockade add command failed with exit code=[%s]" % output
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
index ae1e83e..13878a1 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
+++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
@@ -11,4 +11,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
index 3a04103..cf67380 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
+++ b/hadoop-ozone/dist/src/main/blockade/clusterUtils/cluster_utils.py
@@ -17,6 +17,7 @@
from subprocess import call
+
import subprocess
import logging
import time
@@ -292,9 +293,15 @@ class ClusterUtils(object):
assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output
@classmethod
- def find_checksum(cls, docker_compose_file, filepath):
+ def find_checksum(cls, docker_compose_file, filepath, client="ozone_client"):
+ """
+ This function finds the checksum of a file present in a docker container.
+ Before running any 'putKey' operation, this function is called to store
+ the original checksum of the file. The file is then uploaded as a key.
+ """
command = "docker-compose -f %s " \
- "exec ozone_client md5sum %s" % (docker_compose_file, filepath)
+ "exec %s md5sum %s" % \
+ (docker_compose_file, client, filepath)
exit_code, output = cls.run_cmd(command)
assert exit_code == 0, "Cant find checksum"
myoutput = output.split("\n")
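
As the new docstring explains, find_checksum records a file's md5 before a putKey so it can be compared after a later getKey. An illustrative sketch, assuming find_checksum returns the md5 string; FILE and the key paths are hypothetical placeholders:

    from clusterUtils.cluster_utils import ClusterUtils

    checksum_before = ClusterUtils.find_checksum(FILE, "/tmp/key1")
    # ... putKey /tmp/key1 as a key, then getKey it back to /tmp/key1_get ...
    checksum_after = ClusterUtils.find_checksum(FILE, "/tmp/key1_get",
                                                client="ozone_client")
    assert checksum_before == checksum_after, "checksum mismatch"
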
diff --git a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
similarity index 95%
copy from hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
copy to hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
index ae1e83e..13878a1 100644
--- a/hadoop-ozone/dist/src/main/blockade/clusterUtils/__init__.py
+++ b/hadoop-ozone/dist/src/main/blockade/ozone/__init__.py
@@ -11,4 +11,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
-# limitations under the License.
+# limitations under the License.
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
new file mode 100644
index 0000000..4347f86
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/blockade/ozone/cluster.py
@@ -0,0 +1,295 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import re
+import subprocess
+import yaml
+import util
+from subprocess import call
+from blockadeUtils.blockade import Blockade
+
+
+class Command(object):
+ docker = "docker"
+ blockade = "blockade"
+ docker_compose = "docker-compose"
+ ozone = "/opt/hadoop/bin/ozone"
+ freon = "/opt/hadoop/bin/ozone freon"
+
+
+class Configuration:
+ """
+ Configurations to be used while starting Ozone Cluster.
+  Here the @property decorator is used to provide getter, setter and deleter
+  behaviour for the 'datanode_count' attribute.
+  @datanode_count.setter sets the value of the 'datanode_count' attribute.
+  @datanode_count.deleter deletes the current value of the 'datanode_count'
+  attribute.
+ """
+
+ def __init__(self):
+ __parent_dir__ = os.path.dirname(os.path.dirname(
+ os.path.dirname(os.path.realpath(__file__))))
+ self.docker_compose_file = os.path.join(__parent_dir__,
+ "compose", "ozoneblockade",
+ "docker-compose.yaml")
+ self._datanode_count = 3
+ os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
+
+ @property
+ def datanode_count(self):
+ return self._datanode_count
+
+ @datanode_count.setter
+ def datanode_count(self, datanode_count):
+ self._datanode_count = datanode_count
+
+ @datanode_count.deleter
+ def datanode_count(self):
+ del self._datanode_count
+
+
+class Cluster(object):
+ """
+  This represents an Ozone Cluster.
+  Here the @property decorator is used to provide getter, setter and deleter
+  behaviour for the 'om', 'scm', 'datanodes' and 'clients' attributes.
+ """
+
+ __logger__ = logging.getLogger(__name__)
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.docker_compose_file = conf.docker_compose_file
+ self._om = None
+ self._scm = None
+ self._datanodes = None
+ self._clients = None
+ self.scm_uuid = None
+ self.datanode_dir = None
+
+ @property
+ def om(self):
+ return self._om
+
+ @om.setter
+ def om(self, om):
+ self._om = om
+
+ @om.deleter
+ def om(self):
+ del self._om
+
+ @property
+ def scm(self):
+ return self._scm
+
+ @scm.setter
+ def scm(self, scm):
+ self._scm = scm
+
+ @scm.deleter
+ def scm(self):
+ del self._scm
+
+ @property
+ def datanodes(self):
+ return self._datanodes
+
+ @datanodes.setter
+ def datanodes(self, datanodes):
+ self._datanodes = datanodes
+
+ @datanodes.deleter
+ def datanodes(self):
+ del self._datanodes
+
+ @property
+ def clients(self):
+ return self._clients
+
+ @clients.setter
+ def clients(self, clients):
+ self._clients = clients
+
+ @clients.deleter
+ def clients(self):
+ del self._clients
+
+ @classmethod
+ def create(cls, config=Configuration()):
+ return Cluster(config)
+
+ def start(self):
+ """
+    Starts the Ozone Cluster in docker containers.
+ """
+ Cluster.__logger__.info("Starting Ozone Cluster")
+ Blockade.blockade_destroy()
+ call([Command.docker_compose, "-f", self.docker_compose_file,
+ "up", "-d", "--scale",
+ "datanode=" + str(self.conf.datanode_count)])
+    Cluster.__logger__.info("Waiting for cluster start up...")
+    # TODO: Remove the sleep and wait only till the cluster is out of safemode
+ # time.sleep(10)
+ output = subprocess.check_output([Command.docker_compose, "-f",
+ self.docker_compose_file, "ps"])
+ node_list = []
+ for out in output.split("\n")[2:-1]:
+ node = out.split(" ")[0]
+ node_list.append(node)
+ Blockade.blockade_add(node)
+
+ Blockade.blockade_status()
+ self.om = filter(lambda x: 'om' in x, node_list)[0]
+ self.scm = filter(lambda x: 'scm' in x, node_list)[0]
+ self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
+ self.clients = filter(lambda x: 'ozone_client' in x, node_list)
+ self.scm_uuid = self.__get_scm_uuid__()
+ self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
+
+ assert node_list, "no node found in the cluster!"
+ Cluster.__logger__.info("blockade created with nodes %s",
+ ' '.join(node_list))
+
+ def get_conf_value(self, key):
+ """
+    Returns the value of the given configuration key.
+ """
+ command = [Command.ozone, "getconf -confKey " + key]
+ exit_code, output = self.__run_docker_command__(command, self.om)
+ return str(output).strip()
+
+ def scale_datanode(self, datanode_count):
+ """
+    Commissions new datanodes into the running cluster.
+    """
+    call([Command.docker_compose, "-f", self.docker_compose_file,
+          "up", "-d", "--scale", "datanode=" + str(datanode_count)])
+
+ def partition_network(self, *args):
+ """
+    Partitions the network used by the cluster.
+ """
+ Blockade.blockade_create_partition(*args)
+
+
+ def restore_network(self):
+ """
+    Removes the network partition and restores the network.
+ """
+ Blockade.blockade_join()
+
+
+ def __get_scm_uuid__(self):
+ """
+ Returns SCM's UUID.
+ """
+ ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
+ command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
+ exit_code, output = self.__run_docker_command__(command, self.scm)
+ output_list = output.split("\n")
+ key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
+ uuid = [token for token in key_value if 'scmUuid' in token]
+ return uuid.pop().split("=")[1].strip()
+
+ def get_container_states(self, datanode):
+ """
+ Returns the state of all the containers in the given datanode.
+ """
+ container_parent_path = "%s/hdds/%s/current/containerDir0" % \
+ (self.datanode_dir, self.scm_uuid)
+ command = "find %s -type f -name '*.container'" % container_parent_path
+ exit_code, output = self.__run_docker_command__(command, datanode)
+ container_state = {}
+
+ container_list = map(str.strip, output.split("\n"))
+ for container_path in container_list:
+ # Reading the container file.
+ exit_code, output = self.__run_docker_command__(
+ "cat " + container_path, datanode)
+      if exit_code != 0:
+ continue
+ data = output.split("\n")
+ # Reading key value pairs from container file.
+ key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
+ content = "\n".join(key_value)
+      content_yaml = yaml.safe_load(content)
+ if content_yaml is None:
+ continue
+ for key, value in content_yaml.items():
+ content_yaml[key] = str(value).lstrip()
+ # Stores the container state in a dictionary.
+ container_state[content_yaml['containerID']] = content_yaml['state']
+ return container_state
+
+ def run_freon(self, num_volumes, num_buckets, num_keys, key_size,
+ replication_type="RATIS", replication_factor="THREE",
+ run_on=None):
+ """
+ Runs freon on the cluster.
+ """
+ if run_on is None:
+ run_on = self.om
+ command = [Command.freon,
+ " rk",
+ " --numOfVolumes " + str(num_volumes),
+ " --numOfBuckets " + str(num_buckets),
+ " --numOfKeys " + str(num_keys),
+ " --keySize " + str(key_size),
+ " --replicationType " + replication_type,
+ " --factor " + replication_factor]
+ return self.__run_docker_command__(command, run_on)
+
+ def __run_docker_command__(self, command, run_on):
+ if isinstance(command, list):
+ command = ' '.join(command)
+ command = [Command.docker,
+ "exec " + run_on,
+ command]
+ return util.run_cmd(command)
+
+ def stop(self):
+ """
+ Stops the Ozone Cluster.
+ """
+ Cluster.__logger__.info("Stopping Ozone Cluster")
+ call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
+ Blockade.blockade_destroy()
+
+ def container_state_predicate_all_closed(self, datanodes):
+ for datanode in datanodes:
+ container_states_dn = self.get_container_states(datanode)
+ if not container_states_dn \
+ or container_states_dn.popitem()[1] != 'CLOSED':
+ return False
+ return True
+
+ def container_state_predicate_one_closed(self, datanodes):
+ for datanode in datanodes:
+ container_states_dn = self.get_container_states(datanode)
+ if container_states_dn and container_states_dn.popitem()[1] == 'CLOSED':
+ return True
+ return False
+
+ def container_state_predicate(self, datanode, state):
+ container_states_dn = self.get_container_states(datanode)
+ if container_states_dn and container_states_dn.popitem()[1] == state:
+ return True
+ return False
\ No newline at end of file
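
The Configuration properties above let a test size the cluster before starting it. A short sketch of overriding the default datanode count (the count of 5 is arbitrary):

    from ozone.cluster import Cluster, Configuration

    config = Configuration()
    config.datanode_count = 5      # invokes the @datanode_count.setter
    cluster = Cluster.create(config)
    cluster.start()                # scales docker-compose to datanode=5
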
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
index 1e53a32..dfa1b70 100644
--- a/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_datanode_isolation.py
@@ -16,132 +16,123 @@
# limitations under the License.
import os
-import time
-import re
import logging
-from blockadeUtils.blockade import Blockade
-from clusterUtils.cluster_utils import ClusterUtils
+import util
+from ozone.cluster import Cluster
logger = logging.getLogger(__name__)
-parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-FILE = os.path.join(parent_dir, "compose", "ozoneblockade",
- "docker-compose.yaml")
-os.environ["DOCKER_COMPOSE_FILE"] = FILE
-SCALE = 3
-INCREASED_SCALE = 5
-CONTAINER_LIST = []
-OM = []
-SCM = []
-DATANODES = []
-def setup():
- global CONTAINER_LIST, OM, SCM, DATANODES
- Blockade.blockade_destroy()
- CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
- exit_code, output = Blockade.blockade_status()
- assert exit_code == 0, "blockade status command failed with output=[%s]" % \
- output
- OM, SCM, _, DATANODES = \
- ClusterUtils.find_om_scm_client_datanodes(CONTAINER_LIST)
+def setup_function(function):
+ global cluster
+ cluster = Cluster.create()
+ cluster.start()
- exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
- "THREE")
- assert exit_code == 0, "freon run failed with output=[%s]" % output
+def teardown_function(function):
+ cluster.stop()
-def teardown():
- logger.info("Inside teardown")
- Blockade.blockade_destroy()
+def test_isolate_single_datanode():
+ """
+  In this test case we will create a network partition in such a way that
+  one of the datanodes will not be able to communicate with the other
+  datanodes, but it will be able to communicate with SCM.
-def teardown_module():
- ClusterUtils.cluster_destroy(FILE)
+  Once the network partition happens, SCM detects it and closes the pipeline,
+  which in turn closes the containers.
+  The container replicas on the first two datanodes will get CLOSED as they
+  form a quorum. The container replica on the third node will be QUASI_CLOSED
+  as it is not able to connect with the other datanodes and does not have the
+  latest BCSID.
+
+  Once we restore the network, the stale replica on the third datanode will
+  be deleted and the latest replica will be copied from one of the other
+  datanodes.
-def test_isolatedatanode_singlenode(run_second_phase):
- """
- In this test, one of the datanodes (first datanode) cannot communicate
- with other two datanodes.
- All datanodes can communicate with SCM.
- Expectation :
- The container replica state in first datanode should be quasi-closed.
- The container replica state in other datanodes should be closed.
"""
- first_set = [OM[0], SCM[0], DATANODES[0]]
- second_set = [OM[0], SCM[0], DATANODES[1], DATANODES[2]]
- Blockade.blockade_create_partition(first_set, second_set)
- Blockade.blockade_status()
- ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
- logger.info("Waiting for %s seconds before checking container status",
- os.environ["CONTAINER_STATUS_SLEEP"])
- time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
- all_datanodes_container_status = \
- ClusterUtils.findall_container_status(FILE, SCALE)
- first_datanode_status = all_datanodes_container_status[0]
- closed_container_datanodes = [x for x in all_datanodes_container_status
- if x == 'CLOSED']
- assert first_datanode_status == 'QUASI_CLOSED'
- assert len(closed_container_datanodes) == 2, \
- "The container should have two closed replicas."
-
- if str(run_second_phase).lower() == "true":
- ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
- Blockade.blockade_status()
- logger.info("Waiting for %s seconds before checking container status",
- os.environ["CONTAINER_STATUS_SLEEP"])
- time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
- all_datanodes_container_status = \
- ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
- closed_container_datanodes = [x for x in all_datanodes_container_status
- if x == 'CLOSED']
- assert len(closed_container_datanodes) >= 3, \
- "The container should have at least three closed replicas."
- Blockade.blockade_join()
- Blockade.blockade_status()
- _, output = \
- ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
- assert re.search("Status: Success", output) is not None
-
-
-def test_datanode_isolation_all(run_second_phase):
+ cluster.run_freon(1, 1, 1, 10240)
+ first_set = [cluster.om, cluster.scm,
+ cluster.datanodes[0], cluster.datanodes[1]]
+ second_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
+ logger.info("Partitioning the network")
+ cluster.partition_network(first_set, second_set)
+ cluster.run_freon(1, 1, 1, 10240)
+ logger.info("Waiting for container to be QUASI_CLOSED")
+
+ util.wait_until(lambda: cluster.get_container_states(cluster.datanodes[2])
+ .popitem()[1] == 'QUASI_CLOSED',
+ int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+ container_states_dn_0 = cluster.get_container_states(cluster.datanodes[0])
+ container_states_dn_1 = cluster.get_container_states(cluster.datanodes[1])
+ container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
+ assert len(container_states_dn_0) != 0
+ assert len(container_states_dn_1) != 0
+ assert len(container_states_dn_2) != 0
+ for key in container_states_dn_0:
+ assert container_states_dn_0.get(key) == 'CLOSED'
+ for key in container_states_dn_1:
+ assert container_states_dn_1.get(key) == 'CLOSED'
+ for key in container_states_dn_2:
+ assert container_states_dn_2.get(key) == 'QUASI_CLOSED'
+
+ # Since the replica in datanode[2] doesn't have the latest BCSID,
+ # ReplicationManager will delete it and copy a closed replica.
+ # We will now restore the network and datanode[2] should get a
+ # closed replica of the container
+ logger.info("Restoring the network")
+ cluster.restore_network()
+
+ logger.info("Waiting for the replica to be CLOSED")
+ util.wait_until(
+ lambda: cluster.container_state_predicate(cluster.datanodes[2], 'CLOSED'),
+ int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+ container_states_dn_2 = cluster.get_container_states(cluster.datanodes[2])
+ assert len(container_states_dn_2) != 0
+ for key in container_states_dn_2:
+ assert container_states_dn_2.get(key) == 'CLOSED'
+
+
+def test_datanode_isolation_all():
"""
- In this test, none of the datanodes can communicate with other two
- datanodes.
- All datanodes can communicate with SCM.
- Expectation : The container should eventually have at least two closed
- replicas.
+  In this test case we will create a network partition in such a way that
+  none of the datanodes can communicate with each other.
+  All datanodes will be able to communicate with SCM.
+
+  Once the network partition happens, SCM detects it and closes the pipeline,
+  which in turn tries to close the containers.
+  At least one of the replicas should be in the CLOSED state.
+
+  Once we restore the network, there will be three CLOSED replicas.
+
"""
- first_set = [OM[0], SCM[0], DATANODES[0]]
- second_set = [OM[0], SCM[0], DATANODES[1]]
- third_set = [OM[0], SCM[0], DATANODES[2]]
- Blockade.blockade_create_partition(first_set, second_set, third_set)
- Blockade.blockade_status()
- ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
- logger.info("Waiting for %s seconds before checking container status",
- os.environ["CONTAINER_STATUS_SLEEP"])
- time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
- all_datanodes_container_status = \
- ClusterUtils.findall_container_status(FILE, SCALE)
- closed_container_datanodes = [x for x in all_datanodes_container_status
- if x == 'CLOSED']
- assert len(closed_container_datanodes) >= 2, \
- "The container should have at least two closed replicas."
-
- if str(run_second_phase).lower() == "true":
- ClusterUtils.cluster_setup(FILE, INCREASED_SCALE, False)
- Blockade.blockade_status()
- logger.info("Waiting for %s seconds before checking container status",
- os.environ["CONTAINER_STATUS_SLEEP"])
- time.sleep(int(os.environ["CONTAINER_STATUS_SLEEP"]))
- all_datanodes_container_status = \
- ClusterUtils.findall_container_status(FILE, INCREASED_SCALE)
- closed_container_datanodes = [x for x in all_datanodes_container_status
- if x == 'CLOSED']
- assert len(closed_container_datanodes) >= 3, \
- "The container should have at least three closed replicas."
- Blockade.blockade_join()
- Blockade.blockade_status()
- _, output = \
- ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS", "THREE")
- assert re.search("Status: Success", output) is not None
\ No newline at end of file
+ cluster.run_freon(1, 1, 1, 10240)
+
+ assert len(cluster.get_container_states(cluster.datanodes[0])) != 0
+ assert len(cluster.get_container_states(cluster.datanodes[1])) != 0
+ assert len(cluster.get_container_states(cluster.datanodes[2])) != 0
+
+ logger.info("Partitioning the network")
+ first_set = [cluster.om, cluster.scm, cluster.datanodes[0]]
+ second_set = [cluster.om, cluster.scm, cluster.datanodes[1]]
+ third_set = [cluster.om, cluster.scm, cluster.datanodes[2]]
+ cluster.partition_network(first_set, second_set, third_set)
+
+ logger.info("Waiting for the replica to be CLOSED")
+ util.wait_until(
+ lambda: cluster.container_state_predicate_one_closed(cluster.datanodes),
+ int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+
+  # At least one of the replicas should be in the CLOSED state
+ assert cluster.container_state_predicate_one_closed(cluster.datanodes)
+
+ # After restoring the network all the replicas should be in
+ # CLOSED state
+ logger.info("Restoring the network")
+ cluster.restore_network()
+
+ logger.info("Waiting for the container to be replicated")
+ util.wait_until(
+ lambda: cluster.container_state_predicate_all_closed(cluster.datanodes),
+ int(os.environ["CONTAINER_STATUS_SLEEP"]), 10)
+ assert cluster.container_state_predicate_all_closed(cluster.datanodes)
\ No newline at end of file
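
For reference, cluster.partition_network(*args) forwards each list of container names to Blockade.blockade_create_partition, which joins every list with commas and separates the partitions with spaces on the 'blockade partition' command line. A sketch, assuming a started 3-datanode cluster as in the tests above:

    # Keep OM and SCM reachable from every partition, but isolate the
    # datanodes from each other.
    cluster.partition_network(
        [cluster.om, cluster.scm, cluster.datanodes[0]],
        [cluster.om, cluster.scm, cluster.datanodes[1]],
        [cluster.om, cluster.scm, cluster.datanodes[2]])
    # ...
    cluster.restore_network()      # runs 'blockade join'
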
diff --git a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
index 3129600..a79bd4f 100644
--- a/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
+++ b/hadoop-ozone/dist/src/main/blockade/test_blockade_flaky.py
@@ -16,11 +16,11 @@
# limitations under the License.
import os
-import time
import logging
+import random
import pytest
from blockadeUtils.blockade import Blockade
-from clusterUtils.cluster_utils import ClusterUtils
+from ozone.cluster import Cluster
logger = logging.getLogger(__name__)
@@ -32,30 +32,36 @@ SCALE = 6
CONTAINER_LIST = []
-def setup_module():
- global CONTAINER_LIST
- Blockade.blockade_destroy()
- CONTAINER_LIST = ClusterUtils.cluster_setup(FILE, SCALE)
- exit_code, output = Blockade.blockade_status()
-  assert exit_code == 0, "blockade status command failed with output=[%s]" % \
-      output
+def setup_function(function):
+ global cluster
+ cluster = Cluster.create()
+ cluster.start()
-def teardown_module():
- Blockade.blockade_destroy()
- ClusterUtils.cluster_destroy(FILE)
+def teardown_function(function):
+ cluster.stop()
-def teardown():
- logger.info("Inside teardown")
- Blockade.blockade_fast_all()
- time.sleep(5)
[email protected]("flaky_node", ["datanode", "scm", "om", "all"])
+def test_flaky(flaky_node):
+ """
+  In these tests, we make the network of the nodes flaky using blockade.
+  There are 4 tests:
+  1) a randomly selected datanode's network is made flaky.
+  2) the scm network is made flaky.
+  3) the om network is made flaky.
+  4) the network of all the nodes is made flaky.
+ """
+ flaky_container_name = {
+ "scm": cluster.scm,
+ "om": cluster.om,
+ "datanode": random.choice(cluster.datanodes),
+ "all": "--all"
+ }[flaky_node]
[email protected]("flaky_nodes", ["datanode", "scm", "om", "all"])
-def test_flaky(flaky_nodes):
- Blockade.make_flaky(flaky_nodes, CONTAINER_LIST)
+ Blockade.make_flaky(flaky_container_name)
Blockade.blockade_status()
- exit_code, output = ClusterUtils.run_freon(FILE, 1, 1, 1, 10240, "RATIS",
- "THREE")
+ exit_code, output = cluster.run_freon(1, 1, 1, 10240)
assert exit_code == 0, "freon run failed with output=[%s]" % output
\ No newline at end of file
diff --git a/hadoop-ozone/dist/src/main/blockade/util.py b/hadoop-ozone/dist/src/main/blockade/util.py
new file mode 100644
index 0000000..84f7fda
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/blockade/util.py
@@ -0,0 +1,52 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import re
+import logging
+import subprocess
+
+logger = logging.getLogger(__name__)
+
+def wait_until(predicate, timeout, check_frequency=1):
+ deadline = time.time() + timeout
+ while time.time() < deadline:
+ if predicate():
+ return
+ time.sleep(check_frequency)
+
+
+def run_cmd(cmd):
+ command = cmd
+ if isinstance(cmd, list):
+ command = ' '.join(cmd)
+ logger.info(" RUNNING: %s", command)
+ all_output = ""
+ my_process = subprocess.Popen(command, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, shell=True)
+ while my_process.poll() is None:
+ op = my_process.stdout.readline()
+ if op:
+ all_output += op
+ logger.info(op)
+ other_output = my_process.communicate()
+ other_output = other_output[0].strip()
+ if other_output != "":
+ all_output += other_output
+ reg = re.compile(r"(\r\n|\n)$")
+ all_output = reg.sub("", all_output, 1)
+ return my_process.returncode, all_output
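
util.wait_until polls the predicate every check_frequency seconds until it returns true or the timeout elapses; it returns silently on timeout, which is why the tests assert the condition again afterwards. util.run_cmd accepts a string or a list (joined with spaces) and returns (exit_code, combined_output). A small sketch; some_condition and the timeout values are illustrative:

    import util

    exit_code, output = util.run_cmd(["blockade", "status"])
    assert exit_code == 0, output

    util.wait_until(lambda: some_condition(), timeout=120, check_frequency=10)
    assert some_condition()    # wait_until does not raise on timeout
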
diff --git a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
index dae9ddb..f5e6a92 100644
--- a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
@@ -26,6 +26,12 @@ OZONE-SITE.XML_ozone.scm.client.address=scm
OZONE-SITE.XML_ozone.scm.dead.node.interval=5m
OZONE-SITE.XML_ozone.replication=1
OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
+OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
+OZONE-SITE.XML_ozone.scm.pipeline.destroy.timeout=15s
+OZONE-SITE.XML_hdds.heartbeat.interval=2s
+OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
+OZONE-SITE.XML_hdds.scm.replication.event.timeout=7s
+OZONE-SITE.XML_dfs.ratis.server.failure.duration=25s
HDFS-SITE.XML_rpc.metrics.quantile.enable=true
HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
LOG4J.PROPERTIES_log4j.rootLogger=INFO, stdout
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]