[ 
https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621432#comment-17621432
 ] 

hongcha edited comment on FLINK-29674 at 10/21/22 1:37 AM:
-----------------------------------------------------------

[~coderap] This is my test source code:

 
{code:java}
package test;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;

import com.alibaba.fastjson.JSONObject;

/**
 * User: jiangwei
 * Date: Oct 12, 2022
 * Time: 10:22:20 AM
 */
public class KafkaWindowTest {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "xxx", "xxx");
        properties.put("sasl.jaas.config", jaasCfg);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");

        // Bound consumption: stop partition 0 at offset 6, so only records 0-5 are read.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        String topic = "jw-test-kafka-w-offset-002";
        offsets.put(new TopicPartition(topic, 0), 6L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("xxx:9092")
                .setProperties(properties)
//                .setProperty("commit.offsets.on.checkpoint", "false")
                .setTopics(topic)
//                .setTopicPattern(java.util.regex.Pattern.compile(topic + ".*"))
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.offsets(offsets))
                .build();
        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setParallelism(1);
            env.enableCheckpointing(5000);
            env.getCheckpointConfig()
                    .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Event time is taken from the "time" field of each JSON record (seconds).
            WatermarkStrategy<String> wk = WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ZERO)
                    .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String element, long recordTimestamp) {
                            return JSONObject.parseObject(element).getInteger("time") * 1000;
                        }
                    });

            SingleOutputStreamOperator<String> fromSource = env
                    .fromSource(source, wk, "Kafka Source");
            fromSource.print("first");
            fromSource.keyBy(data -> JSONObject.parseObject(data).getString("id"))
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                    .process(new ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>() {
                        @Override
                        public void process(String key,
                                ProcessWindowFunction<String, Iterable<String>, String, TimeWindow>.Context context,
                                Iterable<String> datas, Collector<Iterable<String>> out) throws Exception {
                            out.collect(datas);
                        }
                    }).print("window-data");
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
{code}
 

This is my test data:

 
{code:java}
"{'id':'1','kpi':11,'kpi1':90,'time':1}",
"{'id':'1','kpi':11,'kpi1':90,'time':2}",
"{'id':'1','kpi':11,'kpi1':90,'time':3}",
"{'id':'1','kpi':11,'kpi1':90,'time':4}",
"{'id':'1','kpi':11,'kpi1':90,'time':5}",
"{'id':'1','kpi':11,'kpi1':90,'time':6}",
"end"
{code}
 

 

This is the result of a successful run (the job finishes):

 
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
window-data> [{"kpi":11,"id":"1","time":6,"kpi1":90}]
{code}
 

 

This is the result after restoring from the checkpoint: the last "window-data" is not printed, and the job keeps running:
{code:java}
first> {"kpi":11,"id":"1","time":1,"kpi1":90}
first> {"kpi":11,"id":"1","time":2,"kpi1":90}
first> {"kpi":11,"id":"1","time":3,"kpi1":90}
first> {"kpi":11,"id":"1","time":4,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":1,"kpi1":90}, {"kpi":11,"id":"1","time":2,"kpi1":90}]
first> {"kpi":11,"id":"1","time":5,"kpi1":90}
first> {"kpi":11,"id":"1","time":6,"kpi1":90}
window-data> [{"kpi":11,"id":"1","time":3,"kpi1":90}, {"kpi":11,"id":"1","time":4,"kpi1":90}, {"kpi":11,"id":"1","time":5,"kpi1":90}]
{code}
 

 


> Apache Kafka Connector's "setBounded" not valid
> ------------------------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.2
>            Reporter: hongcha
>            Priority: Major
>         Attachments: image-2022-10-18-20-38-34-515.png
>
>
> I am using the Kafka connector and set a consumption boundary via "setBounded". When my
> job runs normally (with no failure), the bound takes effect and the job finishes. However,
> when my job fails and I restore it from the checkpoint taken before the failure, the job
> never finishes and keeps running, even though I can see in the log that data has been
> consumed up to the boundary I set. I don't know whether there is a problem with my usage;
> here is part of my code:
>  
> {code:java}
> // code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName,0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
