Repository: qpid-cpp Updated Branches: refs/heads/master 7a47046b5 -> d490824a6 (forced update)
QPID-8184: Second part of fix: Change error handling for EFP exceptions such that the broker shuts down more cleanly without a spurious segfault. Also addeed tests to the linearstore test suite which forces such errors and checks that they are handles correctly. A minor update to brokertest.py fixes tests which use the EXPECT_EXIT_FAIL flag. Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/d490824a Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/d490824a Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/d490824a Branch: refs/heads/master Commit: d490824a6d63841876ecc8b92b26f46622bf1f2f Parents: 3ae9f03 Author: Kim van der Riet <[email protected]> Authored: Tue May 15 09:54:06 2018 -0400 Committer: Kim van der Riet <[email protected]> Committed: Tue May 15 10:02:58 2018 -0400 ---------------------------------------------------------------------- src/qpid/linearstore/journal/EmptyFilePool.cpp | 14 +++--- src/qpid/linearstore/journal/EmptyFilePool.h | 2 +- .../journal/EmptyFilePoolPartition.cpp | 19 ++------ src/qpid/linearstore/journal/jerrno.cpp | 2 +- src/tests/brokertest.py | 5 ++- .../python_tests/client_persistence.py | 46 +++++++++++++++++++- .../linearstore/python_tests/store_test.py | 46 ++++++++++++++++++++ 7 files changed, 106 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/qpid/linearstore/journal/EmptyFilePool.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/linearstore/journal/EmptyFilePool.cpp b/src/qpid/linearstore/journal/EmptyFilePool.cpp index 0cbcb59..2a79bf8 100644 --- a/src/qpid/linearstore/journal/EmptyFilePool.cpp +++ b/src/qpid/linearstore/journal/EmptyFilePool.cpp @@ -48,7 +48,7 @@ unsigned char EmptyFilePool::s_fhdr_buff_[FHDR_BUFF_SIZE]; smutex EmptyFilePool::s_fhdr_buff_mutex_; size_t EmptyFilePool::s_zero_buff_size_ = ZERO_BUFF_SIZE; unsigned char EmptyFilePool::s_zero_buff_[ZERO_BUFF_SIZE]; -bool EmptyFilePool::s_static_initializer_flag_ = false; +bool EmptyFilePool::s_static_initializer_flag_ = initializeStaticBuffers(); EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, @@ -62,12 +62,7 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag), truncateFlag_(truncateFlag), journalLogRef_(journalLogRef) -{ - if (!s_static_initializer_flag_) { - initializeStaticBuffers(); - s_static_initializer_flag_ = true; - } -} +{} EmptyFilePool::~EmptyFilePool() {} @@ -201,7 +196,7 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN efpDataSize_kib_t s = ::atol(n.c_str()); if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) { std::ostringstream oss; - oss << "Partition: " << partitionNumber << "; EFP directory: \'" << n << "\'"; + oss << "Partition: " << partitionNumber << "; EFP directory: \'" << dirName << "\'"; throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "fileSizeKbFromDirName"); } return s; @@ -499,10 +494,11 @@ bool EmptyFilePool::moveFile(const std::string& from, } //static -void EmptyFilePool::initializeStaticBuffers() { +bool EmptyFilePool::initializeStaticBuffers() { // Overwrite buffers with zeros ::memset(s_fhdr_buff_, 0, s_fhdr_buff_size_); ::memset(s_zero_buff_, 0, s_zero_buff_size_); + return true; } }}} http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/qpid/linearstore/journal/EmptyFilePool.h ---------------------------------------------------------------------- diff --git a/src/qpid/linearstore/journal/EmptyFilePool.h b/src/qpid/linearstore/journal/EmptyFilePool.h index aaf09d5..0381f4d 100644 --- a/src/qpid/linearstore/journal/EmptyFilePool.h +++ b/src/qpid/linearstore/journal/EmptyFilePool.h @@ -119,7 +119,7 @@ protected: static bool isSymlink(const std::string& fqName); static bool moveFile(const std::string& fromFqPath, const std::string& toFqPath); - static void initializeStaticBuffers(); + static bool initializeStaticBuffers(); }; }}} http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index 12d2db7..ece201b 100644 --- a/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -164,21 +164,10 @@ EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) { EmptyFilePool* efpp = 0; - try { - efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); - { - slock l(efpMapMutex_); - efpMap_[efpp->dataSize_kib()] = efpp; - } - } - catch (const std::exception& e) { - if (efpp != 0) { - delete efpp; - efpp = 0; - } - std::ostringstream oss; - oss << "EmptyFilePool create failed: " << e.what(); - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); + { + slock l(efpMapMutex_); + efpMap_[efpp->dataSize_kib()] = efpp; } if (efpp != 0) { efpp->initialize(); http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/qpid/linearstore/journal/jerrno.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/linearstore/journal/jerrno.cpp b/src/qpid/linearstore/journal/jerrno.cpp index ce88e78..2cf28cf 100644 --- a/src/qpid/linearstore/journal/jerrno.cpp +++ b/src/qpid/linearstore/journal/jerrno.cpp @@ -209,7 +209,7 @@ jerrno::__init() // EFP errors _err_map[JERR_EFP_BADPARTITIONNAME] = "JERR_EFP_BADPARTITIONNAME: Invalid partition name (must be \'pNNN\' where NNN is a non-zero number)"; - _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad Empty File Pool directory name (must be \'NNNk\', where NNN is a number which is a multiple of 4)"; + _err_map[JERR_EFP_BADEFPDIRNAME] = "JERR_EFP_BADEFPDIRNAME: Bad EFP directory name, must be \'NNNNk\', NNNN is number which is multiple of 4"; _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/tests/brokertest.py ---------------------------------------------------------------------- diff --git a/src/tests/brokertest.py b/src/tests/brokertest.py index b40e953..cb2f8e4 100644 --- a/src/tests/brokertest.py +++ b/src/tests/brokertest.py @@ -326,7 +326,8 @@ class Broker(Popen): self._host = "127.0.0.1" self._agent = None - log.debug("Started broker %s" % self) + if expect != EXPECT_EXIT_FAIL: + log.debug("Started broker %s" % self) def host(self): return self._host @@ -555,6 +556,8 @@ class BrokerTest(TestCase): if (wait): try: b.ready() except Exception, e: + if expect == EXPECT_EXIT_FAIL: + return None raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/tests/linearstore/python_tests/client_persistence.py ---------------------------------------------------------------------- diff --git a/src/tests/linearstore/python_tests/client_persistence.py b/src/tests/linearstore/python_tests/client_persistence.py index 9ff9480..99a0c3f 100644 --- a/src/tests/linearstore/python_tests/client_persistence.py +++ b/src/tests/linearstore/python_tests/client_persistence.py @@ -19,7 +19,7 @@ import os -from brokertest import EXPECT_EXIT_OK +from brokertest import EXPECT_EXIT_OK, EXPECT_EXIT_FAIL from store_test import StoreTest, Qmf, store_args from qpid.messaging import * @@ -236,4 +236,48 @@ class RedeliveredTests(StoreTest): rcv_msg = broker.get_message("testQueue") self.assertEqual(msg_content, rcv_msg.content) self.assertTrue(rcv_msg.redelivered) + +class LinearstorePartitionTests(StoreTest): + """ + Test the linearstore partition and EFP size during recovery + """ + + def test_invalid_efp_dirs(self): + """Test invalid EFP sizes are ignored""" + broker = self.broker(store_args(), name="test_invalid_efp_dirs", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + qls_dir = os.path.join(broker.datadir, 'qls') + self.add_invalid_efp_directories(qls_dir) + + self.broker(store_args(), name="test_invalid_efp_dirs", expect=EXPECT_EXIT_FAIL) + + def test_invalid_partition_in_header(self): + """Test invalid partition number in journal header causes broker shutdown""" + broker = self.broker(store_args(), name="test_invalid_partition_in_header", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + qls_dir = os.path.join(broker.datadir, 'qls') + self.alter_journal_header(qls_dir, "testQueue", partition=5) + + self.broker(store_args(), name="test_invalid_partition_in_header", expect=EXPECT_EXIT_FAIL) + + def test_invalid_data_size_in_header(self): + """Test invalid data size in journal header causes broker shutdown""" + broker = self.broker(store_args(), name="test_invalid_data_size_in_header", expect=EXPECT_EXIT_OK) + msg_content = "xyz"*100 + msg = Message(msg_content, durable=True) + broker.send_message("testQueue", msg) + broker.terminate() + + qls_dir = os.path.join(broker.datadir, 'qls') + self.alter_journal_header(qls_dir, "testQueue", data_size=123) + + self.broker(store_args(), name="test_invalid_data_size_in_header", expect=EXPECT_EXIT_FAIL) http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/d490824a/src/tests/linearstore/python_tests/store_test.py ---------------------------------------------------------------------- diff --git a/src/tests/linearstore/python_tests/store_test.py b/src/tests/linearstore/python_tests/store_test.py index cc846ae..8aea59d 100644 --- a/src/tests/linearstore/python_tests/store_test.py +++ b/src/tests/linearstore/python_tests/store_test.py @@ -17,7 +17,9 @@ # under the License. # +import os import re +import struct from brokertest import BrokerTest from qpid.messaging import Empty from qmf.console import Session @@ -345,6 +347,50 @@ class StoreTest(BrokerTest): ssn.commit() return ssn + # Functions for manipulating linearstore journal files and directories + + def add_invalid_efp_directories(self, qls_dir): + """Add some invalid efp directories to the default partition""" + for invalid_dir_name in ['p010', 'p1hello', 'world']: + invalid_dir_path = os.path.join(qls_dir, invalid_dir_name) + if not os.path.exists(invalid_dir_path): + os.makedirs(invalid_dir_path) + + def alter_journal_header(self, qls_dir, queue_name, partition=None, data_size=None): + """Change the first journal in queue queue_name to have the supplied partition and data_size (ds)""" + #print(os.listdir(qls_dir)) + jfh = None + try: + jrnl2_dir = os.path.join(qls_dir, 'jrnl2') + queue_list = os.listdir(jrnl2_dir) + if queue_name in queue_list: + queue_dir = os.path.join(jrnl2_dir, queue_name) + jrnl_link_list = os.listdir(queue_dir) + if len(jrnl_link_list) > 0: + jrnl_link = os.path.join(queue_dir, jrnl_link_list[0]) + jrnl_file = os.readlink(jrnl_link) + if os.path.getsize(jrnl_file) == 0x201000: + jfh = open(jrnl_file, 'r+b') + file_header = jfh.read(72) # Read 72-byte file header + hdr_list = list(struct.unpack('IHHQQHHIQQQQQ', file_header)) + if partition: + hdr_list[6] = partition + if data_size: + hdr_list[8] = data_size + file_hdr = struct.pack('IHHQQHHIQQQQQ', *hdr_list) + jfh.seek(0) + jfh.write(file_hdr) + else: + self.fail('Journal file is invalid size: 0x%x bytes, expected 0x201000 bytes' % os.path.getsize(jrnl_file)) + else: + self.fail('Queue "%s" is empty') + else: + self.fail('Queue "%s" not found in directory %s, dir found: %s' % (queue_name, jrnl2_dir, queue_list)) + except (IOError, OSError) as e: + self.fail(e) + finally: + if jfh and not jfh.closed: + jfh.close() # Functions for finding strings in the broker log file (or other files) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
