[ https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410573#comment-16410573 ]
ASF GitHub Bot commented on KAFKA-6611: --------------------------------------- guozhangwang closed pull request #4650: KAFKA-6611: PART I, Use JMXTool in SimpleBenchmark URL: https://github.com/apache/kafka/pull/4650 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 4a6a348d9a6..27e46319e49 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -55,12 +55,17 @@ object JmxTool extends Logging { .withRequiredArg .describedAs("name") .ofType(classOf[String]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats.") + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + + "Value of -1 equivalent to setting one-time to true") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(2000) - val helpOpt = parser.accepts("help", "Print usage information.") + val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") + .withRequiredArg + .describedAs("one-time") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(false) val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + "See java.text.SimpleDateFormat for options.") .withRequiredArg @@ -72,8 +77,15 @@ object JmxTool extends Logging { .describedAs("service-url") .ofType(classOf[String]) .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi") + val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") + .withRequiredArg + .describedAs("report-format") + .ofType(classOf[java.lang.String]) + .defaultsTo("original") val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + "Only supported when the list of objects is non-empty and contains no object name patterns.") + val helpOpt = parser.accepts("help", "Print usage information.") + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.") @@ -87,12 +99,16 @@ object JmxTool extends Logging { val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)) val interval = options.valueOf(reportingIntervalOpt).intValue + var oneTime = interval < 0 || options.has(oneTimeOpt) val attributesWhitelistExists = options.has(attributesOpt) - val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",")) else None + val attributesWhitelist = if(attributesWhitelistExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None val dateFormatExists = options.has(dateFormatOpt) val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None val wait = options.has(waitOpt) + val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase) + val reportFormatOriginal = reportFormat.equals("original") + var jmxc: JMXConnector = null var mbsc: MBeanServerConnection = null var connected = false @@ -150,33 +166,57 @@ object JmxTool extends Logging { val numExpectedAttributes: Map[ObjectName, Int] = if (attributesWhitelistExists) - queries.map((_, attributesWhitelist.get.size)).toMap + queries.map((_, attributesWhitelist.get.length)).toMap else { names.map{(name: ObjectName) => val mbean = mbsc.getMBeanInfo(name) (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap } + if(numExpectedAttributes.isEmpty) { + CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.") + } + // print csv header val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted - if(keys.size == numExpectedAttributes.values.sum + 1) + if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) { println(keys.map("\"" + _ + "\"").mkString(",")) + } - while(true) { + var keepGoing = true + while (keepGoing) { val start = System.currentTimeMillis val attributes = queryAttributes(mbsc, names, attributesWhitelist) attributes("time") = dateFormat match { case Some(dFormat) => dFormat.format(new Date) case None => System.currentTimeMillis().toString } - if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) - println(keys.map(attributes(_)).mkString(",")) - val sleep = max(0, interval - (System.currentTimeMillis - start)) - Thread.sleep(sleep) + if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) { + if(reportFormatOriginal) { + println(keys.map(attributes(_)).mkString(",")) + } + else if(reportFormat.equals("properties")) { + keys.foreach( k => { println(k + "=" + attributes(k) ) } ) + } + else if(reportFormat.equals("csv")) { + keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } ) + } + else { // tsv + keys.foreach( k => { println(k + "\t" + attributes(k) ) } ) + } + } + + if (oneTime) { + keepGoing = false + } + else { + val sleep = max(0, interval - (System.currentTimeMillis - start)) + Thread.sleep(sleep) + } } } - def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = { + def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]): mutable.Map[String, Any] = { val attributes = new mutable.HashMap[String, Any]() for (name <- names) { val mbean = mbsc.getMBeanInfo(name) @@ -193,4 +233,10 @@ object JmxTool extends Logging { attributes } + def parseFormat(reportFormatOpt : String): String = reportFormatOpt match { + case "properties" => "properties" + case "csv" => "csv" + case "tsv" => "tsv" + case _ => "original" + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 5d7041ee1c5..c66d78b7310 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -231,6 +231,7 @@ public static void main(String[] args) throws IOException { public void setStreamProperties(final String applicationId) { props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index 4cc39763074..06aec1448dd 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -35,7 +35,7 @@ def __init__(self, test_context): self.num_threads = 1 @cluster(num_nodes=9) - @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3]) + @matrix(test=["count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3]) def test_simple_benchmark(self, test, scale): """ Run simple Kafka Streams benchmark @@ -75,6 +75,8 @@ def test_simple_benchmark(self, test, scale): self.load_driver.wait() self.load_driver.stop() + + ################ # RUN PHASE ################ @@ -93,11 +95,18 @@ def test_simple_benchmark(self, test, scale): node[num] = self.driver[num].node node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False) data[num] = self.driver[num].collect_data(node[num], "" ) - + self.driver[num].read_jmx_output_all_nodes() + final = {} for num in range(0, scale): for key in data[num]: final[key + str(num)] = data[num][key] - + + for key in sorted(self.driver[num].jmx_stats[0]): + self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key])) + + final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value + final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value + return final diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 950ded31cf7..64a99f938e6 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -120,7 +120,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro print_timestamp if True, print each message's timestamp as well isolation_level How to handle transactional messages. """ - JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [], + JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT) BackgroundThreadService.__init__(self, context, num_nodes) self.kafka = kafka diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index c4d4b247557..ba5abc719f6 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -34,7 +34,6 @@ Port = collections.namedtuple('Port', ['name', 'number', 'open']) - class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): PERSISTENT_ROOT = "/mnt/kafka" STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log") @@ -72,14 +71,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None, - jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[], zk_chroot=None): + jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None): """ :type context :type zk: ZookeeperService :type topics: dict """ Service.__init__(self, context, num_nodes) - JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [], + JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=KafkaService.PERSISTENT_ROOT) self.zk = zk @@ -92,7 +91,10 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI self.minikdc = None self.authorizer_class_name = authorizer_class_name self.zk_set_acl = False - self.server_prop_overides = server_prop_overides + if server_prop_overides is None: + self.server_prop_overides = [] + else: + self.server_prop_overides = server_prop_overides self.log_level = "DEBUG" self.zk_chroot = zk_chroot diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 6f6e2219989..542d3a55052 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -70,14 +70,14 @@ def check_jmx_port_listening(): use_jmxtool_version = get_version(node) if use_jmxtool_version <= V_0_11_0_0: use_jmxtool_version = DEV_BRANCH - cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), - self.jmx_class_name()) + cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name()) cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port cmd += " --wait" for jmx_object_name in self.jmx_object_names: cmd += " --object-name %s" % jmx_object_name + cmd += " --attributes " for jmx_attribute in self.jmx_attributes: - cmd += " --attributes %s" % jmx_attribute + cmd += "%s," % jmx_attribute cmd += " 1>> %s" % self.jmx_tool_log cmd += " 2>> %s &" % self.jmx_tool_err_log diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index 94f72499a7e..9f791811c55 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -13,9 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.streams import StreamsTestBaseService - # # Class used to start the simple Kafka Streams benchmark # @@ -31,6 +31,55 @@ def __init__(self, test_context, kafka, numrecs, load_phase, test_name, num_thre test_name, num_threads) + self.load_phase = load_phase + + if self.load_phase == "false": + JmxMixin.__init__(self, + num_nodes=1, + jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)], + jmx_attributes=['process-latency-avg', + 'process-rate', + 'commit-latency-avg', + 'commit-rate', + 'poll-latency-avg', + 'poll-rate'], + root=StreamsTestBaseService.PERSISTENT_ROOT) + + def start_cmd(self, node): + cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node) + + if self.load_phase == "false": + args = self.args.copy() + args['jmx_port'] = self.jmx_port + args['kafka'] = self.kafka.bootstrap_servers() + args['config_file'] = self.CONFIG_FILE + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + + cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ + " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ + " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + self.logger.info("Executing streams simple benchmark cmd: " + cmd) + + return cmd + + def start_node(self, node): + super(StreamsSimpleBenchmarkService, self).start_node(node) + + if self.load_phase == "false": + self.start_jmx_tool(1, node) + + + def clean_node(self, node): + if self.load_phase == "false": + JmxMixin.clean_node(self, node) + super(StreamsSimpleBenchmarkService, self).clean_node(node) + def collect_data(self, node, tag = None): # Collect the data and return it to the framework output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 6da5a25eb7e..d9b475e191b 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -19,13 +19,12 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin +from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.kafka import KafkaConfig - STATE_DIR = "state.dir" -class StreamsTestBaseService(KafkaPathResolverMixin, Service): - +class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service): """Base class for Streams Test services providing some common settings and functionality""" PERSISTENT_ROOT = "/mnt/streams" @@ -35,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log") STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout") STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr") + JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log") + JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log") LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid") @@ -48,10 +49,16 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): "streams_stderr": { "path": STDERR_FILE, "collect_default": True}, + "jmx_log": { + "path": JMX_LOG_FILE, + "collect_default": True}, + "jmx_err": { + "path": JMX_ERR_FILE, + "collect_default": True}, } def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None): - super(StreamsTestBaseService, self).__init__(test_context, 1) + Service.__init__(self, test_context, num_nodes=1) self.kafka = kafka self.args = {'streams_class_name': streams_class_name, 'user_test_args': user_test_args, @@ -130,7 +137,7 @@ def start_cmd(self, node): " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args - self.logger.info("Executing Streams cmd: " + cmd) + self.logger.info("Executing streams cmd: " + cmd) return cmd ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Re-write simple benchmark in system tests with JMXTool > ------------------------------------------------------ > > Key: KAFKA-6611 > URL: https://issues.apache.org/jira/browse/KAFKA-6611 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Priority: Major > > The current SimpleBenchmark is recording the num.records actively in order to > compute throughput and latency, which introduces extra cost plus is less > accurate for benchmarking purposes; instead, it's better to use JmxMixin with > SimpleBenchmark to record metrics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)