GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2236#discussion_r70828069
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1235,15 +1236,129 @@ public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws
 		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
-		Map<String, Object> accuResults = result.getAllAccumulatorResults();
-		// kafka 0.9 consumer: 39 results
-		if (kafkaServer.getVersion().equals("0.9")) {
-			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Test metrics reporting for the consumer.
+	 *
+	 * @throws Throwable
+	 */
+	public void runMetricsTest() throws Throwable {
+
+		// create a topic with 5 partitions
+		final String topic = "metricsStream";
+		createTestTopic(topic, 5, 1);
+
+		final Tuple1<Throwable> error = new Tuple1<>(null);
+		Runnable job = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					// start job writing & reading data.
+					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env1.setParallelism(1);
+					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+					env1.getConfig().disableSysoutLogging();
+					env1.disableOperatorChaining(); // let the source read everything into the network buffers
+
+					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
+					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
+						@Override
+						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
+							// read slowly
+							Thread.sleep(100);
+						}
+					});
+
+					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
+						boolean running = true;
+
+						@Override
+						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+							int i = 0;
+							while (running) {
+								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
+								Thread.sleep(1);
+							}
+						}
+
+						@Override
+						public void cancel() {
+							running = false;
+						}
+					});
+
+					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
+
+					env1.execute("Metrics test job");
+				} catch(Throwable t) {
+					LOG.warn("Got exception during execution", t);
+					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
+						error.f0 = t;
+					}
+				}
+			}
+		};
+		Thread jobThread = new Thread(job);
+		jobThread.start();
+
+		try {
+			// connect to JMX
+			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+			// wait until we've found all 5 offset metrics
+			Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=offsets,*"), null);
+			while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working
+				if (error.f0 != null) {
+					// fail test early
+					throw error.f0;
+				}
+				offsetMetrics = mBeanServer.queryNames(new ObjectName("*:key7=offsets,*"), null);
+				Thread.sleep(50);
+			}
+			Assert.assertEquals(5, offsetMetrics.size());
+			// we can't rely on the consumer to have touched all the partitions already;
+			// that's why we'll wait until all five partitions have a positive offset.
+			// The test will fail if we never meet the condition.
+			while(true) {
+				int numPosOffsets = 0;
+				// check that offsets are correctly reported
+				for (ObjectName object : offsetMetrics) {
+					Object offset = mBeanServer.getAttribute(object, "Value");
+					if((long) offset >= 0) {
+						numPosOffsets++;
+					}
+				}
+				if(numPosOffsets == 5) {
+					break;
+				}
+				// wait for the consumer to consume on all partitions
+				Thread.sleep(50);
+			}
+
+			// check if producer metrics are also available.
+			Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null);
+			Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
+
+			LOG.info("Found all JMX metrics. Cancelling job.");
+		} finally {
+			// cancel
+			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		}
+
+		while(jobThread.isAlive()) {
+			Thread.sleep(50);
+		}
+		if(error.f0 != null) {
--- End diff --
space after if
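For example, `if (error.f0 != null) {` rather than `if(error.f0 != null) {`.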