[ 
https://issues.apache.org/jira/browse/FLINK-25344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzi updated FLINK-25344:
---------------------------
    Description: 
Create a Kafka table:
{code:java}
//代码占位符
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
) {code}
 

Remark:
{code:java}
//代码占位符
The "properties.group.id" option is unset. {code}
{color:#172b4d}Execute a query:{color}
{code:java}
//代码占位符
 select * from KafkaTable; {code}
{color:#172b4d} {color}

The log is:
{code:java}
//代码占位符
java.lang.IllegalStateException: Property group.id is required when using 
committed offset for offsets initializer
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at 
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.validate(ReaderHandledOffsetsInitializer.java:76)
    at 
org.apache.flink.connector.kafka.source.KafkaSourceBuilder.sanityCheck(KafkaSourceBuilder.java:501)
    at 
org.apache.flink.connector.kafka.source.KafkaSourceBuilder.build(KafkaSourceBuilder.java:403)
    at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:416)
    at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
    at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
    at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:684)
    at com.chehejia.dmp.JobApplication.kafkaGroupOption(JobApplication.java:686)
    at 
com.chehejia.dmp.JobApplication.explainDmlForOption(JobApplication.java:458)
    at com.chehejia.dmp.JobApplication.main(JobApplication.java:120)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
{code}
 

 

 

 
 

  was:
Create a Kafka table:
{quote}{{CREATE TABLE KafkaTable (}}

{{  `user_id` BIGINT,}}

{{  `item_id` BIGINT,}}

{{  `behavior` STRING,}}

{{  `ts` TIMESTAMP(3) METADATA FROM 'timestamp') }}

{{WITH ('connector' = 'kafka',}}

{{'topic' = 'user_behavior',}}

{{  'properties.bootstrap.servers' = 'localhost:9092',}}

{{  'format' = 'csv')}}

????{{{}{}}}

Remark: The "{{properties.group.id}}" option is unset.

{color:#172b4d}Execute a query: select * from {color}{{KafkaTable;}}

The log is:
{quote}
 
{quote}java.lang.IllegalStateException: Property group.id is required when 
using committed offset for offsets initializer

    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at 
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.validate(ReaderHandledOffsetsInitializer.java:76)
    at 
org.apache.flink.connector.kafka.source.KafkaSourceBuilder.sanityCheck(KafkaSourceBuilder.java:501)
    at 
org.apache.flink.connector.kafka.source.KafkaSourceBuilder.build(KafkaSourceBuilder.java:403)
    at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:416)
    at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)

    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
    at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)

    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)

    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)

    at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:684)
    at com.chehejia.dmp.JobApplication.kafkaGroupOption(JobApplication.java:686)
    at 
com.chehejia.dmp.JobApplication.explainDmlForOption(JobApplication.java:458)
    at com.chehejia.dmp.JobApplication.main(JobApplication.java:120)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

    at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

    at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
{quote}
 

 

 

 
 


> flink kafka connector Property group.id is required when using committed 
> offset for offsets initializer
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25344
>                 URL: https://issues.apache.org/jira/browse/FLINK-25344
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: jingzi
>            Priority: Major
>
> Create a Kafka table:
> {code:java}
> //代码占位符
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> ) {code}
>  
> Remark:
> {code:java}
> //代码占位符
> The "properties.group.id" option is unset. {code}
> {color:#172b4d}Execute a query:{color}
> {code:java}
> //代码占位符
>  select * from KafkaTable; {code}
> {color:#172b4d} {color}
> The log is:
> {code:java}
> //代码占位符
> java.lang.IllegalStateException: Property group.id is required when using 
> committed offset for offsets initializer
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at 
> org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.validate(ReaderHandledOffsetsInitializer.java:76)
>     at 
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder.sanityCheck(KafkaSourceBuilder.java:501)
>     at 
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder.build(KafkaSourceBuilder.java:403)
>     at 
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:416)
>     at 
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
>     at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainSql(TableEnvironmentImpl.java:684)
>     at 
> com.chehejia.dmp.JobApplication.kafkaGroupOption(JobApplication.java:686)
>     at 
> com.chehejia.dmp.JobApplication.explainDmlForOption(JobApplication.java:458)
>     at com.chehejia.dmp.JobApplication.main(JobApplication.java:120)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
>     at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to