[ https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Stein updated KAFKA-1898:
-----------------------------
    Description: 
There are a few different scenarios where you want/need to know the status/state of a client library that works with Kafka. Client library development is not just about supporting the wire protocol but also about the implementations around specific interactions with the API. The API has blossomed into a robust set of producer, consumer, broker and administrative calls, all of which have layers of logic above them. A client library may choose to deviate from the path the project sets out, and that is ok. The goal of this ticket is to have a system for Kafka that can help explain what the library is or isn't doing (regardless of what it claims).

The idea behind this stems from being able to quickly/easily/succinctly analyze the topic message data. Once you can analyze the messages in the topic(s) you can gather a lot of information about what the client library is and is not doing. There are a few components to this.

1) dataset-generator 

Test Kafka dataset generation tool. Generates a random text file with the given params:

{code}
--filename, -f - output file name.
--filesize, -s - desired size of output file. The actual size will always be a bit larger (with a maximum size of $filesize + $max.length - 1)
--min.length, -l - minimum generated entry length.
--max.length, -h - maximum generated entry length.

Usage:

./gradlew build
java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 100000 -l 2 -h 20
{code}
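
For illustration only, the core of a generator like this can be a small loop that appends random entries until the requested file size is reached. This is just a sketch of the idea, not the tool's actual code, and the object and method names below are made up:

{code}
import java.io.{BufferedWriter, FileWriter}
import scala.util.Random

// Sketch only: append random alphanumeric entries of length [minLength, maxLength]
// until the file reaches the requested size (the last entry may overshoot slightly,
// similar to the behavior described above).
object DatasetGeneratorSketch {
  def generate(filename: String, filesize: Long, minLength: Int, maxLength: Int): Unit = {
    val writer = new BufferedWriter(new FileWriter(filename))
    var written = 0L
    try {
      while (written < filesize) {
        val length = minLength + Random.nextInt(maxLength - minLength + 1)
        val entry = Random.alphanumeric.take(length).mkString
        writer.write(entry)
        writer.newLine()
        written += entry.length + 1 // +1 accounts for the newline
      }
    } finally {
      writer.close()
    }
  }
}
{code}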

2) dataset-producer

Test Kafka dataset producer tool. It can produce the given dataset to Kafka or to a Syslog server. The idea here is that you already have lots of data sets that you want to test different things against. You might have different sized messages, formats, etc. and want a repeatable benchmark to run and re-run the testing on. You could just have a day's worth of data and choose to replay it. The CCTK idea is that your library's state always starts from CONSUME. If your library is only producing then it will fail a bunch of tests, and that might be ok for people.

Accepts the following params:

{code}

--filename, -f - input file name.

--kafka, -k - Kafka broker address in host:port format. If this parameter is set, --producer.config and --topic must be set too (otherwise they're ignored).

--producer.config, -p - Kafka producer properties file location.

--topic, -t - Kafka topic to produce to.

--syslog, -s - Syslog server address. Format: protocol://host:port (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)

--loop, -l - flag to loop through file until shut off manually. False by default.

Usage:

./gradlew build
java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true

{code}
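
The Kafka side of this tool is essentially a replay loop over the dataset file. As a rough sketch only (the object and method names are made up, and the serializer settings are hardcoded here whereas the real tool would load them from --producer.config):

{code}
import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: replay a dataset file line by line into a Kafka topic,
// optionally looping until killed (roughly what --loop does).
object DatasetProducerSketch {
  def replay(filename: String, brokers: String, topic: String, loop: Boolean): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      do {
        for (line <- Source.fromFile(filename).getLines()) {
          producer.send(new ProducerRecord[String, String](topic, line))
        }
      } while (loop)
    } finally {
      producer.close()
    }
  }
}
{code}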

3) extract

This step is good so you can save data and compare tests. It could also be 
removed if folks are just looking for a real live test (and we could support 
that too).  Here we are taking data out of Kafka and putting it into Cassandra 
(but other data stores can be used too and we should come up with a way to 
abstract this out completely so folks could implement whatever they wanted.

{code}

package ly.stealth.shaihulud.reader

import java.util.UUID

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import consumer.kafka.MessageAndMetadata
import consumer.kafka.client.KafkaReceiver
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

object Main extends App with Logging {
  val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
    head("Spark Reader for Kafka client applications", "1.0")
    opt[String]("testId") unbounded() optional() action { (x, c) =>
      c.copy(testId = x)
    } text ("Test ID")
    opt[String]("source") unbounded() required() action { (x, c) =>
      c.copy(sourceTopic = x)
    } text ("Source topic with initial set of data")
    opt[String]("destination") unbounded() required() action { (x, c) =>
      c.copy(destinationTopic = x)
    } text ("Destination topic with processed set of data")
    opt[Int]("partitions") unbounded() optional() action { (x, c) =>
      c.copy(partitions = x)
    } text ("Partitions in topic")
    opt[String]("zookeeper") unbounded() required() action { (x, c) =>
      c.copy(zookeeper = x)
    } text ("Zookeeper connection host:port")
    opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
      c.copy(kafkaFetchSize = x)
    } text ("Maximum KBs to fetch from Kafka")
    checkConfig { c =>
      if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
        failure("You haven't provided all required parameters")
      } else {
        success
      }
    }
  }
  val config = parser.parse(args, ReaderConfiguration()) match {
    case Some(c) => c
    case None => sys.exit(1)
  }

  val sparkConfig = new SparkConf().setAppName("kafka_client_validator")
                                   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val ssc = new StreamingContext(sparkConfig, Seconds(10))
  ssc.checkpoint("reader")

  // Create the keyspace and tables that hold the test metadata, per-topic counters and raw messages
  CassandraConnector(sparkConfig).withSessionDo( session => {
    session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_client_validation WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.tests(test_id text PRIMARY KEY, source_topic text, destination_topic text)")
    session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.counters(test_id text, topic text, total counter, PRIMARY KEY(test_id, topic))")
    session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.messages(test_id text, topic text, partition int, offset int, payload text, PRIMARY KEY(test_id, topic, partition, offset))")
  })
  val test = Test(config.testId, config.sourceTopic, config.destinationTopic)
  ssc.sparkContext.parallelize(Seq(test)).saveToCassandra("kafka_client_validation", "tests")

  startStreamForTopic(test.test_id, config.sourceTopic, config)
  startStreamForTopic(test.test_id, config.destinationTopic, config)

  ssc.start()
  ssc.awaitTermination()

  def startStreamForTopic(testId: String, topic: String, config: ReaderConfiguration) {
    val stream = createKafkaStream(config.zookeeper, topic, config.partitions).repartition(config.partitions).persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Count the messages in each batch and add the batch total to the topic's counter
    stream.map(message => {
      Counter(testId, message.getTopic, 1L)
    }).reduce((prev, curr) => {
      Counter(testId, prev.topic, prev.total + curr.total)
    }).foreachRDD(rdd => {
      rdd.saveToCassandra("kafka_client_validation", "counters")
    })

    // Save every message with its partition and offset so ordering can be checked later
    stream.map(message => {
      Message(testId, message.getTopic, message.getPartition.partition, message.getOffset, new String(message.getPayload))
    }).foreachRDD(rdd => {
      rdd.saveToCassandra("kafka_client_validation", "messages")
    })
  }

  private def createKafkaStream(zkConnect: String, topic: String, partitions: Int): DStream[MessageAndMetadata] = {
    val zkhosts = zkConnect.split(":")(0)
    val zkports = zkConnect.split(":")(1)
    val kafkaParams = Map("zookeeper.hosts" -> zkhosts,
                          "zookeeper.port" -> zkports,
                          "zookeeper.consumer.connection" -> zkConnect,
                          "zookeeper.broker.path" -> "/brokers",
                          "zookeeper.consumer.path" -> "/consumers",
                          "fetch.size.bytes" -> (config.kafkaFetchSize * 1024).toString,
                          "kafka.topic" -> topic,
                          "kafka.consumer.id" -> "%s-%s".format(topic, UUID.randomUUID().toString))
    val props = new java.util.Properties()
    kafkaParams foreach { case (key, value) => props.put(key, value)}
    // One receiver per partition, unioned into a single DStream
    val streams = (0 to partitions - 1).map { partitionId => ssc.receiverStream(new KafkaReceiver(StorageLevel.MEMORY_AND_DISK_SER, props, partitionId))}
    ssc.union(streams)
  }
}

case class Test(test_id: String = "", source_topic: String = "", destination_topic: String = "")
case class Counter(test_id: String = "", topic: String = "", total: Long = 0L)
case class Message(test_id: String = "", topic: String = "", partition: Int = 0, offset: Long = 0, payload: String = "")

case class ReaderConfiguration(testId: String = UUID.randomUUID().toString, sourceTopic: String = "", destinationTopic: String = "",
                               partitions: Int = 1, zookeeper: String = "", kafkaFetchSize: Int = 8)

{code}
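
On the abstraction point mentioned above, one possible shape for a pluggable store is a small trait that the streaming job writes through, so Cassandra is just one implementation and HBase, or even a plain Kafka topic, could be swapped in. This is only a sketch (the trait and method names are made up) and it reuses the Test/Counter/Message case classes from the reader above:

{code}
package ly.stealth.shaihulud.reader

// Sketch only: a minimal interface the extract job could write through instead of
// calling saveToCassandra directly. Implementations are chosen at startup.
trait ValidationStore extends Serializable {
  def saveTest(test: Test): Unit
  def saveCounters(counters: Seq[Counter]): Unit
  def saveMessages(messages: Seq[Message]): Unit
}
{code}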

4) validator

This is plug-able both for how to read the topics and process the results to 
once done

Right now we have been checking out using Spark and Cassandra for this, we also 
are looking at Spark and HBase and Samza with the Mesos support.  The nice 
thing about using Samza is we really don't have to use another data store it is 
just so easy to put the results back into a topic.

Here is kind of what the Spark/Cassandra version looks like for whether or not 
a consumer/producer is a) at least once processing guarantee 2) order order 
preserving 3) etc, etc, etc. While this test is running many (as much as you 
want) negative testing can be done to the cluster. It is made to run in an 
environment where you want to pump through as much data as you can as fast as 
you can and then once done, analyze it.

{code}

package ly.stealth.shaihulud.validator

import java.security.MessageDigest
import java.util.Iterator

import com.datastax.driver.core.{Cluster, Row, SocketOptions}

object Main extends App {
  val parser = new scopt.OptionParser[ValidatorConfiguration]("spark-validator") {
    head("Spark Validator for Kafka client applications", "1.0")
    opt[String]("test.id") unbounded() required() action { (x, c) =>
      c.copy(testId = x)
    } text ("Test ID")
    opt[String]("cassandra.connect") unbounded() required() action { (x, c) =>
      c.copy(cassandraConnect = x)
    } text ("Cassandra host")
    opt[String]("cassandra.user") unbounded() required() action { (x, c) =>
      c.copy(cassandraUser = x)
    } text ("Cassandra user")
    opt[String]("cassandra.password") unbounded() required() action { (x, c) =>
      c.copy(cassandraPassword = x)
    } text ("Cassandra password")
    checkConfig { c =>
      if (c.testId.isEmpty || c.cassandraConnect.isEmpty || c.cassandraUser.isEmpty || c.cassandraPassword.isEmpty) {
        failure("You haven't provided all required parameters")
      } else {
        success
      }
    }
  }
  val config = parser.parse(args, ValidatorConfiguration()) match {
    case Some(c) => c
    case None => sys.exit(1)
  }

  val cluster = new Cluster.Builder()
    .addContactPoints(config.cassandraConnect)
    .withSocketOptions(new SocketOptions().setTcpNoDelay(true))
    .build()

  val session = cluster.connect("kafka_client_validation")

  val tests = session.execute("SELECT * FROM kafka_client_validation.tests WHERE test_id='%s'".format(config.testId))
  val test = tests.one()
  if (test != null) {
    val testId = test.getString("test_id")
    val sourceTopic = test.getString("source_topic")
    val destinationTopic = test.getString("destination_topic")

    // Compare the per-topic totals the reader stored in the counters table
    val countersQuery = "SELECT * FROM kafka_client_validation.counters WHERE test_id='%s' AND topic='%s'"
    val sourceCounter = session.execute(countersQuery.format(testId, sourceTopic))
    val destinationCounter = session.execute(countersQuery.format(testId, destinationTopic))

    println("***** TEST RESULTS *****")
    var sameAmount = false
    val totalInSource = sourceCounter.one().getLong("total")
    val totalInDestination = destinationCounter.one().getLong("total")
    if (totalInSource == totalInDestination) {
      sameAmount = true
    }
    println(" - Destination topic contains the same amount of messages as Source topic (%d out of %d): %B".format(totalInSource, totalInDestination, sameAmount))

    // Check ordering by hashing the payloads in slices of portionSize messages from both topics
    val messagesQuery = "SELECT * FROM kafka_client_validation.messages WHERE test_id='%s' AND topic='%s'"
    val sourceMessages = session.execute(messagesQuery.format(testId, sourceTopic))
    val destinationMessages = session.execute(messagesQuery.format(testId, destinationTopic))
    val si = sourceMessages.iterator()
    val di = destinationMessages.iterator()

    val portionSize = 1000
    var isOrderPreserved = true
    while ((si.hasNext || di.hasNext) && isOrderPreserved) {
      val sourceHash = this.calculateMD5ForSlice(si, portionSize)
      val destinationHash = this.calculateMD5ForSlice(di, portionSize)
      if (sourceHash != destinationHash) {
        isOrderPreserved = false
      }
    }
    println(" - Destination topic preserves ordering of Source topic: %B".format(isOrderPreserved))
  } else {
    System.err.println("There is no such test '%s'".format(config.testId))
  }

  cluster.close()

  def calculateMD5ForSlice(it: Iterator[Row], portionSize: Int): String = {
    val sb = new StringBuilder
    var left = portionSize
    while (it.hasNext && left > 0) {
      sb.append(it.next.getString("payload"))
      left = left - 1
    }

    new String(MessageDigest.getInstance("MD5").digest(sb.toString().getBytes("UTF-8")))
  }
}

case class ValidatorConfiguration(testId: String = "", cassandraConnect: String = "", cassandraUser: String = "", cassandraPassword: String = "")

{code}
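
Invocation would look something like the following; the jar path and the host/credential values are placeholders, but the flags match the options parsed above:

{code}
./gradlew build
java -jar validator/build/libs/validator-*.jar --test.id <test-id> --cassandra.connect localhost --cassandra.user cassandra --cassandra.password cassandra
{code}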

> compatibility testing framework 
> --------------------------------
>
>                 Key: KAFKA-1898
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1898
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joe Stein
>             Fix For: 0.8.3
>
>         Attachments: cctk.png
>


