kafka-819; System Test : Add validation of log segment index to offset; patched 
by John Fung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f570cce1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f570cce1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f570cce1

Branch: refs/heads/trunk
Commit: f570cce1f4bec4ce50b4b1878a804b725f316b91
Parents: c5e354d
Author: John Fung <[email protected]>
Authored: Wed Mar 27 16:58:54 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Mar 27 16:58:54 2013 -0700

----------------------------------------------------------------------
 .../replication_testsuite/replica_basic_test.py |  2 +
 system_test/utils/kafka_system_test_utils.py    | 74 ++++++++++++++++++++
 2 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py 
b/system_test/replication_testsuite/replica_basic_test.py
index ce29240..40c1157 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -433,6 +433,8 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     
kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv,
 self.testcaseEnv)
                     
kafka_system_test_utils.validate_broker_log_segment_checksum(self.systemTestEnv,
 self.testcaseEnv)
                     
kafka_system_test_utils.validate_data_matched(self.systemTestEnv, 
self.testcaseEnv, replicationUtils)
+
+                kafka_system_test_utils.validate_index_log(self.systemTestEnv, 
self.testcaseEnv)
  
                 # =============================================
                 # draw graphs

http://git-wip-us.apache.org/repos/asf/kafka/blob/f570cce1/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py 
b/system_test/utils/kafka_system_test_utils.py
index 9e58624..dd082f5 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2129,4 +2129,78 @@ def 
validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
                 logger.info("See " + msgIdMissingInConsumerLogPathName + " for 
missing MessageID", extra=d)
 
 
+def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
+    logger.debug("#### Inside validate_index_log", extra=d)
+
+    failureCount         = 0
+    brokerLogCksumDict   = {}
+    testCaseBaseDir      = testcaseEnv.testCaseBaseDir
+    tcConfigsList        = testcaseEnv.testcaseConfigsList
+    validationStatusDict = testcaseEnv.validationStatusDict
+    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
+    allBrokerConfigList  = 
system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", 
"broker")
+    brokerEntityIdList   = 
system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, 
"cluster_name", clusterName, "entity_id")
+
+    # loop through all brokers
+    for brokerEntityId in brokerEntityIdList:
+        logCksumDict   = {}
+        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
+        # => remoteLogSegmentDir   : kafka_server_4_logs
+        remoteLogSegmentPathName = 
system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", 
brokerEntityId, "log.dir")
+        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
+        logPathName              = 
get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, 
"default")
+        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
+        kafkaHome                = 
system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", 
brokerEntityId, "kafka_home")
+        hostname                 = 
system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", 
brokerEntityId, "hostname")
+        kafkaRunClassBin         = kafkaHome + "/bin/kafka-run-class.sh"
+
+        # localLogSegmentPath :
+        # 
.../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs
+        #   |- test_1-0
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+        #   |- test_1-1
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+
+        # loop through all topicPartition directories such as : test_1-0, 
test_1-1, ...
+        for topicPartition in os.listdir(localLogSegmentPath):
+            # found a topic-partition directory
+            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
+
+                # log segment files are located in : localLogSegmentPath + "/" 
+ topicPartition
+                # sort the log segment files under each topic-partition and 
verify index
+                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + 
topicPartition)):
+                    # only process index file: *.index
+                    if logFile.endswith(".index"):
+                        offsetLogSegmentPathName = localLogSegmentPath + "/" + 
topicPartition + "/" + logFile
+                        cmdStrList = ["ssh " + hostname,
+                                      kafkaRunClassBin + " 
kafka.tools.DumpLogSegments",
+                                      " --file " + offsetLogSegmentPathName,
+                                      "--verify-index-only 2>&1"]
+                        cmdStr     = " ".join(cmdStrList)
+
+                        showMismatchedIndexOffset = False
+
+                        logger.debug("executing command [" + cmdStr + "]", 
extra=d)
+                        subproc = 
system_test_utils.sys_call_return_subproc(cmdStr)
+                        for line in subproc.stdout.readlines():
+                            line = line.rstrip('\n')
+                            if showMismatchedIndexOffset:
+                                logger.debug("#### [" + line + "]", extra=d)
+                            elif "Mismatches in :" in line:
+                                logger.debug("#### error found [" + line + 
"]", extra=d)
+                                failureCount += 1
+                                showMismatchedIndexOffset = True
+
+    if failureCount == 0:
+        validationStatusDict["Validate index log in cluster [" + clusterName + 
"]"] = "PASSED"
+    else:
+        validationStatusDict["Validate index log in cluster [" + clusterName + 
"]"] = "FAILED"
 

Reply via email to