[
https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323546#comment-16323546
]
Aljoscha Krettek edited comment on FLINK-8413 at 1/24/18 1:02 PM:
------------------------------------------------------------------
The following code uses the Apache Beam libraries to create the pipeline:
{code}
public void run(String[] args) {
  BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(BeamCLIOptions.class);
  Pipeline pipeline = Pipeline.create(beamCliOptions);
  MergeDistribution mergeDistribution =
      MergeDistribution.valueOf(beamCliOptions.getMergeDistribution());
  MergeDistribution fixedWindowDuration =
      MergeDistribution.valueOf(beamCliOptions.getFixedWindowSize());
  KafkaIO.Read<String, String> kafkaEntityStreamReader =
      KafkaIO.<String, String>read()
          .withBootstrapServers(beamCliOptions.getKafkaServers())
          .withTopic(beamCliOptions.getKafkaTopic())
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .updateConsumerProperties(ImmutableMap.of(
              "auto.offset.reset", "latest",
              "enable.auto.commit", "true"));
  pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
      .apply(Values.create())
      .apply(Window.<String>into(
              FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
          .discardingFiredPanes()
          .withAllowedLateness(Duration.ZERO))
      .apply(ParDo.of(new ExtractDataFn(
          beamCliOptions.getDatePartitionKey(),
          new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
      .apply("Applying GroupByKey on YYYY-MM-DD HH", GroupByKey.create())
      .apply("Applying Merge", ParDo.of(new MergeDataFn(beamCliOptions)));
  pipeline.run();
}
{code}
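One thing worth double-checking: with Beam's Flink runner, checkpointing is only active when a checkpointing interval is set on the pipeline options. A minimal sketch (hedged: {{FlinkPipelineOptions}} and {{FlinkRunner}} come from the beam-runners-flink module, and the 300000 ms value is only an illustration matching the 5-minute interval described below):
{code}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;

// Sketch only: view the existing options as FlinkPipelineOptions and
// enable checkpointing on the Flink runner.
FlinkPipelineOptions flinkOptions = beamCliOptions.as(FlinkPipelineOptions.class);
flinkOptions.setRunner(FlinkRunner.class);
flinkOptions.setCheckpointingInterval(300000L); // 5 minutes, as in the issue
{code}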
> Snapshot state of aggregated data is not maintained in flink's checkpointing
> ----------------------------------------------------------------------------
>
> Key: FLINK-8413
> URL: https://issues.apache.org/jira/browse/FLINK-8413
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: suganya
> Priority: Major
>
> We have a project which consumes events from Kafka, does a groupBy in a time
> window (5 mins), and after the window elapses pushes the events downstream for
> merging. The project is deployed using Flink, and we have enabled checkpointing
> to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (the checkpointingInterval),
> so the event offsets are checkpointed before the entire DAG (groupBy and
> merge) has finished. In case of a restart of the task manager, the new task
> starts from the last successful checkpoint, but we could not get the
> aggregated snapshot data (the data from the groupBy task) from the persisted
> checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka,
> but could not get the data aggregated up to that checkpoint.
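For comparison, a hedged sketch of the equivalent setup in plain Flink (1.3.x APIs; the checkpoint path is a placeholder): with checkpointing enabled and a filesystem state backend, the keyed window contents are snapshotted together with the Kafka source offsets, so a restore should bring back both.
{code}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5 * 60 * 1000);  // checkpointingInterval: 5 mins
// state.backend: filesystem (placeholder path)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
{code}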
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)