[ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432 ]
hongcha commented on FLINK-29674:
---------------------------------
[~coderap] this is my test source code:
{code:java}
package test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;

import com.alibaba.fastjson.JSONObject;

/**
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        // Kafka client properties (broker address and SASL credentials masked as "xxx").
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        String jaasTemplate =
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");

        // Bounded stop condition: partition 0 of the topic stops at offset 6.
        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic, 0), 6L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.8.79:9092")
                .setProperties(properties)
                // .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
                // .setTopicPattern(java.util.regex.Pattern.compile(topic + ".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();

        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // env.setParallelism(1);
            // Checkpoint every 5 s and retain externalized checkpoints on cancellation.
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            SingleOutputStreamOperator<String> fromSource =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            fromSource.print("first");

            // Key by the "id" field of the JSON record and collect each 3 s tumbling window.
            fromSource.keyBy(data -> JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String key,
                                ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>.Context context,
                                Iterable<String> datas,
                                Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    })
                    .print("window-data");

            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
{code}
this is my test data:
{code:java}
"{'id':'1','kpi':11,'kpi1':90,'time':1}",
"{'id':'1','kpi':11,'kpi1':90,'time':2}",
"{'id':'1','kpi':11,'kpi1':90,'time':3}",
"{'id':'1','kpi':11,'kpi1':90,'time':4}",
"{'id':'1','kpi':11,'kpi1':90,'time':5}",
"{'id':'1','kpi':11,'kpi1':90,'time':6}",
"end"{code}
this is the successful result:
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
window-data> [{"kpi":11,"id":"1","time":6,"kpi1":90}]
{code}
this is the result when restoring from the checkpoint: the last "window-data" line is not printed, and the job keeps running (the restore step is sketched after the output):
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
{code}
> Apache Kafka Connector‘s “ setBounded” not valid
> ------------------------------------------------
>
> Key: FLINK-29674
> URL: https://issues.apache.org/jira/browse/FLINK-29674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.2
> Reporter: hongcha
> Priority: Major
> Attachments: image-2022-10-18-20-38-34-515.png
>
>
> When I use the Kafka connector and set a consumption boundary with setBounded,
> the bound works as long as my job runs normally (without failures) and the job
> finishes. However, when my job fails and I restore it from the checkpoint taken
> before the failure, the job never finishes and keeps running, even though the
> log shows that data has been consumed up to the boundary I set. I don't know
> whether there is a problem with my usage; here is part of my code:
>
> {code:java}
> // code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName,0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setBootstrapServers("xxx:9092")
> .setProperties(properties)
> .setTopics(topicName)
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .setBounded(OffsetsInitializer.offsets(offsets))
> .build(); {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)