[ 
https://issues.apache.org/jira/browse/FLINK-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-15262:
-----------------------------
    Description: 
I created a Kafka table in Flink to read from my Kafka topic (which already has
messages in it) starting at the earliest offset, but the `select * from test`
query in Flink doesn't start reading until a new message arrives. If no new
message arrives, the query just sits there and never produces any results.

What I expect is that the query immediately produces results for all existing
messages, without having to wait for a new message to "trigger" data processing.

DDL that I used, following the DDL documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

{code:sql}
create table test(name String) with (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'connector.startup-mode' = 'earliest-offset',
   'format.type' = 'csv',
   'update-mode' = 'append'
);
{code}


Repro steps:
1) Start a local Kafka cluster following https://kafka.apache.org/quickstart,
with a topic named "test".
2) Produce some records with simple strings such as "john", "marry", etc. into
the topic.
3) Start the Flink SQL CLI, add the Kafka connector dependency, and create a
Flink table as follows:

{code:sql}
create table test(name String) with (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'connector.startup-mode' = 'earliest-offset',
   'format.type' = 'csv'
);
{code}

4) Run "select * from test" in the SQL CLI.

Expected: upon running the query, we should immediately see the records already
in Kafka, such as "john" and "marry".

Reality: upon running the query, no records show up. We have to produce some
new records, such as "kitty", into the Kafka topic before the old records
"john" and "marry" show up.
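To make the expected behavior concrete, here is a toy model in Python of what the 'earliest-offset' startup mode is expected to do: a source that starts at offset 0 should return all existing records on its first read, without waiting for a new message. The `Partition` and `ToySource` classes are hypothetical and are not part of Kafka's or Flink's API; the model assumes a single partition and ignores committed offsets and the SQL CLI's result display.

{code:python}
# Toy model only: this is NOT Kafka's or Flink's API. It models one topic
# partition as an in-memory list and a source that honors a startup mode.
from dataclasses import dataclass, field
from typing import List


@dataclass
class Partition:
    """Append-only log; a record's offset is its index in the list."""
    records: List[str] = field(default_factory=list)

    def append(self, value: str) -> None:
        self.records.append(value)


class ToySource:
    """Hypothetical source: 'earliest-offset' starts at offset 0,
    'latest-offset' starts at the current end of the log."""

    def __init__(self, partition: Partition, startup_mode: str) -> None:
        if startup_mode == "earliest-offset":
            self.position = 0
        elif startup_mode == "latest-offset":
            self.position = len(partition.records)
        else:
            raise ValueError(f"unsupported startup mode: {startup_mode}")
        self.partition = partition

    def poll(self) -> List[str]:
        """Return every record from the current position to the end of the
        log immediately, without waiting for new appends."""
        batch = self.partition.records[self.position:]
        self.position += len(batch)
        return batch


topic = Partition()
topic.append("john")
topic.append("marry")

source = ToySource(topic, "earliest-offset")
print(source.poll())  # prints ['john', 'marry'] on the very first poll
{code}

In this model a second `poll()` returns an empty list until a new record such as "kitty" is appended; the report describes Flink instead showing nothing at all until that new record arrives.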



  was:
I created a Kafka table in Flink to read from my Kafka topic (which already has
messages in it) starting at the earliest offset, but the `select * from test`
query in Flink doesn't start reading until a new message arrives. If no new
message arrives, the query just sits there and never produces any results.

What I expect is that the query immediately produces results for all existing
messages, without having to wait for a new message to "trigger" data processing.

DDL that I used, following the DDL documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

{code:sql}
create table test(name String, age Int) with (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'connector.startup-mode' = 'earliest-offset',
   'format.type' = 'csv',
   'update-mode' = 'append'
);
{code}


Repro steps:
1) Start a local Kafka cluster following https://kafka.apache.org/quickstart,
with a topic named "test".
2) Produce some records with simple strings such as "john", "marry", etc. into
the topic.
3) Start the Flink SQL CLI, add the Kafka connector dependency, and create a
Flink table as follows:

{code:sql}
create table test(name String) with (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'test',
   'connector.properties.zookeeper.connect' = 'localhost:2181',
   'connector.properties.bootstrap.servers' = 'localhost:9092',
   'connector.startup-mode' = 'earliest-offset',
   'format.type' = 'csv'
);
{code}

4) Run "select * from test" in the SQL CLI.

Expected: upon running the query, we should immediately see the records already
in Kafka, such as "john" and "marry".

Reality: upon running the query, no records show up. We have to produce some
new records, such as "kitty", into the Kafka topic before the old records
"john" and "marry" show up.




> kafka connector doesn't read from beginning immediately when 
> 'connector.startup-mode' = 'earliest-offset' 
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15262
>                 URL: https://issues.apache.org/jira/browse/FLINK-15262
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Bowen Li
>            Assignee: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.10.0, 1.11.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
