[ 
https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786
 ] 

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:27 AM:
----------------------------------------------------------------

{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
public class BatchJobTest2 {    private static ParameterTool setupParams() {
        Map<String, String> properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", 
"false");
        properties.put("security.delegation.token.provider.hbase.enabled", 
"false");
        return ParameterTool.fromMap(properties);
    }    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + 
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, 
paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), 
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);        
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Domain> positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction<String, Domain, Domain>() {
                    private transient MapState<String, Domain> processedInputs; 
                   @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor<String, Domain> mapStateDescriptor = 
new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class),
                                TypeInformation.of(Domain.class));
                        processedInputs = 
getRuntimeContext().getMapState(mapStateDescriptor);
                    }                    @Override
                    public void processElement(Domain value, 
KeyedProcessFunction<String, Domain, Domain>.Context context, Collector<Domain> 
out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        
context.timerService().registerEventTimeTimer(Long.MAX_VALUE);                  
  }                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Domain> collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> 
collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction<Domain, Void>() {
                    @Override
                    public void processElement(Domain value, 
ProcessFunction<Domain, Void>.Context ctx, Collector<Void> out) throws 
Exception {
                        log.info("Timestamp : {}, element : {}", 
ctx.timestamp(), value.getUniqueId());
                    }
                });        env.execute("FileReadJob");    }    

public static DataStream<Domain> domainStream(StreamExecutionEnvironment env) { 
       /* Not assigning watermarks as program is being run in batch mode and 
watermarks are irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");    
}    

private static List<Domain> getDataCollection() {
        List<Domain> data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));
        data.add(new Domain("A21", "673-9Z-09"));
        data.add(new Domain("A21", "843-09-21"));        
        return data;
    }    

   private static WatermarkStrategy<Domain> getNoWatermarkStrategy() {
        return WatermarkStrategy.<Domain>forGenerator((ctx) -> new 
NoWatermarksGenerator<>())
                .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>());
    }    private static WatermarkStrategy<Domain> getMonotonous() {
        return WatermarkStrategy.<Domain>forMonotonousTimestamps()
                .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>());
    }    

private static class Domain {
        private String a1;
        private String uniqueId;        
public Domain() {        }        
public Domain(String a1, String uniqueId) {
            this.a1 = a1;
            this.uniqueId = uniqueId;
        }       
 public String getA1() {
            return a1;
        }        
public void setA1(String a1) {
            this.a1 = a1;
        }      
public String getUniqueId() {
            return uniqueId;
        }
    }
}
 {code}
See the attached program, which reproduces the issue.


was (Author: JIRAUSER302517):
{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
public class BatchJobTest2 {    private static ParameterTool setupParams() {
        Map<String, String> properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", 
"false");
        properties.put("security.delegation.token.provider.hbase.enabled", 
"false");
        return ParameterTool.fromMap(properties);
    }    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + 
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
JMXReporterFactory.class.getName());
        config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, 
paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), 
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);        
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Domain> positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction<String, Domain, Domain>() {
                    private transient MapState<String, Domain> processedInputs; 
                   @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor<String, Domain> mapStateDescriptor = 
new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class),
                                TypeInformation.of(Domain.class));
                        processedInputs = 
getRuntimeContext().getMapState(mapStateDescriptor);
                    }                    @Override
                    public void processElement(Domain value, 
KeyedProcessFunction<String, Domain, Domain>.Context context, Collector<Domain> 
out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        
context.timerService().registerEventTimeTimer(Long.MAX_VALUE);                  
  }                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Domain> collector) throws Exception {
                        processedInputs.iterator().forEachRemaining(entry -> 
collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                }).process(new ProcessFunction<Domain, Void>() {
                    @Override
                    public void processElement(Domain value, 
ProcessFunction<Domain, Void>.Context ctx, Collector<Void> out) throws 
Exception {
                        log.info("Timestamp : {}, element : {}", 
ctx.timestamp(), value.getUniqueId());
                    }
                });        env.execute("FileReadJob");    }    public static 
DataStream<Domain> domainStream(StreamExecutionEnvironment env) {        /* Not 
assigning watermarks as program is being run in batch mode and watermarks are 
irrelevant to batch mode */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");    }    private static List<Domain> 
getDataCollection() {
        List<Domain> data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));
        data.add(new Domain("A21", "673-9Z-09"));
        data.add(new Domain("A21", "843-09-21"));        return data;
    }    private static WatermarkStrategy<Domain> getNoWatermarkStrategy() {
        return WatermarkStrategy.<Domain>forGenerator((ctx) -> new 
NoWatermarksGenerator<>())
                .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>());
    }    private static WatermarkStrategy<Domain> getMonotonous() {
        return WatermarkStrategy.<Domain>forMonotonousTimestamps()
                .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>());
    }    private static class Domain {
        private String a1;
        private String uniqueId;        public Domain() {        }        
public Domain(String a1, String uniqueId) {
            this.a1 = a1;
            this.uniqueId = uniqueId;
        }        public String getA1() {
            return a1;
        }        public void setA1(String a1) {
            this.a1 = a1;
        }        public String getUniqueId() {
            return uniqueId;
        }
    }}
 {code}
See the attached program, which reproduces the issue.

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-35289
>                 URL: https://issues.apache.org/jira/browse/FLINK-35289
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.18.1
>            Reporter: Kanthi Vaidya
>            Priority: Major
>
> In batch mode, all registered timers fire at the _end of time_. Given 
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned 
> to the elements collected from the onTimer context ends up being 
> Long.MAX_VALUE. Ideally, it should be the time at which the batch job actually 
> executed the onTimer function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to