[ 
https://issues.apache.org/jira/browse/BEAM-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17122786#comment-17122786
 ] 

Beam JIRA Bot commented on BEAM-7498:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it 
has been labeled "stale-P2". If this issue is still affecting you, we care! 
Please comment and remove the label. Otherwise, in 14 days the issue will be 
moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed 
explanation of what these priorities mean.


> Inputs SQS with Session based Windowing doesn't work
> ----------------------------------------------------
>
>                 Key: BEAM-7498
>                 URL: https://issues.apache.org/jira/browse/BEAM-7498
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>    Affects Versions: 2.12.0
>         Environment: Docker with DirecctRunner
>            Reporter: Esteve
>            Priority: P2
>              Labels: SESSION, SQS, stale-P2
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Hi,
>  
> Trying to use Beam with AWS SQS service as an input source, using Session 
> windows.
> The windows aren't executed. Code works well when the input source is Kafka:
> {code:java}
> // code placeholder
> SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
> PipelineOptions options = PipelineOptionsFactory.create();
> AwsOptions awsOptions = options.as(AwsOptions.class);
> BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
> awsOptions.setAwsCredentialsProvider(new 
> AWSStaticCredentialsProvider(awsCreds));
> awsOptions.setAwsRegion("eu-west-1");
> Pipeline p = Pipeline.create(options);
> // This example reads a public data set consisting of the complete works of 
> Shakespeare.
> p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX";))
> /*Per session windows*/
> .apply(ParDo.of(new DoFn<Message, String>() {
> @ProcessElement
> public void processElement(@Element Message element, OutputReceiver<String> 
> out) {
> // Extract the timestamp from log entry we're currently processing.
> LOG.info("Message Body: {}", element.getBody());
> out.output(element.getBody());
> }
> }))
> //Set windowing configuration
> .apply(
> "WindowIntoSessions",
> Window.<String>into(
> Sessions.withGapDuration(Duration.standardSeconds(5)))
> .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
> //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
> // Late data is dropped
> .accumulatingFiredPanes()
> .withAllowedLateness(Duration.ZERO))
> //Extract and count: Extracts a the object to an KV store of <userID, count>
> .apply(
> MapElements.into( TypeDescriptors.kvs(TypeDescriptors.strings(), 
> TypeDescriptors.integers()))
> .via(
> (String testO) -> KV.of(testO, new Integer(1))
> )
> )
> .apply("CountElements", Sum.integersPerKey())
> .apply("Log", ParDo.of(new FilterTextFn()))
> .apply(
> MapElements.into(TypeDescriptors.strings())
> .via(
> (KV<String, Integer> wordCount) ->
> wordCount.getKey() + ": " + wordCount.getValue()))
> ;
> p.run().waitUntilFinish();
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to