[ https://issues.apache.org/jira/browse/BEAM-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-7498:
-------------------------------
Status: Open (was: Triage Needed)
> Inputs SQS with Session based Windowing doesn't work
> ----------------------------------------------------
>
> Key: BEAM-7498
> URL: https://issues.apache.org/jira/browse/BEAM-7498
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Affects Versions: 2.12.0
> Environment: Docker with DirectRunner
> Reporter: Esteve
> Priority: Major
> Labels: SESSION, SQS
> Original Estimate: 12h
> Remaining Estimate: 12h
>
> Hi,
>
> I'm trying to use Beam with the AWS SQS service as an input source, using
> session windows. The windows are never triggered. The same code works when
> the input source is Kafka:
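> {code:java}
> import com.amazonaws.auth.AWSStaticCredentialsProvider;
> import com.amazonaws.auth.BasicAWSCredentials;
> import com.amazonaws.services.sqs.model.Message;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.aws.options.AwsOptions;
> import org.apache.beam.sdk.io.aws.sqs.SqsIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Sum;
> import org.apache.beam.sdk.transforms.windowing.Sessions;
> import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.TypeDescriptors;
> import org.joda.time.Duration;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // The wrapper class name is illustrative; only the body of main() was in the report.
> public class SqsSessionWindows {
>   private static final Logger LOG = LoggerFactory.getLogger(SqsSessionWindows.class);
>
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     AwsOptions awsOptions = options.as(AwsOptions.class);
>     BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
>     awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
>     awsOptions.setAwsRegion("eu-west-1");
>     Pipeline p = Pipeline.create(options);
>
>     // Read messages from the SQS queue (unbounded source).
>     p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))
>         // Log and emit the body of each SQS message.
>         .apply(ParDo.of(new DoFn<Message, String>() {
>           @ProcessElement
>           public void processElement(@Element Message element, OutputReceiver<String> out) {
>             LOG.info("Message Body: {}", element.getBody());
>             out.output(element.getBody());
>           }
>         }))
>         // Per-session windows: a session closes after a 5-second gap with no new elements.
>         .apply(
>             "WindowIntoSessions",
>             Window.<String>into(Sessions.withGapDuration(Duration.standardSeconds(5)))
>                 .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
>                 //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>                 .accumulatingFiredPanes()
>                 // Late data is dropped.
>                 .withAllowedLateness(Duration.ZERO))
>         // Map each message body to a KV of <body, 1> so it can be counted per key.
>         .apply(
>             MapElements.into(
>                     TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
>                 .via((String body) -> KV.of(body, 1)))
>         .apply("CountElements", Sum.integersPerKey())
>         // FilterTextFn is the reporter's own DoFn; its definition is not part of this report.
>         .apply("Log", ParDo.of(new FilterTextFn()))
>         .apply(
>             MapElements.into(TypeDescriptors.strings())
>                 .via((KV<String, Integer> wordCount) ->
>                     wordCount.getKey() + ": " + wordCount.getValue()));
>
>     p.run().waitUntilFinish();
>   }
> }
> {code}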
>
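Some background on when session windows produce output: with the default trigger, Beam emits a window only once the watermark passes the end of that window. `Sessions` gives each element with event time t the window [t, t + gap) and merges overlapping windows, so a session ends one gap duration (5 seconds in the pipeline above) after its last element. The stdlib-only Java sketch below models that behavior for a single key. It is a hypothetical illustration, not Beam's implementation, and the names `SessionSketch`, `sessions` and `firedSessions` are made up for this example. It suggests one thing worth checking, without confirming it as the cause: whether the watermark reported for the SQS source ever advances past the session ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, stdlib-only model of session windowing for one key.
 * Not Beam code: it only mimics the [t, t + gap) assignment, the merging of
 * overlapping windows, and "fire once the watermark passes the window end".
 */
public class SessionSketch {

  /** A merged session [start, end) in milliseconds plus its element count. */
  public static final class Session {
    public final long start;
    public final long end;
    public final int count;

    Session(long start, long end, int count) {
      this.start = start;
      this.end = end;
      this.count = count;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ") count=" + count;
    }
  }

  /** Assigns each timestamp the window [t, t + gapMillis) and merges overlapping windows. */
  public static List<Session> sessions(long[] timestampsMillis, long gapMillis) {
    long[] ts = timestampsMillis.clone();
    Arrays.sort(ts);
    List<Session> result = new ArrayList<>();
    if (ts.length == 0) {
      return result;
    }
    long start = ts[0];
    long end = ts[0] + gapMillis;
    int count = 1;
    for (int i = 1; i < ts.length; i++) {
      if (ts[i] < end) {
        // [ts[i], ts[i] + gap) overlaps the current session, so it is merged in.
        end = Math.max(end, ts[i] + gapMillis);
        count++;
      } else {
        // No overlap: close the current session and start a new one.
        result.add(new Session(start, end, count));
        start = ts[i];
        end = ts[i] + gapMillis;
        count = 1;
      }
    }
    result.add(new Session(start, end, count));
    return result;
  }

  /** Sessions that would fire: the window covers [start, end), so it is complete once watermark >= end. */
  public static List<Session> firedSessions(List<Session> sessions, long watermarkMillis) {
    List<Session> fired = new ArrayList<>();
    for (Session s : sessions) {
      if (watermarkMillis >= s.end) {
        fired.add(s);
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    long gap = 5_000; // same 5-second gap as Sessions.withGapDuration above
    List<Session> s = sessions(new long[] {0, 2_000, 4_000, 20_000}, gap);
    System.out.println(s); // [[0, 9000) count=3, [20000, 25000) count=1]
    System.out.println(firedSessions(s, 0).size()); // 0: a stuck watermark fires nothing
    System.out.println(firedSessions(s, 10_000).size()); // 1: only the first session is complete
  }
}
```

In this model, three messages arriving within five seconds of each other form one session, but nothing is emitted while the watermark stays at 0, which would look the same as windows that are "never triggered".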
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)