Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2236#discussion_r70828069
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
    @@ -1235,15 +1236,129 @@ public void flatMap(Tuple2<Integer, Integer> 
value, Collector<Void> out) throws
     
                JobExecutionResult result = tryExecute(env1, "Consume " + 
ELEMENT_COUNT + " elements from Kafka");
     
    -           Map<String, Object> accuResults = 
result.getAllAccumulatorResults();
    -           // kafka 0.9 consumer: 39 results
    -           if (kafkaServer.getVersion().equals("0.9")) {
    -                   assertTrue("Not enough accumulators from Kafka 
Consumer: " + accuResults.size(), accuResults.size() > 38);
    +           deleteTestTopic(topic);
    +   }
    +
    +   /**
    +    * Test metrics reporting for consumer
    +    *
    +    * @throws Exception
    +    */
    +   public void runMetricsTest() throws Throwable {
    +
    +           // create a stream with 5 topics
    +           final String topic = "metricsStream";
    +           createTestTopic(topic, 5, 1);
    +
    +           final Tuple1<Throwable> error = new Tuple1<>(null);
    +           Runnable job = new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   // start job writing & reading data.
    +                                   final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +                                   env1.setParallelism(1);
    +                                   
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +                                   env1.getConfig().disableSysoutLogging();
    +                                   env1.disableOperatorChaining(); // let 
the source read everything into the network buffers
    +
    +                                   
TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new 
TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
    +                                   DataStream<Tuple2<Integer, Integer>> 
fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, 
standardProps));
    +                                   fromKafka.flatMap(new 
FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
    +                                           @Override
    +                                           public void 
flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
    +                                                   // read slowly
    +                                                   Thread.sleep(100);
    +                                           }
    +                                   });
    +
    +                                   DataStream<Tuple2<Integer, Integer>> 
fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
    +                                           boolean running = true;
    +
    +                                           @Override
    +                                           public void 
run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
    +                                                   int i = 0;
    +                                                   while (running) {
    +                                                           
ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
    +                                                           Thread.sleep(1);
    +                                                   }
    +                                           }
    +
    +                                           @Override
    +                                           public void cancel() {
    +                                                   running = false;
    +                                           }
    +                                   });
    +
    +                                   
fromGen.addSink(kafkaServer.getProducer(topic, new 
KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
    +
    +                                   env1.execute("Metrics test job");
    +                           } catch(Throwable t) {
    +                                   LOG.warn("Got exception during 
execution", t);
    +                                   if(!(t.getCause() instanceof 
JobCancellationException)) { // we'll cancel the job
    +                                           error.f0 = t;
    +                                   }
    +                           }
    +                   }
    +           };
    +           Thread jobThread = new Thread(job);
    +           jobThread.start();
    +
    +           try {
    +                   // connect to JMX
    +                   MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
    +                   // wait until we've found all 5 offset metrics
    +                   Set<ObjectName> offsetMetrics = 
mBeanServer.queryNames(new ObjectName("*:key7=offsets,*"), null);
    +                   while (offsetMetrics.size() < 5) { // test will time 
out if metrics are not properly working
    +                           if (error.f0 != null) {
    +                                   // fail test early
    +                                   throw error.f0;
    +                           }
    +                           offsetMetrics = mBeanServer.queryNames(new 
ObjectName("*:key7=offsets,*"), null);
    +                           Thread.sleep(50);
    +                   }
    +                   Assert.assertEquals(5, offsetMetrics.size());
    +                   // we can't rely on the consumer to have touched all 
the partitions already
    +                   // that's why we'll wait until all five partitions have 
a positive offset.
    +                   // The test will fail if we never meet the condition
    +                   while(true) {
    +                           int numPosOffsets = 0;
    +                           // check that offsets are correctly reported
    +                           for (ObjectName object : offsetMetrics) {
    +                                   Object offset = 
mBeanServer.getAttribute(object, "Value");
    +                                   if((long) offset >= 0) {
    +                                           numPosOffsets++;
    +                                   }
    +                           }
    +                           if(numPosOffsets == 5) {
    +                                   break;
    +                           }
    +                           // wait for the consumer to consume on all 
partitions
    +                           Thread.sleep(50);
    +                   }
    +
    +                   // check if producer metrics are also available.
    +                   Set<ObjectName> producerMetrics = 
mBeanServer.queryNames(new ObjectName("*:key6=KafkaProducer,*"), null);
    +                   Assert.assertTrue("No producer metrics found", 
producerMetrics.size() > 30);
    +
    +
    +                   LOG.info("Found all JMX metrics. Cancelling job.");
    +           } finally {
    +                   // cancel
    +                   
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    +           }
    +
    +           while(jobThread.isAlive()) {
    +                   Thread.sleep(50);
    +           }
    +           if(error.f0 != null) {
    --- End diff --
    
    Missing space after `if`: this should be `if (error.f0 != null) {`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastruct...@apache.org or file
a JIRA ticket with INFRA.
---

Reply via email to