[ 
https://issues.apache.org/jira/browse/BEAM-361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573234#comment-15573234
 ] 

Ven Bijjam edited comment on BEAM-361 at 10/13/16 9:29 PM:
-----------------------------------------------------------

I am thinking along these lines. In the following constructor:
{code} @VisibleForTesting
  PubsubUnboundedSource(
      Clock clock,
      PubsubClientFactory pubsubFactory,
      @Nullable ProjectPath project,
      @Nullable TopicPath topic,
      @Nullable SubscriptionPath subscription,
      Coder<T> elementCoder,
      @Nullable String timestampLabel,
      @Nullable String idLabel) {
    checkArgument((topic == null) != (subscription == null),
                  "Exactly one of topic and subscription must be given");
    checkArgument((topic == null) == (project == null),
                  "Project must be given if topic is given");
    this.clock = clock;
    this.pubsubFactory = checkNotNull(pubsubFactory);
    this.project = project;
    this.topic = topic;
    this.subscription = subscription;
    this.elementCoder = checkNotNull(elementCoder);
    this.timestampLabel = timestampLabel;
    this.idLabel = idLabel == null
        ? UUID.nameUUIDFromBytes(timestampLabel.getBytes()).toString()
        : idLabel;
  }{code} 
This constructor is in {code}..\sdks\java\core\src\main\java\org\apache\beam\sdk\io\PubsubUnboundedSource.java{code}

Change the line {code}this.idLabel = idLabel;{code}
to the following:
{code}this.idLabel = idLabel == null
    ? UUID.nameUUIDFromBytes(timestampLabel.getBytes()).toString()
    : idLabel;{code}
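
To make the behaviour of that expression concrete, here is a small standalone sketch. It is not Beam code: the class IdLabelDefaultSketch and the helper defaultIdLabel are hypothetical names used only for illustration. It relies on two facts about the JDK: UUID.nameUUIDFromBytes is name-based, so the same input bytes always give the same UUID, and calling getBytes() on a null String throws a NullPointerException. Since timestampLabel is @Nullable in the constructor, the sketch assumes a null guard is wanted and also assumes UTF-8 as the charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical standalone sketch, not part of the Beam SDK.
final class IdLabelDefaultSketch {

  // Returns idLabel when it is set. Otherwise derives a deterministic
  // default from timestampLabel, or null when both are absent
  // (the null guard is an assumption, see the note above).
  static String defaultIdLabel(String idLabel, String timestampLabel) {
    if (idLabel != null) {
      return idLabel;
    }
    if (timestampLabel == null) {
      return null;
    }
    // Name-based UUID: identical input bytes always yield the same UUID.
    return UUID.nameUUIDFromBytes(
        timestampLabel.getBytes(StandardCharsets.UTF_8)).toString();
  }

  public static void main(String[] args) {
    // An explicit idLabel is returned unchanged.
    System.out.println(defaultIdLabel("myId", "ts"));
    // The derived default is the same on every call for the same input.
    System.out.println(defaultIdLabel(null, "ts").equals(defaultIdLabel(null, "ts")));
    // With neither label set, the result stays null instead of throwing.
    System.out.println(defaultIdLabel(null, null));
  }
}
```

Note that the derived value depends only on timestampLabel, so it is one fixed string per source configuration and not a per-message token.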


> Batch Pub/Sub sink does not set idLabel
> ---------------------------------------
>
>                 Key: BEAM-361
>                 URL: https://issues.apache.org/jira/browse/BEAM-361
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>            Reporter: Daniel Mills
>              Labels: newbie, starter
>
> The transform should generate a unique token (must be stable across retries), 
> and set it on outgoing pubsub messages if an idLabel has been set


