damccorm opened a new issue, #20754:
URL: https://github.com/apache/beam/issues/20754

   Deploying as a template a streaming job in Google Dataflow, I can't create 
more than one PubSub subscriptions, when subscription name is determined at 
runtime (with a ValueProvider):
   ```
   
         final List<PCollection<String>> pCollections = new ArrayList<>();
           for (final String
   topic : topics) {
               final ValueProvider<String> vpSub = 
ValueProvider.NestedValueProvider.of(options.getSubscriptionPrefix(),
   
                      prefix -> prefix + topic);
               PCollection<String> messages =
           
              pipeline.apply("Sub-read-" + topic, 
PubsubIO.readStrings().fromSubscription(vpSub))
      
                           .apply("Windowing", Window.into(duration));
               pCollections.add(messages);
   
          }
   
   ```
   
   During the launching of the job, I got the error in the logs:
   ```
   
   Workflow failed. Causes: The pubsub configuration contains errors: 
Subscription 'projects/_project_/subscriptions/_prefix_'
   is consumed by multiple stages, this will result in undefined behavior.
   
   ```
   
   It seems that the coded SerializableFunction that I provide to the instances 
of NestedValueProvider are not called.
   
   It could be due to 
[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java](https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java)
  StreamingPubsubIOReadTranslator#translate, especially these lines :
   ```
   
         if (overriddenTransform.getSubscriptionProvider() != null) {
           if (overriddenTransform.getSubscriptionProvider().isAccessible())
   {
             stepContext.addInput(
                 PropertyNames.PUBSUB_SUBSCRIPTION,
                
   overriddenTransform.getSubscription().getV1Beta1Path());
           } else {
             stepContext.addInput(
   
                PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
                 ((NestedValueProvider) 
overriddenTransform.getSubscriptionProvider()).propertyName());
   
          }
         }
   
   ```
   
   Indeed this code :
    * Forces to use a NestedValueProvider if value is not accessible before 
runtime, due to the vehement cast,
    * Results in having the same incorrect subscription value at runtime (so 
only the prefix in my case) for all the reads of the loop, at least at 
validation time
   
    
   
   Seems to be the same issue reported there : 
[https://issuetracker.google.com/u/0/issues/157584222](https://issuetracker.google.com/u/0/issues/157584222)
   
   Imported from Jira 
[BEAM-11635](https://issues.apache.org/jira/browse/BEAM-11635). Original Jira 
may contain additional context.
   Reported by: Jean-Gabriel.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to