[ 
https://issues.apache.org/jira/browse/BEAM-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-13129:
---------------------------------
    Labels:   (was: stale-P2)

> [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message 
> to PubSub Lite topic
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-13129
>                 URL: https://issues.apache.org/jira/browse/BEAM-13129
>             Project: Beam
>          Issue Type: Bug
>          Components: extensions-java-gcp
>    Affects Versions: 2.30.0, 2.31.0, 2.32.0, 2.33.0
>         Environment: GCP Dataflow
>            Reporter: David Duarte
>            Priority: P3
>
> We are currently using PubSub Lite IO with the Dataflow runner.
> Our Beam job runs in streaming mode.
> Reading from a PubSub Lite subscription works correctly.
> Writing to a PubSub Lite topic does not work with this runner.
> When we look at the job graph for the PubSub Lite write step
> (transform: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink) in the
> Google Cloud Console, we don't see any writes.
> When we check the topic, we don't see any output.
> The code works correctly on Beam versions 2.27, 2.28, and 2.29.
> Here is the code we used to check each of the following versions:
>  * 2.27 
>  * 2.28
>  * 2.29
>  * 2.30
>  * 2.31
>  * 2.32
>  * 2.33
>  
> {code:java}
> // Build the pipeline options and force streaming mode.
> PubSubStreamingWriteJobOptions options =
>         PipelineOptionsFactory.fromArgs(args)
>                 .withValidation()
>                 .as(PubSubStreamingWriteJobOptions.class);
> options.setStreaming(true);
> // Set up the file system.
> FileSystems.setDefaultPipelineOptions(options);
> // Target PubSub Lite topic.
> TopicPath topicPath = TopicPath.newBuilder()
>         .setProject(ProjectId.of("[PROJECT ID]"))
>         .setLocation(CloudZone.of(CloudRegion.of("[REGION]"), "[ZONE CHAR]"))
>         .setName(TopicName.of("[TOPIC ID]"))
>         .build();
> PublisherOptions publisherOptions =
>         PublisherOptions.newBuilder()
>                 .setTopicPath(topicPath)
>                 .build();
> Pipeline pipeline = Pipeline.create(options);
> // Watch the bucket for new files, turn each line into a PubSub Lite
> // message, and write it to the topic.
> pipeline.apply(TextIO.read()
>                 .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
>                 .watchForNewFiles(
>                         Duration.standardMinutes(1),
>                         Watch.Growth.afterTimeSinceNewOutput(
>                                 Duration.standardHours(1))))
>         .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP,
>                 MapElements.into(TypeDescriptor.of(PubSubMessage.class))
>                         .via(file -> {
>             Instant instant = Instant.now();
>             Message message =
>                     Message.builder()
>                             .setData(ByteString.copyFromUtf8("message " + file))
>                             .setEventTime(Timestamp.newBuilder()
>                                     .setNanos(instant.getNano())
>                                     .setSeconds(instant.getEpochSecond())
>                                     .build())
>                             .build();
>             return message.toProto();
>         }))
>         .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP,
>                 PubsubLiteIO.write(publisherOptions));
> pipeline.run();
> {code}
> Can you help us find the issue and fix it in Beam, please?
>  
> Best regards,
> David Duarte
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
