Updated Branches:
  refs/heads/0.8 e367f3ffb -> 26c50fac4

kafka-791; Fix validation bugs in System Test; patched by John Fung; reviewed 
by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26c50fac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26c50fac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26c50fac

Branch: refs/heads/0.8
Commit: 26c50fac47802e4aa03793c60cb8316995bb37f1
Parents: e367f3f
Author: John Fung <fung.j...@gmail.com>
Authored: Mon Mar 25 14:08:24 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Mar 25 14:08:24 2013 -0700

----------------------------------------------------------------------
 .../mirror_maker_testsuite/mirror_maker_test.py    |    7 +-
 .../replication_testsuite/replica_basic_test.py    |   15 +-
 system_test/utils/kafka_system_test_utils.py       |  304 ++++++--------
 system_test/utils/replication_utils.py             |    3 +
 system_test/utils/system_test_utils.py             |   78 ++++-
 5 files changed, 222 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/mirror_maker_testsuite/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py 
b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index 48b0d25..098f531 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -76,6 +76,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
             self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
         testCasePathNameList.sort()
 
+        replicationUtils = ReplicationUtils(self)
+
         # =============================================================
         # launch each testcase one by one: testcase_1, testcase_2, ...
         # =============================================================
@@ -282,9 +284,8 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
                 # validate the data matched and checksum
                 # =============================================
                 self.log_message("validating data matched")
-                
#kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv)
-                
kafka_system_test_utils.validate_simple_consumer_data_matched(self.systemTestEnv,
 self.testcaseEnv)
-                
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv)
+                
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv, replicationUtils)
+                
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv, "source")
                 
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv, "target")
 
                 # =============================================

http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py 
b/system_test/replication_testsuite/replica_basic_test.py
index 3fc47d9..ce29240 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -77,6 +77,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
             self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
         testCasePathNameList.sort()
 
+        replicationUtils = ReplicationUtils(self)
+
         # =============================================================
         # launch each testcase one by one: testcase_1, testcase_2, ...
         # =============================================================
@@ -423,16 +425,15 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 self.log_message("validating data matched")
 
                 if logRetentionTest.lower() == "true":
-                    
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv,
 self.testcaseEnv)
-                    
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv)
+                    
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv, replicationUtils)
                 elif consumerMultiTopicsMode.lower() == "true":
-                    
#kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv)
-                    
kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv,
 self.testcaseEnv)
+                    
kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(
+                        self.systemTestEnv, self.testcaseEnv, replicationUtils)
                 else:
                     
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv,
 self.testcaseEnv)
-                    
#kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv)
-                    
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv)
-
+                    
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv)
+                    
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv, replicationUtils)
+ 
                 # =============================================
                 # draw graphs
                 # =============================================

http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py 
b/system_test/utils/kafka_system_test_utils.py
index 9411405..9e58624 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -1189,17 +1189,21 @@ def get_message_checksum(logPathName):
     return messageChecksumList
 
 
-def validate_data_matched(systemTestEnv, testcaseEnv):
+def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils):
+    logger.debug("#### Inside validate_data_matched", extra=d)
+
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     prodPerfCfgList = 
system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, 
"role", "producer_performance")
     consumerCfgList = 
system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, 
"role", "console_consumer")
 
+    consumerDuplicateCount = 0
+
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
-        #topic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "topic")
-        topic = testcaseEnv.producerTopicsString
+        topic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "topic")
+        logger.debug("working on topic : " + topic, extra=d)
         acks  = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -1207,13 +1211,14 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
 
         matchingConsumerEntityId = None
         for consumerEntityId in consumerEntityIdList:
-            #consumerTopic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", consumerEntityId, "topic")
-            consumerTopic = testcaseEnv.consumerTopicsString
+            consumerTopic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", consumerEntityId, "topic")
             if consumerTopic in topic:
                 matchingConsumerEntityId = consumerEntityId
+                logger.debug("matching consumer entity id found", extra=d)
                 break
 
         if matchingConsumerEntityId is None:
+            logger.debug("matching consumer entity id NOT found", extra=d)
             break
 
         msgIdMissingInConsumerLogPathName = 
get_testcase_config_log_dir_pathname( \
@@ -1229,10 +1234,11 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
         producerMsgIdSet   = set(producerMsgIdList)
         consumerMsgIdSet   = set(consumerMsgIdList)
 
-        missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+        consumerDuplicateCount = len(consumerMsgIdList) - len(consumerMsgIdSet)
+        missingUniqConsumerMsgId = 
system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet)
 
         outfile = open(msgIdMissingInConsumerLogPathName, "w")
-        for id in missingMsgIdInConsumer:
+        for id in missingUniqConsumerMsgId:
             outfile.write(id + "\n")
         outfile.close()
 
@@ -1241,20 +1247,28 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
         validationStatusDict["Unique messages from producer on [" + topic + 
"]"] = str(len(producerMsgIdSet))
         validationStatusDict["Unique messages from consumer on [" + topic + 
"]"] = str(len(consumerMsgIdSet))
 
-        if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+        missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / 
len(producerMsgIdSet)
+        logger.info("Data loss threshold % : " + 
str(replicationUtils.ackOneDataLossThresholdPercent), extra=d)
+        logger.warn("Data loss % on topic : " + topic + " : " + 
str(missingPercentage), extra=d)
+
+        if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) > 0 
):
             validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "PASSED"
         elif (acks == "1"):
-            missingPercentage = len(missingMsgIdInConsumer) * 100 / 
len(producerMsgIdSet)
-            print "#### missing Percent : ", missingPercentage
-            if missingPercentage <= 1:
+            if missingPercentage <= 
replicationUtils.ackOneDataLossThresholdPercent:
                 validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "PASSED"
-                logger.warn("Test case passes with less than 1% data loss : [" 
+ str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+                logger.warn("Test case (Acks = 1) passes with less than " + 
str(replicationUtils.ackOneDataLossThresholdPercent) \
+                    + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + 
"] missing messages", extra=d)
+            else:
+                validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "FAILED"
+                logger.error("Test case (Acks = 1) failed with more than " + 
str(replicationUtils.ackOneDataLossThresholdPercent) \
+                    + "% data loss : [" + str(len(missingUniqConsumerMsgId)) + 
"] missing messages", extra=d)
         else:
             validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "FAILED"
             logger.info("See " + msgIdMissingInConsumerLogPathName + " for 
missing MessageID", extra=d)
 
 
 def validate_leader_election_successful(testcaseEnv, leaderDict, 
validationStatusDict):
+    logger.debug("#### Inside validate_leader_election_successful", extra=d)
 
     if ( len(leaderDict) > 0 ):
         try:
@@ -1545,6 +1559,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, 
onlyThisEntityId=None):
 
 
 def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
+    logger.debug("#### Inside validate_07_08_migrated_data_matched", extra=d)
+
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
@@ -1614,6 +1630,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, 
testcaseEnv):
             logger.info("See " + msgChecksumMissingInConsumerLogPathName + " 
for missing MessageID", extra=d)
 
 def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, 
clusterName="source"):
+    logger.debug("#### Inside validate_broker_log_segment_checksum", extra=d)
 
     anonLogger.info("================================================")
     anonLogger.info("validating merged broker log segment checksums")
@@ -1823,77 +1840,6 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, 
minStartingOffsetDict=None
                 partitionId += 1
             replicaIndex += 1
 
-def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv):
-    validationStatusDict        = testcaseEnv.validationStatusDict
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-
-    prodPerfCfgList = 
system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, 
"role", "producer_performance")
-    consumerCfgList = 
system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, 
"role", "console_consumer")
-
-    mismatchCount = 0
-
-    for prodPerfCfg in prodPerfCfgList:
-        producerEntityId = prodPerfCfg["entity_id"]
-        topic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "topic")
-        acks  = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", producerEntityId, "request-num-acks")
-        logger.debug("request-num-acks [" + acks + "]", extra=d)
-
-        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
-                           clusterEntityConfigDictList, "role", 
"console_consumer", "entity_id")
-
-        matchingConsumerEntityId = None
-        for consumerEntityId in consumerEntityIdList:
-            consumerTopic = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", consumerEntityId, "topic")
-            if consumerTopic in topic:
-                matchingConsumerEntityId = consumerEntityId
-                break
-
-        if matchingConsumerEntityId is None:
-            break
-
-        producerLogPath     = 
get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", 
producerEntityId, "default")
-        producerLogPathName = producerLogPath + "/producer_performance.log"
-        producerMsgIdList   = get_message_id(producerLogPathName)
-        producerMsgIdSet    = set(producerMsgIdList)
-        logger.info("no. of unique messages on topic [" + topic + "] sent from 
publisher  : " + str(len(producerMsgIdSet)), extra=d)
-        validationStatusDict["Unique messages from producer on [" + topic + 
"]"] = str(len(producerMsgIdSet))
-
-        consumerLogPath     = 
get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", 
matchingConsumerEntityId, "default")
-        for logFile in sorted(os.listdir(consumerLogPath)):
-            # only process log file: *.log
-            if logFile.endswith(".log"):
-                consumerLogPathName = consumerLogPath + "/" + logFile
-                consumerMsgIdList   = get_message_id(consumerLogPathName)
-                consumerMsgIdSet   = set(consumerMsgIdList)
-                missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
-                msgIdMissingInConsumerLogPathName = 
get_testcase_config_log_dir_pathname( 
-                    testcaseEnv, "console_consumer", matchingConsumerEntityId, 
"default") + \
-                    "/" + logFile + "_msg_id_missing_in_consumer.log"
-
-                outfile = open(msgIdMissingInConsumerLogPathName, "w")
-                for id in missingMsgIdInConsumer:
-                    outfile.write(id + "\n")
-                outfile.close()
-
-                logger.info("no. of unique messages on topic [" + topic + "] 
at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
-                validationStatusDict["Unique messages from consumer on [" + 
topic + "] at " + logFile] = str(len(consumerMsgIdSet))
-
-                if acks == "-1" and len(missingMsgIdInConsumer) > 0:
-                    mismatchCount += 1
-                elif acks == "1" and len(missingMsgIdInConsumer) > 0:
-                    missingPercentage = len(missingMsgIdInConsumer) * 100 / 
len(producerMsgIdSet)
-                    logger.debug("missing percentage [" + 
str(missingPercentage) + "]", extra=d)
-                    if missingPercentage <= 1:
-                        logger.warn("Test case (acks == 1) passes with < 1% 
data loss : [" + \
-                            str(len(missingMsgIdInConsumer)) + "] missing 
messages", extra=d)
-                    else:
-                        mismatchCount += 1
-
-        if mismatchCount == 0:
-            validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "PASSED"
-        else:
-            validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "FAILED"
-
 def get_controller_attributes(systemTestEnv, testcaseEnv):
 
     logger.info("Querying Zookeeper for Controller info ...", extra=d)
@@ -1917,7 +1863,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv):
                   "\"JAVA_HOME=" + javaHome,
                   kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
                   "-server " + 
testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
-                  "'get /controller' 2> /dev/null | tail -1\""]
+                  "get /controller 2> /dev/null | tail -1\""]
 
     cmdStr = " ".join(cmdStrList)
     logger.debug("executing command [" + cmdStr + "]", extra=d)
@@ -2007,6 +1953,8 @@ def getMinCommonStartingOffset(systemTestEnv, 
testcaseEnv, clusterName="source")
     return minCommonStartOffsetDict
 
 def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, 
testcaseEnv):
+    logger.debug("#### Inside 
validate_simple_consumer_data_matched_across_replicas", extra=d)
+
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
     consumerEntityIdList        = 
system_test_utils.get_data_from_list_of_dicts(
@@ -2014,101 +1962,100 @@ def 
validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcas
     replicaFactor               = 
testcaseEnv.testcaseArgumentsDict["replica_factor"]
     numPartition                = 
testcaseEnv.testcaseArgumentsDict["num_partition"]
 
-    # Unique messages from producer on [test_1]  :  1500
-    # Unique messages from consumer on [test_1]  :  1500
+    for consumerEntityId in consumerEntityIdList:
 
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-0_r1.log  :  750
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-0_r2.log  :  750
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-0_r3.log  :  0
+        # get topic string from multi consumer "entity"
+        topicStr  = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", consumerEntityId, "topic")
 
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-1_r1.log  :  0
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-1_r2.log  :  750
-    # Unique messages from consumer on [test_1] at 
simple_consumer_test_1-1_r3.log  :  750
+        # the topic string could be multi topics separated by ','
+        topicList = topicStr.split(',')
 
-    # ==================================================
+        for topic in topicList:
+            logger.debug("working on topic : " + topic, extra=d)
+            consumerLogPath = 
get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", 
consumerEntityId, "default")
+
+            # keep track of total msg count across replicas for each 
topic-partition
+            # (should be greater than 0 for passing)
+            totalMsgCounter = 0
+
+            # keep track of the mismatch msg count for each topic-partition
+            # (should be equal to 0 for passing)
+            mismatchCounter = 0
+
+            replicaIdxMsgIdList = []
+            # replicaIdxMsgIdList :
+            # - This is a list of dictionaries of topic-partition (key)
+            #   mapping to list of MessageID in that topic-partition (val)
+            # - The list index is mapped to (replicaId - 1)
+            # [
+            #  // list index = 0 => replicaId = idx(0) + 1 = 1
+            #  {
+            #      "topic1-0" : [ "0000000001", "0000000002", "0000000003"],
+            #      "topic1-1" : [ "0000000004", "0000000005", "0000000006"]
+            #  },
+            #  // list index = 1 => replicaId = idx(1) + 1 = 2
+            #  {
+            #      "topic1-0" : [ "0000000001", "0000000002", "0000000003"],
+            #      "topic1-1" : [ "0000000004", "0000000005", "0000000006"]
+            #  }
+            # ]
+
+            # initialize replicaIdxMsgIdList
+            j = 0
+            while j < int(replicaFactor):
+                newDict = {}
+                replicaIdxMsgIdList.append(newDict)
+                j += 1
+
+            # retrieve MessageID from all simple consumer log4j files
+            for logFile in sorted(os.listdir(consumerLogPath)):
+
+                if logFile.startswith("simple_consumer_"+topic) and 
logFile.endswith(".log"):
+                    logger.debug("working on file : " + logFile, extra=d)
+                    matchObj    = 
re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
+                    partitionId = int(matchObj.group(1))
+                    replicaIdx  = int(matchObj.group(2))
+
+                    consumerLogPathName   = consumerLogPath + "/" + logFile
+                    consumerMsgIdList     = get_message_id(consumerLogPathName)
 
-    # Unique messages from producer on [test_2]  :  1000
-    # Unique messages from consumer on [test_2]  :  1000
+                    topicPartition = topic + "-" + str(partitionId)
+                    replicaIdxMsgIdList[replicaIdx - 1][topicPartition] = 
consumerMsgIdList
 
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-0_r1.log  :  500
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-0_r2.log  :  0
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-0_r3.log  :  500
+                    logger.info("no. of messages on topic [" + topic + "] at " 
+ logFile + " : " + str(len(consumerMsgIdList)), extra=d)
+                    validationStatusDict["No. of messages from consumer on [" 
+ topic + "] at " + logFile] = str(len(consumerMsgIdList))
 
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-1_r1.log  :  500
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-1_r2.log  :  500
-    # Unique messages from consumer on [test_2] at 
simple_consumer_test_2-1_r3.log  :  0
+            # print replicaIdxMsgIdList
 
-    mismatchCounter = 0
-    for consumerEntityId in consumerEntityIdList:
+            # take the first dictionary of replicaIdxMsgIdList and compare 
with the rest
+            firstMsgIdDict = replicaIdxMsgIdList[0]
 
-        topic           = 
system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, 
"entity_id", consumerEntityId, "topic")
-        consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, 
"console_consumer", consumerEntityId, "default")
-
-        replicaIdxMsgCountDictList = []
-        # replicaIdxMsgCountDictList is being used as follows:
-        #
-        # the above replica message count will be organized as follows:
-        # index of the list would map to the partitionId
-        # each element in the list maps to the replicaIdx-MessageCount
-        # to validate that :
-        # 1. there should be "no. of broker" of non-zero message count and 
they are equal
-        # 2. there should be "no. of broker - replication factor" of zero count
-        # [{"1": "750", "2": "750", "3": "0"  },
-        #  {"1": "0"  , "2": "750", "3": "750"}]
-
-        j = 0
-        while j < int(numPartition):
-            newDict = {}
-            replicaIdxMsgCountDictList.append(newDict)
-            j += 1
-
-        for logFile in sorted(os.listdir(consumerLogPath)):
-
-            if logFile.startswith("simple_consumer_") and 
logFile.endswith(".log"):
-                matchObj    = 
re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
-                partitionId = int(matchObj.group(1))
-                replicaIdx  = int(matchObj.group(2))
-
-                consumerLogPathName   = consumerLogPath + "/" + logFile
-                consumerMsgIdList     = get_message_id(consumerLogPathName)
-                consumerMsgIdSet      = set(consumerMsgIdList)
-
-                replicaIdxMsgCountDictList[partitionId][replicaIdx] = 
len(consumerMsgIdSet)
-
-                logger.info("no. of unique messages on topic [" + topic + "] 
at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
-                validationStatusDict["Unique messages from consumer on [" + 
topic + "] at " + logFile] = str(len(consumerMsgIdSet))
-
-        pprint.pprint(replicaIdxMsgCountDictList)
-
-        partitionId = 0
-        while partitionId < int(numPartition):
-            zeroMsgCounter    = 0
-            nonZeroMsgCounter = 0
-            nonZeroMsgValue   = -1
-
-            for replicaIdx in 
sorted(replicaIdxMsgCountDictList[partitionId].iterkeys()):
-                if replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] == 
0:
-                    zeroMsgCounter += 1
-                else:
-                    if nonZeroMsgValue == -1:
-                        nonZeroMsgValue = 
replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]
-                    else:
-                        if nonZeroMsgValue != 
replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]:
-                            mismatchCounter += 1
-                    nonZeroMsgCounter += 1
-            partitionId += 1
+            # loop through all 'topic-partition' such as topic1-0, topic1-1, 
...
+            for topicPartition in sorted(firstMsgIdDict.iterkeys()):
 
-            logger.info("topic " + topic + " : no. of brokers with zero msg 
count     : " + str(zeroMsgCounter), extra=d)
-            logger.info("topic " + topic + " : no. of brokers with non-zero 
msg count : " + str(nonZeroMsgCounter), extra=d)
-            logger.info("topic " + topic + " : non-zero brokers msg count      
       : " + str(nonZeroMsgValue), extra=d)
+                # compare all replicas' MessageID in corresponding 
topic-partition
+                for i in range(len(replicaIdxMsgIdList)):
+                    # skip the first dictionary
+                    if i == 0:
+                        totalMsgCounter += len(firstMsgIdDict[topicPartition])
+                        continue
 
-        if mismatchCounter == 0 and nonZeroMsgCounter > 0:
-            validationStatusDict["Validate for data matched on topic [" + 
topic + "] across replicas"] = "PASSED"
-        else:
-            validationStatusDict["Validate for data matched on topic [" + 
topic + "] across replicas"] = "FAILED"
+                    totalMsgCounter += 
len(replicaIdxMsgIdList[i][topicPartition])
+
+                    # get the count of mismatch MessageID between first 
MessageID list and the other lists
+                    diffCount = 
system_test_utils.diff_lists(firstMsgIdDict[topicPartition], 
replicaIdxMsgIdList[i][topicPartition])
+                    mismatchCounter += diffCount
+                    logger.info("Mismatch count of topic-partition [" + 
topicPartition + "] in replica id [" + str(i+1) + "] : " + str(diffCount), 
extra=d)
+
+            if mismatchCounter == 0 and totalMsgCounter > 0:
+                validationStatusDict["Validate for data matched on topic [" + 
topic + "] across replicas"] = "PASSED"
+            else:
+                validationStatusDict["Validate for data matched on topic [" + 
topic + "] across replicas"] = "FAILED"
 
 
-def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv,
 testcaseEnv):
+def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv,
 testcaseEnv, replicationUtils):
+    logger.debug("#### Inside 
validate_data_matched_in_multi_topics_from_single_consumer_producer", extra=d)
+
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
@@ -2140,6 +2087,7 @@ def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
 
         topicList = topicStr.split(',')
         for topic in topicList:
+            consumerDuplicateCount = 0
             msgIdMissingInConsumerLogPathName = 
get_testcase_config_log_dir_pathname( 
                                                 testcaseEnv, 
"console_consumer", matchingConsumerEntityId, "default") \
                                                 + 
"/msg_id_missing_in_consumer_" + topic + ".log"
@@ -2148,10 +2096,11 @@ def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
             producerMsgIdSet   = set(producerMsgIdList)
             consumerMsgIdSet   = set(consumerMsgIdList)
 
-            missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+            consumerDuplicateCount = len(consumerMsgIdList) 
-len(consumerMsgIdSet)
+            missingUniqConsumerMsgId = 
system_test_utils.subtract_list(producerMsgIdSet, consumerMsgIdSet)
 
             outfile = open(msgIdMissingInConsumerLogPathName, "w")
-            for id in missingMsgIdInConsumer:
+            for id in missingUniqConsumerMsgId:
                 outfile.write(id + "\n")
             outfile.close()
 
@@ -2160,14 +2109,21 @@ def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
             validationStatusDict["Unique messages from producer on [" + topic 
+ "]"] = str(len(producerMsgIdSet))
             validationStatusDict["Unique messages from consumer on [" + topic 
+ "]"] = str(len(consumerMsgIdSet))
 
-            if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 
0 ):
+            missingPercentage = len(missingUniqConsumerMsgId) * 100.00 / 
len(producerMsgIdSet)
+            logger.info("Data loss threshold % : " + 
str(replicationUtils.ackOneDataLossThresholdPercent), extra=d)
+            logger.warn("Data loss % on topic : " + topic + " : " + 
str(missingPercentage), extra=d)
+
+            if ( len(missingUniqConsumerMsgId) == 0 and len(producerMsgIdSet) 
> 0 ):
                 validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "PASSED"
             elif (acks == "1"):
-                missingPercentage = len(missingMsgIdInConsumer) * 100 / 
len(producerMsgIdSet)
-                print "#### missing Percent : ", missingPercentage
-                if missingPercentage <= 1:
+                if missingPercentage <= 
replicationUtils.ackOneDataLossThresholdPercent:
                     validationStatusDict["Validate for data matched on topic 
[" + topic + "]"] = "PASSED"
-                    logger.warn("Test case passes with less than 1% data loss 
: [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+                    logger.warn("Test case (Acks = 1) passes with less than " 
+ str(replicationUtils.ackOneDataLossThresholdPercent) \
+                        + "% data loss : [" + 
str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
+                else:
+                    validationStatusDict["Validate for data matched on topic 
[" + topic + "]"] = "FAILED"
+                    logger.error("Test case (Acks = 1) failed with more than " 
+ str(replicationUtils.ackOneDataLossThresholdPercent) \
+                        + "% data loss : [" + 
str(len(missingUniqConsumerMsgId)) + "] missing messages", extra=d)
             else:
                 validationStatusDict["Validate for data matched on topic [" + 
topic + "]"] = "FAILED"
                 logger.info("See " + msgIdMissingInConsumerLogPathName + " for 
missing MessageID", extra=d)

http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/replication_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/replication_utils.py 
b/system_test/utils/replication_utils.py
index 3e8efad..cfd80b2 100644
--- a/system_test/utils/replication_utils.py
+++ b/system_test/utils/replication_utils.py
@@ -65,3 +65,6 @@ class ReplicationUtils(object):
         self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = 
"\[(.*?)\] .* \[Controller (.*?)\]: " + \
             self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"]
 
+        # Data Loss Percentage Threshold in Ack = 1 cases
+        self.ackOneDataLossThresholdPercent = 5.0
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/26c50fac/system_test/utils/system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/system_test_utils.py 
b/system_test/utils/system_test_utils.py
index 65db5c5..50340f0 100644
--- a/system_test/utils/system_test_utils.py
+++ b/system_test/utils/system_test_utils.py
@@ -21,6 +21,7 @@
 # ===================================
 
 import copy
+import difflib
 import inspect
 import json
 import logging
@@ -554,5 +555,80 @@ def 
setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testMo
         sys.exit(1)
     print
 
-
+# =================================================
+# lists_diff_count
+# - return the no. of items found in only one of the lists
+# - both lists need not be sorted; duplicates are matched one-to-one
+# - input lists won't be changed (a copy of b is consumed)
+# =================================================
+def lists_diff_count(a, b):
+    onlyInB = list(b)   # copy of b; what is left after the loop has no match in a
+    onlyInA = []        # items of a with no remaining match in b
+    for item in a:
+        try:
+            onlyInB.remove(item)
+        except ValueError:   # raised by list.remove() when item is absent
+            onlyInA.append(item)
+
+    if len(onlyInA) > 0:
+        print("#### Mismatch MessageID")   # single-arg form: same output on Python 2
+        print(onlyInA)
+
+    return len(onlyInB) + len(onlyInA)
+
+# =================================================
+# subtract_list
+# - remove one occurrence from mainList for each item in listToSubtract
+#   and return the resulting list (order of the remaining items is kept)
+# - both lists need not be sorted
+# - input lists won't be changed (works on a copy of mainList)
+# =================================================
+def subtract_list(mainList, listToSubtract):
+    remainingList = list(mainList)
+    for item in listToSubtract:
+        try:
+            remainingList.remove(item)   # drops the first matching occurrence only
+        except ValueError:
+            pass   # item is not (or no longer) in the copy: nothing to subtract
+    return remainingList
+
+# =================================================
+# diff_lists
+# - find the diff of 2 lists and return the
+#   total no. of mismatches from both lists
+# - diff of both lists includes:
+#   - items present in only one of the lists
+#   - ordering of the items
+# - difflib '?' hint lines are not counted as mismatches
+# sample lists:
+# a = ['8','4','3','2','1']
+# b = ['8','3','4','2','1']
+#
+# difflib will return the following:
+#   8
+# + 3
+#   4
+# - 3
+#   2
+#   1
+#
+# diff_lists(a,b) returns 2 and logs (debug level) the following:
+# #### only in seq 2 : + 3
+# #### only in seq 1 : - 3
+# =================================================
+def diff_lists(a, b):
+    mismatchCount = 0
+    differ = difflib.Differ()   # not named 'd': extra=d below must be a dict, not a Differ
+    diff = differ.compare(a,b)
+
+    for item in diff:
+        marker = item[0:1]   # ' ' both, '-' seq 1 only, '+' seq 2 only, '?' hint line
+        if marker == '-':
+            mismatchCount += 1
+            logger.debug("#### only in seq 1 : " + item, extra=d)   # NOTE(review): assumes module-level d
+        elif marker == '+':
+            mismatchCount += 1
+            logger.debug("#### only in seq 2 : " + item, extra=d)
+
+    return mismatchCount
 

Reply via email to