Updated Branches: refs/heads/0.8 e367f3ffb -> 26c50fac4
kafka-791; Fix validation bugs in System Test; patched by John Fung; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26c50fac Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26c50fac Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26c50fac Branch: refs/heads/0.8 Commit: 26c50fac47802e4aa03793c60cb8316995bb37f1 Parents: e367f3f Author: John Fung <fung.j...@gmail.com> Authored: Mon Mar 25 14:08:24 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Mon Mar 25 14:08:24 2013 -0700 ---------------------------------------------------------------------- .../mirror_maker_testsuite/mirror_maker_test.py | 7 +- .../replication_testsuite/replica_basic_test.py | 15 +- system_test/utils/kafka_system_test_utils.py | 304 ++++++-------- system_test/utils/replication_utils.py | 3 + system_test/utils/system_test_utils.py | 78 ++++- 5 files changed, 222 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/mirror_maker_testsuite/mirror_maker_test.py ---------------------------------------------------------------------- diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py index 48b0d25..098f531 100644 --- a/system_test/mirror_maker_testsuite/mirror_maker_test.py +++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py @@ -76,6 +76,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) testCasePathNameList.sort() + replicationUtils = ReplicationUtils(self) + # ============================================================= # launch each testcase one by one: testcase_1, testcase_2, ... # ============================================================= @@ -282,9 +284,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils): # validate the data matched and checksum # ============================================= self.log_message("validating data matched") - #kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "source") kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv, "target") # ============================================= http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/replication_testsuite/replica_basic_test.py ---------------------------------------------------------------------- diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py index 3fc47d9..ce29240 100644 --- a/system_test/replication_testsuite/replica_basic_test.py +++ b/system_test/replication_testsuite/replica_basic_test.py @@ -77,6 +77,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX) testCasePathNameList.sort() + replicationUtils = ReplicationUtils(self) + # ============================================================= # launch each testcase one by one: testcase_1, testcase_2, ... # ============================================================= @@ -423,16 +425,15 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils): self.log_message("validating data matched") if logRetentionTest.lower() == "true": - kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) elif consumerMultiTopicsMode.lower() == "true": - #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer( + self.systemTestEnv, self.testcaseEnv, replicationUtils) else: kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) - #kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) - kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv) - + kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv, self.testcaseEnv) + kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv, replicationUtils) + # ============================================= # draw graphs # ============================================= http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/kafka_system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 9411405..9e58624 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -1189,17 +1189,21 @@ def get_message_checksum(logPathName): return messageChecksumList -def validate_data_matched(systemTestEnv, testcaseEnv): +def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils): + logger.debug("#### Inside validate_data_matched", extra=d) + validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer") + consumerDuplicateCount = 0 + for prodPerfCfg in prodPerfCfgList: producerEntityId = prodPerfCfg["entity_id"] - #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") - topic = testcaseEnv.producerTopicsString + topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") + logger.debug("working on topic : " + topic, extra=d) acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks") consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ @@ -1207,13 +1211,14 @@ def validate_data_matched(systemTestEnv, testcaseEnv): matchingConsumerEntityId = None for consumerEntityId in consumerEntityIdList: - #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") - consumerTopic = testcaseEnv.consumerTopicsString + consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") if consumerTopic in topic: matchingConsumerEntityId = consumerEntityId + logger.debug("matching consumer entity id found", extra=d) break if matchingConsumerEntityId is None: + logger.debug("matching consumer entity id NOT found", extra=d) break msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \ @@ -1229,10 +1234,11 @@ def validate_data_matched(systemTestEnv, testcaseEnv): producerMsgIdSet = set(producerMsgIdList) consumerMsgIdSet = set(consumerMsgIdList) - missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet + consumerDuplicateCount = len(consumerMsgIdList) - len(consumerMsgIdSet) + missingUniqConsumerMsgId = system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet) outfile = open(msgIdMissingInConsumerLogPathName, "w") - for id in missingMsgIdInConsumer: + for id in missingUniqConsumerMsgId: outfile.write(id + "\n") outfile.close() @@ -1241,20 +1247,28 @@ def validate_data_matched(systemTestEnv, testcaseEnv): validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) - if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ): + missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet) + logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) + logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d) + + if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ): validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" elif (acks == "1"): - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - print "#### missing Percent : ", missingPercentage - if missingPercentage <= 1: + if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) + else: + validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) else: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict): + logger.debug("#### Inside validate_leader_election_successful", extra=d) if ( len(leaderDict) > 0 ): try: @@ -1545,6 +1559,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv): + logger.debug("#### Inside validate_07_08_migrated_data_matched", extra=d) + validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -1614,6 +1630,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv): logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d) def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"): + logger.debug("#### Inside validate_broker_log_segment_checksum", extra=d) anonLogger.info("================================================") anonLogger.info("validating merged broker log segment checksums") @@ -1823,77 +1840,6 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None partitionId += 1 replicaIndex += 1 -def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv): - validationStatusDict = testcaseEnv.validationStatusDict - clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - - prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance") - consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer") - - mismatchCount = 0 - - for prodPerfCfg in prodPerfCfgList: - producerEntityId = prodPerfCfg["entity_id"] - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") - acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks") - logger.debug("request-num-acks [" + acks + "]", extra=d) - - consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ - clusterEntityConfigDictList, "role", "console_consumer", "entity_id") - - matchingConsumerEntityId = None - for consumerEntityId in consumerEntityIdList: - consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") - if consumerTopic in topic: - matchingConsumerEntityId = consumerEntityId - break - - if matchingConsumerEntityId is None: - break - - producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default") - producerLogPathName = producerLogPath + "/producer_performance.log" - producerMsgIdList = get_message_id(producerLogPathName) - producerMsgIdSet = set(producerMsgIdList) - logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) - - consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") - for logFile in sorted(os.listdir(consumerLogPath)): - # only process log file: *.log - if logFile.endswith(".log"): - consumerLogPathName = consumerLogPath + "/" + logFile - consumerMsgIdList = get_message_id(consumerLogPathName) - consumerMsgIdSet = set(consumerMsgIdList) - missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet - msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( - testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + \ - "/" + logFile + "_msg_id_missing_in_consumer.log" - - outfile = open(msgIdMissingInConsumerLogPathName, "w") - for id in missingMsgIdInConsumer: - outfile.write(id + "\n") - outfile.close() - - logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) - - if acks == "-1" and len(missingMsgIdInConsumer) > 0: - mismatchCount += 1 - elif acks == "1" and len(missingMsgIdInConsumer) > 0: - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - logger.debug("missing percentage [" + str(missingPercentage) + "]", extra=d) - if missingPercentage <= 1: - logger.warn("Test case (acks == 1) passes with < 1% data loss : [" + \ - str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) - else: - mismatchCount += 1 - - if mismatchCount == 0: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - else: - validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" - def get_controller_attributes(systemTestEnv, testcaseEnv): logger.info("Querying Zookeeper for Controller info ...", extra=d) @@ -1917,7 +1863,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv): "\"JAVA_HOME=" + javaHome, kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain", "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"], - "'get /controller' 2> /dev/null | tail -1\""] + "get /controller 2> /dev/null | tail -1\""] cmdStr = " ".join(cmdStrList) logger.debug("executing command [" + cmdStr + "]", extra=d) @@ -2007,6 +1953,8 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source") return minCommonStartOffsetDict def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcaseEnv): + logger.debug("#### Inside validate_simple_consumer_data_matched_across_replicas", extra=d) + validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( @@ -2014,101 +1962,100 @@ def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas replicaFactor = testcaseEnv.testcaseArgumentsDict["replica_factor"] numPartition = testcaseEnv.testcaseArgumentsDict["num_partition"] - # Unique messages from producer on [test_1] : 1500 - # Unique messages from consumer on [test_1] : 1500 + for consumerEntityId in consumerEntityIdList: - # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log : 750 - # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log : 750 - # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log : 0 + # get topic string from multi consumer "entity" + topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") - # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log : 0 - # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log : 750 - # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log : 750 + # the topic string could be multi topics separated by ',' + topicList = topicStr.split(',') - # ================================================== + for topic in topicList: + logger.debug("working on topic : " + topic, extra=d) + consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default") + + # keep track of total msg count across replicas for each topic-partition + # (should be greater than 0 for passing) + totalMsgCounter = 0 + + # keep track of the mismatch msg count for each topic-partition + # (should be equal to 0 for passing) + mismatchCounter = 0 + + replicaIdxMsgIdList = [] + # replicaIdxMsgIdList : + # - This is a list of dictionaries of topic-partition (key) + # mapping to list of MessageID in that topic-partition (val) + # - The list index is mapped to (replicaId - 1) + # [ + # // list index = 0 => replicaId = idx(0) + 1 = 1 + # { + # "topic1-0" : [ "0000000001", "0000000002", "0000000003"], + # "topic1-1" : [ "0000000004", "0000000005", "0000000006"] + # }, + # // list index = 1 => replicaId = idx(1) + 1 = 2 + # { + # "topic1-0" : [ "0000000001", "0000000002", "0000000003"], + # "topic1-1" : [ "0000000004", "0000000005", "0000000006"] + # } + # ] + + # initialize replicaIdxMsgIdList + j = 0 + while j < int(replicaFactor): + newDict = {} + replicaIdxMsgIdList.append(newDict) + j += 1 + + # retrieve MessageID from all simple consumer log4j files + for logFile in sorted(os.listdir(consumerLogPath)): + + if logFile.startswith("simple_consumer_"+topic) and logFile.endswith(".log"): + logger.debug("working on file : " + logFile, extra=d) + matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) + partitionId = int(matchObj.group(1)) + replicaIdx = int(matchObj.group(2)) + + consumerLogPathName = consumerLogPath + "/" + logFile + consumerMsgIdList = get_message_id(consumerLogPathName) - # Unique messages from producer on [test_2] : 1000 - # Unique messages from consumer on [test_2] : 1000 + topicPartition = topic + "-" + str(partitionId) + replicaIdxMsgIdList[replicaIdx - 1][topicPartition] = consumerMsgIdList - # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log : 500 - # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log : 0 - # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log : 500 + logger.info("no. of messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdList)), extra=d) + validationStatusDict["No. of messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdList)) - # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log : 500 - # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log : 500 - # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log : 0 + # print replicaIdxMsgIdList - mismatchCounter = 0 - for consumerEntityId in consumerEntityIdList: + # take the first dictionary of replicaIdxMsgIdList and compare with the rest + firstMsgIdDict = replicaIdxMsgIdList[0] - topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic") - consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default") - - replicaIdxMsgCountDictList = [] - # replicaIdxMsgCountDictList is being used as follows: - # - # the above replica message count will be organized as follows: - # index of the list would map to the partitionId - # each element in the list maps to the replicaIdx-MessageCount - # to validate that : - # 1. there should be "no. of broker" of non-zero message count and they are equal - # 2. there should be "no. of broker - replication factor" of zero count - # [{"1": "750", "2": "750", "3": "0" }, - # {"1": "0" , "2": "750", "3": "750"}] - - j = 0 - while j < int(numPartition): - newDict = {} - replicaIdxMsgCountDictList.append(newDict) - j += 1 - - for logFile in sorted(os.listdir(consumerLogPath)): - - if logFile.startswith("simple_consumer_") and logFile.endswith(".log"): - matchObj = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile) - partitionId = int(matchObj.group(1)) - replicaIdx = int(matchObj.group(2)) - - consumerLogPathName = consumerLogPath + "/" + logFile - consumerMsgIdList = get_message_id(consumerLogPathName) - consumerMsgIdSet = set(consumerMsgIdList) - - replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet) - - logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d) - validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet)) - - pprint.pprint(replicaIdxMsgCountDictList) - - partitionId = 0 - while partitionId < int(numPartition): - zeroMsgCounter = 0 - nonZeroMsgCounter = 0 - nonZeroMsgValue = -1 - - for replicaIdx in sorted(replicaIdxMsgCountDictList[partitionId].iterkeys()): - if replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] == 0: - zeroMsgCounter += 1 - else: - if nonZeroMsgValue == -1: - nonZeroMsgValue = replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] - else: - if nonZeroMsgValue != replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]: - mismatchCounter += 1 - nonZeroMsgCounter += 1 - partitionId += 1 + # loop through all 'topic-partition' such as topic1-0, topic1-1, ... + for topicPartition in sorted(firstMsgIdDict.iterkeys()): - logger.info("topic " + topic + " : no. of brokers with zero msg count : " + str(zeroMsgCounter), extra=d) - logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d) - logger.info("topic " + topic + " : non-zero brokers msg count : " + str(nonZeroMsgValue), extra=d) + # compare all replicas' MessageID in corresponding topic-partition + for i in range(len(replicaIdxMsgIdList)): + # skip the first dictionary + if i == 0: + totalMsgCounter += len(firstMsgIdDict[topicPartition]) + continue - if mismatchCounter == 0 and nonZeroMsgCounter > 0: - validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED" - else: - validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED" + totalMsgCounter += len(replicaIdxMsgIdList[i][topicPartition]) + + # get the count of mismatch MessageID between first MessageID list and the other lists + diffCount = system_test_utils.diff_lists(firstMsgIdDict[topicPartition], replicaIdxMsgIdList[i][topicPartition]) + mismatchCounter += diffCount + logger.info("Mismatch count of topic-partition [" + topicPartition + "] in replica id [" + str(i+1) + "] : " + str(diffCount), extra=d) + + if mismatchCounter == 0 and totalMsgCounter > 0: + validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED" + else: + validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED" -def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv): +def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv, replicationUtils): + logger.debug("#### Inside validate_data_matched_in_multi_topics_from_single_consumer_producer", extra=d) + validationStatusDict = testcaseEnv.validationStatusDict clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList @@ -2140,6 +2087,7 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe topicList = topicStr.split(',') for topic in topicList: + consumerDuplicateCount = 0 msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \ + "/msg_id_missing_in_consumer_" + topic + ".log" @@ -2148,10 +2096,11 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe producerMsgIdSet = set(producerMsgIdList) consumerMsgIdSet = set(consumerMsgIdList) - missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet + consumerDuplicateCount = len(consumerMsgIdList) -len(consumerMsgIdSet) + missingUniqConsumerMsgId = system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet) outfile = open(msgIdMissingInConsumerLogPathName, "w") - for id in missingMsgIdInConsumer: + for id in missingUniqConsumerMsgId: outfile.write(id + "\n") outfile.close() @@ -2160,14 +2109,21 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet)) validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet)) - if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ): + missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / len(producerMsgIdSet) + logger.info("Data loss threshold % : " + str(replicationUtils.ackOneDataLossThresholdPercent), extra=d) + logger.warn("Data loss % on topic : " + topic + " : " + str(missingPercentage), extra=d) + + if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 ): validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" elif (acks == "1"): - missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet) - print "#### missing Percent : ", missingPercentage - if missingPercentage <= 1: + if missingPercentage <= replicationUtils.ackOneDataLossThresholdPercent: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED" - logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d) + logger.warn("Test case (Acks = 1) passes with less than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) + else: + validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" + logger.error("Test case (Acks = 1) failed with more than " + str(replicationUtils.ackOneDataLossThresholdPercent) \ + + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d) else: validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED" logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d) http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/replication_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py index 3e8efad..cfd80b2 100644 --- a/system_test/utils/replication_utils.py +++ b/system_test/utils/replication_utils.py @@ -65,3 +65,6 @@ class ReplicationUtils(object): self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \ self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"] + # Data Loss Percentage Threshold in Ack = 1 cases + self.ackOneDataLossThresholdPercent = 5.0 + http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/system_test_utils.py ---------------------------------------------------------------------- diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py index 65db5c5..50340f0 100644 --- a/system_test/utils/system_test_utils.py +++ b/system_test/utils/system_test_utils.py @@ -21,6 +21,7 @@ # =================================== import copy +import difflib import inspect import json import logging @@ -554,5 +555,80 @@ def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testMo sys.exit(1) print - +# ================================================= +# lists_diff_count +# - find the no. of different items in both lists +# - both lists need not be sorted +# - input lists won't be changed +# ================================================= +def lists_diff_count(a, b): + c = list(b) + d = [] + for item in a: + try: + c.remove(item) + except: + d.append(item) + + if len(d) > 0: + print "#### Mismatch MessageID" + print d + + return len(c) + len(d) + +# ================================================= +# subtract_list +# - subtract items in listToSubtract from mainList +# and return the resulting list +# - both lists need not be sorted +# - input lists won't be changed +# ================================================= +def subtract_list(mainList, listToSubtract): + remainingList = list(mainList) + for item in listToSubtract: + try: + remainingList.remove(item) + except: + pass + return remainingList + +# ================================================= +# diff_lists +# - find the diff of 2 lists and return the +# total no. of mismatch from both lists +# - diff of both lists includes: +# - no. of items mismatch +# - ordering of the items +# +# sample lists: +# a = ['8','4','3','2','1'] +# b = ['8','3','4','2','1'] +# +# difflib will return the following: +# 8 +# + 3 +# 4 +# - 3 +# 2 +# 1 +# +# diff_lists(a,b) returns 2 and prints the following: +# #### only in seq 2 : + 3 +# #### only in seq 1 : - 3 +# ================================================= +def diff_lists(a, b): + mismatchCount = 0 + d = difflib.Differ() + diff = d.compare(a,b) + + for item in diff: + result = item[0:1].strip() + if len(result) > 0: + mismatchCount += 1 + if '-' in result: + logger.debug("#### only in seq 1 : " + item, extra=d) + elif '+' in result: + logger.debug("#### only in seq 2 : " + item, extra=d) + + return mismatchCount