Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2236#discussion_r70828069 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1235,15 +1236,129 @@ public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka"); - Map<String, Object> accuResults = result.getAllAccumulatorResults(); - // kafka 0.9 consumer: 39 results - if (kafkaServer.getVersion().equals("0.9")) { - assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38); + deleteTestTopic(topic); + } + + /** + * Test metrics reporting for consumer + * + * @throws Exception + */ + public void runMetricsTest() throws Throwable { + + // create a stream with 5 topics + final String topic = "metricsStream"; + createTestTopic(topic, 5, 1); + + final Tuple1<Throwable> error = new Tuple1<>(null); + Runnable job = new Runnable() { + @Override + public void run() { + try { + // start job writing & reading data. 
+ final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env1.setParallelism(1); + env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env1.getConfig().disableSysoutLogging(); + env1.disableOperatorChaining(); // let the source read everything into the network buffers + + TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig()); + DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { + @Override + public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception { + // read slowly + Thread.sleep(100); + } + }); + + DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { + boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + int i = 0; + while (running) { + ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); + Thread.sleep(1); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null)); + + env1.execute("Metrics test job"); + } catch(Throwable t) { + LOG.warn("Got exception during execution", t); + if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job + error.f0 = t; + } + } + } + }; + Thread jobThread = new Thread(job); + jobThread.start(); + + try { + // connect to JMX + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + // wait until we've found all 5 offset metrics + Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new 
ObjectName("*:key7=offsets,*"), null); + while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working + if (error.f0 != null) { + // fail test early + throw error.f0; + } + offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=offsets,*"), null); + Thread.sleep(50); + } + Assert.assertEquals(5, offsetMetrics.size()); + // we can't rely on the consumer to have touched all the partitions already + // that's why we'll wait until all five partitions have a positive offset. + // The test will fail if we never meet the condition + while(true) { + int numPosOffsets = 0; + // check that offsets are correctly reported + for (ObjectName object : offsetMetrics) { + Object offset = mBeanServer.getAttribute(object, "Value"); + if((long) offset >= 0) { + numPosOffsets++; + } + } + if(numPosOffsets == 5) { + break; + } + // wait for the consumer to consume on all partitions + Thread.sleep(50); + } + + // check if producer metrics are also available. + Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null); + Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30); + + + LOG.info("Found all JMX metrics. Cancelling job."); + } finally { + // cancel + JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + } + + while(jobThread.isAlive()) { + Thread.sleep(50); + } + if(error.f0 != null) { --- End diff -- space after if
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---