This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ebb79b  [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
2ebb79b is described below

commit 2ebb79b2a607aa25ea22826d9c5d6af18c97a7f2
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Tue Jan 15 14:21:51 2019 -0800

    [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
    
    ## What changes were proposed in this pull request?
    
    This patch adds a check to verify that the consumer group id is set correctly when a custom group id is provided as a Kafka parameter.
    
    ## How was this patch tested?
    
    Modified UT.
    
    Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala    | 14 ++++++++++++--
 .../org/apache/spark/sql/kafka010/KafkaRelationSuite.scala | 13 ++++++++++++-
 .../org/apache/spark/sql/kafka010/KafkaTestUtils.scala     |  6 +++++-
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 6402088..cb45384 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, 
RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
+    val customGroupId = "id-" + Random.nextInt()
     val dsKafka = spark
       .readStream
       .format("kafka")
-      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.group.id", customGroupId)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 
     testStream(dsKafka)(
       makeSureGetOffsetCalled,
-      CheckAnswer(1 to 30: _*)
+      CheckAnswer(1 to 30: _*),
+      Execute { _ =>
+        val consumerGroups = testUtils.listConsumerGroups()
+        val validGroups = consumerGroups.valid().get()
+        val validGroupsId = validGroups.asScala.map(_.groupId())
+        assert(validGroupsId.exists(_ === customGroupId), "Valid consumer 
groups don't " +
+          s"contain the expected group id - Valid consumer groups: 
$validGroupsId / " +
+          s"expected group id: $customGroupId")
+      }
     )
   }
 
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index efe7385..2cd13a9 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
-    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    val customGroupId = "id-" + Random.nextInt()
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> 
customGroupId))
     checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+    val consumerGroups = testUtils.listConsumerGroups()
+    val validGroups = consumerGroups.valid().get()
+    val validGroupsId = validGroups.asScala.map(_.groupId())
+    assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups 
don't " +
+      s"contain the expected group id - Valid consumer groups: $validGroupsId 
/ " +
+      s"expected group id: $customGroupId")
   }
 
   test("read Kafka transactional messages: read_committed") {
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index bf6934b..dacfffa 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, 
NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, 
ListConsumerGroupsResult, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] 
= Map.empty) extends L
     offsets
   }
 
+  def listConsumerGroups(): ListConsumerGroupsResult = {
+    adminClient.listConsumerGroups()
+  }
+
   protected def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to