[
https://issues.apache.org/jira/browse/FLINK-27348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ahmet Gürbüz updated FLINK-27348:
---------------------------------
Attachment: image-2022-04-22-05-52-04-760.png
> Flink KafkaSource doesn't set groupId
> -------------------------------------
>
> Key: FLINK-27348
> URL: https://issues.apache.org/jira/browse/FLINK-27348
> Project: Flink
> Issue Type: Bug
> Components: API / Scala
> Affects Versions: 1.14.4
> Environment: OS: Windows 8.1.
> Java version:
> java version "11.0.13" 2021-10-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
>
>
> Reporter: Ahmet Gürbüz
> Priority: Major
> Attachments: image-2022-04-22-05-43-06-475.png,
> image-2022-04-22-05-44-56-494.png, image-2022-04-22-05-46-45-592.png,
> image-2022-04-22-05-52-04-760.png
>
>
> I have a very simple Flink application. I have installed Kafka locally and
> am reading data from it with Flink, using the KafkaSource class. Although I
> set the group ID with setGroupId, that group ID does not appear in Kafka.
>
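> {code:java}
> import java.nio.charset.StandardCharsets
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.connector.kafka.source.KafkaSource
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
> import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import play.api.libs.json.{Json, Reads}
>
> import scala.util.{Failure, Success, Try}
>
> object FlinkKafkaSource extends App {
>   val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
>
>   case class Event(partitionNo: Long, eventTime: String, eventTimestamp: Long,
>                    userId: String, firstName: String)
>   implicit val readsEvent: Reads[Event] = Json.reads[Event]
>
>   env
>     .fromSource(
>       KafkaSource.builder[Event]()
>         .setBootstrapServers("localhost:9092")
>         .setTopics("flink-connection")
>         .setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setDeserializer(new KafkaRecordDeserializationSchema[Event] {
>           override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]],
>                                    out: Collector[Event]): Unit = {
>             // Decode the payload as UTF-8 so non-ASCII characters survive
>             val rec = new String(record.value, StandardCharsets.UTF_8)
>             Try(Json.fromJson[Event](Json.parse(rec)).get) match {
>               case Success(event) => out.collect(event)
>               case Failure(exception) =>
>                 println(s"Couldn't parse string: $rec, error: ${exception.toString}")
>             }
>           }
>           override def getProducedType: TypeInformation[Event] =
>             createTypeInformation[Event]
>         })
>         .build(),
>       WatermarkStrategy.noWatermarks[Event],
>       "kafka-source"
>     )
>     .keyBy(l => l.userId)
>     .print()
>
>   env.execute("flink-kafka-source")
> }
> {code}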
> I have created a topic in Kafka named "flink-connection".
>
> I am using a simple kafka-python producer to write data to the
> flink-connection topic.
> !image-2022-04-22-05-43-06-475.png|width=1259,height=266!
> I am able to consume data from Kafka.
> !image-2022-04-22-05-44-56-494.png!
> But I can't see the group ID in the kafka-consumer-groups output.
> !image-2022-04-22-05-46-45-592.png!
>
>
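A possible explanation for the missing group, offered as an assumption rather than a confirmed diagnosis of this report: KafkaSource assigns partitions to its consumers directly instead of joining the group through a subscription, so the group ID typically becomes visible to kafka-consumer-groups only once offsets have been committed under it. According to the Flink Kafka connector documentation, the source commits offsets when a checkpoint completes, and without checkpointing it relies on the Kafka consumer's enable.auto.commit setting. The job in the report neither enables checkpointing nor sets that property. The sketch below shows both options. The String payload, the 10-second checkpoint interval, the 1-second auto-commit interval, and the object name CommitOffsetsSketch are illustrative choices, not part of the report.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// Hypothetical object name; broker, topic and group ID are taken from the report.
object CommitOffsetsSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Option 1: enable checkpointing so that the source commits its offsets
  // to Kafka each time a checkpoint completes (interval is an assumption).
  env.enableCheckpointing(10000L)

  val source = KafkaSource.builder[String]()
    .setBootstrapServers("localhost:9092")
    .setTopics("flink-connection")
    .setGroupId("test-group")
    .setStartingOffsets(OffsetsInitializer.latest())
    // Plain strings keep the sketch short; the report uses a custom Event schema.
    .setValueOnlyDeserializer(new SimpleStringSchema())
    // Option 2: when checkpointing is NOT enabled, let the Kafka consumer
    // commit offsets periodically on its own. Uncomment to try it instead
    // of Option 1.
    // .setProperty("enable.auto.commit", "true")
    // .setProperty("auto.commit.interval.ms", "1000")
    .build()

  env
    .fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")
    .print()

  env.execute("commit-offsets-sketch")
}
```

If the explanation above applies, running kafka-consumer-groups with --bootstrap-server localhost:9092 --list after the first commit should show test-group. Note that the Flink documentation states these committed offsets only expose consumer progress and are not used by Flink for fault tolerance.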
--
This message was sent by Atlassian Jira
(v8.20.7#820007)