[ https://issues.apache.org/jira/browse/BEAM-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514234#comment-17514234 ]
Beam JIRA Bot commented on BEAM-13129:
--------------------------------------
This issue was marked "stale-P2" and has not received a public comment in 14
days. It is now automatically moved to P3. If you are still affected by it, you
can comment and move it back to P2.
> [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message
> to PubSub Lite topic
> -------------------------------------------------------------------------------------------------
>
> Key: BEAM-13129
> URL: https://issues.apache.org/jira/browse/BEAM-13129
> Project: Beam
> Issue Type: Bug
> Components: extensions-java-gcp
> Affects Versions: 2.30.0, 2.31.0, 2.32.0, 2.33.0
> Environment: GCP Dataflow
> Reporter: David Duarte
> Priority: P3
>
> We are currently using PubSub Lite IO with the Dataflow runner.
> Our Beam job runs in streaming mode.
> Reading from a PubSub Lite subscription works correctly.
> Writing to a PubSub Lite topic doesn't work with this runner.
> When we look at the job graph for the PubSub Lite write step
> (transform: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink) in the
> Google Cloud Console, we don't see any writes.
> When we check the topic, we don't see any output.
> The code works on Beam versions 2.27, 2.28, and 2.29.
> Here is the code we used for the check, which we ran on these versions:
> * 2.27
> * 2.28
> * 2.29
> * 2.30
> * 2.31
> * 2.32
> * 2.33
>
> {code:java}
> PubSubStreamingWriteJobOptions options =
>     PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubStreamingWriteJobOptions.class);
> options.setStreaming(true);
> // Set up the file system.
> FileSystems.setDefaultPipelineOptions(options);
> TopicPath topicPath = TopicPath.newBuilder()
>     .setProject(ProjectId.of("[PROJECT ID]"))
>     .setLocation(CloudZone.of(CloudRegion.of("[REGION]"), "[ZONE CHAR]"))
>     .setName(TopicName.of("[TOPIC ID]"))
>     .build();
> PublisherOptions publisherOptions =
>     PublisherOptions.newBuilder()
>         .setTopicPath(topicPath)
>         .build();
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(TextIO.read()
>         .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
>         .watchForNewFiles(
>             Duration.standardMinutes(1),
>             Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
>     .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP,
>         MapElements.into(TypeDescriptor.of(PubSubMessage.class)).via(file -> {
>             Instant instant = Instant.now();
>             Message message =
>                 Message.builder()
>                     .setData(ByteString.copyFromUtf8("message " + file))
>                     .setEventTime(Timestamp.newBuilder()
>                         .setNanos(instant.getNano())
>                         .setSeconds(instant.getEpochSecond())
>                         .build())
>                     .build();
>             return message.toProto();
>         }))
>     .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP,
>         PubsubLiteIO.write(publisherOptions));
> pipeline.run();
> {code}
> Could you please help us find the issue and fix it in Beam?
>
> Best regards,
> David Duarte
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)