[ https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323546#comment-16323546 ]

suganya commented on FLINK-8413:
--------------------------------

The following code uses the Apache Beam libraries to create the pipeline.
Please find the code below.

 public void run(String[] args) {
      BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(BeamCLIOptions.class);
      Pipeline pipeline = Pipeline.create(beamCliOptions);
      MergeDistribution mergeDistribution = MergeDistribution
          .valueOf(beamCliOptions.getMergeDistribution());
      MergeDistribution fixedWindowDuration = MergeDistribution
          .valueOf(beamCliOptions.getFixedWindowSize());

      // Read the entity stream from Kafka, starting from the latest offset.
      KafkaIO.Read<String, String> kafkaEntityStreamReader =
          KafkaIO.<String, String>read()
              .withBootstrapServers(beamCliOptions.getKafkaServers())
              .withTopic(beamCliOptions.getKafkaTopic())
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .updateConsumerProperties(ImmutableMap.of(
                  "auto.offset.reset", "latest",
                  "enable.auto.commit", "true"));

      pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
          .apply(Values.create())
          // Fixed windows with a repeated processing-time trigger; fired
          // panes are discarded and no late data is allowed.
          .apply(Window.<String>into(
                  FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
          .apply(ParDo.of(new ExtractDataFn(
              beamCliOptions.getDatePartitionKey(),
              new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
          .apply("Applying GroupByKey on YYYY-MM-DD HH", GroupByKey.create())
          .apply("Applying Merge", ParDo.of(new MergeDataFn(beamCliOptions)));

      pipeline.run();
    }
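
This pipeline runs on the Flink runner. For context, the checkpointing setup described in this issue (filesystem state backend, 5-minute checkpointing interval) would typically be enabled on the Flink side along these lines. This is only a sketch: the class name and checkpoint path are placeholders, not the project's actual code.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 5 minutes (300 000 ms), matching the reported
    // checkpointingInterval.
    env.enableCheckpointing(300_000L, CheckpointingMode.EXACTLY_ONCE);

    // state.backend: filesystem -- checkpoints are written under this URI
    // (the path here is a placeholder).
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

    // ... build and execute the job ...
  }
}
```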

> Snapshot state of aggregated data is not maintained in flink's checkpointing
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8413
>                 URL: https://issues.apache.org/jira/browse/FLINK-8413
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: suganya
>
> We have a project that consumes events from Kafka and does a groupBy in a
> time window (5 mins); after the window elapses, it pushes the events
> downstream for merging. The project is deployed using Flink, and we have
> enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (the checkpointing
> interval). The event offsets are checkpointed before the entire DAG (groupBy
> and merge) has finished. So in case of any restart of the task manager, the
> new task starts from the last successful checkpoint, but we could not get
> the aggregated snapshot data (the data from the groupBy task) back from the
> persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from
> Kafka, but could not get the aggregated data up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)