[ 
https://issues.apache.org/jira/browse/BEAM-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-7498:
-------------------------------
    Status: Open  (was: Triage Needed)

> Inputs SQS with Session based Windowing doesn't work
> ----------------------------------------------------
>
>                 Key: BEAM-7498
>                 URL: https://issues.apache.org/jira/browse/BEAM-7498
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>    Affects Versions: 2.12.0
>         Environment: Docker with DirectRunner
>            Reporter: Esteve
>            Priority: Major
>              Labels: SESSION, SQS
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Hi,
>  
> I am trying to use Beam with AWS SQS as an input source, using session windows.
> The windows are never executed. The same code works well when the input source is Kafka:
> {code:java}
> SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
> PipelineOptions options = PipelineOptionsFactory.create();
> AwsOptions awsOptions = options.as(AwsOptions.class);
> BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
> awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
> awsOptions.setAwsRegion("eu-west-1");
> Pipeline p = Pipeline.create(options);
> // Read messages from the SQS queue.
> p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))
> /*Per session windows*/
> .apply(ParDo.of(new DoFn<Message, String>() {
> @ProcessElement
> public void processElement(@Element Message element, OutputReceiver<String> out) {
> // Log the message body and emit it downstream.
> LOG.info("Message Body: {}", element.getBody());
> out.output(element.getBody());
> }
> }))
> //Set windowing configuration
> .apply(
> "WindowIntoSessions",
> Window.<String>into(
> Sessions.withGapDuration(Duration.standardSeconds(5)))
> .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
> //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
> // Late data is dropped
> .accumulatingFiredPanes()
> .withAllowedLateness(Duration.ZERO))
> //Extract and count: maps each element to a KV pair of <userID, count>
> .apply(
> MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
> .via(
> (String testO) -> KV.of(testO, 1)
> )
> )
> .apply("CountElements", Sum.integersPerKey())
> .apply("Log", ParDo.of(new FilterTextFn()))
> .apply(
> MapElements.into(TypeDescriptors.strings())
> .via(
> (KV<String, Integer> wordCount) ->
> wordCount.getKey() + ": " + wordCount.getValue()))
> ;
> p.run().waitUntilFinish();
> }
> {code}
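
For readers less familiar with the windowing used in the report above: session windows group elements whose timestamps lie closer together than the gap duration, and a session can only be emitted once the runner considers it complete. The following is a minimal sketch of the grouping rule alone, written in plain Java without Beam and assuming the 5-second gap from the reporter's pipeline. The names `SessionSketch` and `sessions` are hypothetical and are not part of Beam or of the reporter's code. The handling of two timestamps exactly one gap apart is an assumption, and watermarks and triggers are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Beam's implementation.
class SessionSketch {

    // Groups timestamps (in milliseconds) into sessions. A new session starts
    // whenever the distance to the previous timestamp is at least gapMillis
    // (the behaviour at exactly gapMillis is an assumption of this sketch).
    static List<List<Long>> sessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean gapReached = !current.isEmpty()
                    && ts - current.get(current.size() - 1) >= gapMillis;
            if (gapReached) {
                // Close the running session and start a new one.
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up arrival times: three messages close together, then two more.
        long[] arrivals = {0L, 2_000L, 4_000L, 12_000L, 13_000L};
        // prints [[0, 2000, 4000], [12000, 13000]]
        System.out.println(sessions(arrivals, 5_000L));
    }
}
```

In a streaming pipeline, the grouping itself is only half of the picture: with the default trigger, a session is emitted once the watermark passes its end, so whether output appears also depends on how the source reports its watermark, which this sketch does not cover.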
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
