[https://issues.apache.org/jira/browse/BEAM-11635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Limbourg updated BEAM-11635:
----------------------------
Description:
When deploying a streaming job to Google Dataflow as a template, I can't read from
more than one PubSub subscription when the subscription name is determined at
runtime (with a ValueProvider):
{code:java}
final List<PCollection<String>> pCollections = new ArrayList<>();
for (final String topic : topics) {
  // One subscription per topic: <subscriptionPrefix><topic>, resolved at runtime.
  final ValueProvider<String> vpSub =
      ValueProvider.NestedValueProvider.of(
          options.getSubscriptionPrefix(), prefix -> prefix + topic);
  PCollection<String> messages =
      pipeline
          .apply("Sub-read-" + topic, PubsubIO.readStrings().fromSubscription(vpSub))
          // `duration` is a WindowFn (e.g. FixedWindows.of(...)); one step name per topic.
          .apply("Windowing-" + topic, Window.into(duration));
  pCollections.add(messages);
}
{code}
When the job is launched, the following error appears in the logs:
{code:java}
Workflow failed. Causes: The pubsub configuration contains errors: Subscription
'projects/_project_/subscriptions/_prefix_' is consumed by multiple stages,
this will result in undefined behavior.
{code}
It seems that the SerializableFunction I pass to each NestedValueProvider is never
called.
This could be due to StreamingPubsubIOReadTranslator#translate in
[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java],
especially these lines:
{code:java}
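if (overriddenTransform.getSubscriptionProvider() != null) {
  if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
    // Value known at construction time: the resolved subscription path is written.
    stepContext.addInput(
        PropertyNames.PUBSUB_SUBSCRIPTION,
        overriddenTransform.getSubscription().getV1Beta1Path());
  } else {
    // Template case: only the option name returned by propertyName() is recorded;
    // the NestedValueProvider's translator function does not appear to be applied.
    stepContext.addInput(
        PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
        ((NestedValueProvider) overriddenTransform.getSubscriptionProvider())
            .propertyName());
  }
}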
{code}
Indeed, this code:
* forces the use of a NestedValueProvider when the value is not accessible before
runtime, because of the unchecked cast (any other ValueProvider implementation in
that branch would fail with a ClassCastException);
* results in the same incorrect subscription value at runtime (only the prefix, in
my case) for all the reads in the loop, at least at validation time.
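To make the second point concrete, the following is a simplified, self-contained model in plain Java; it is not Beam code. It assumes, as the quoted branch suggests, that {{NestedValueProvider#propertyName()}} reports the name of the underlying template option (here a hypothetical {{subscriptionPrefix}}) and never involves the translator function. The classes {{RuntimeOption}} and {{Nested}}, the topics and the subscription paths are all hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Simplified, hypothetical model of the translator branch quoted above. These classes
// are NOT Beam's ValueProvider API; they only mimic the behaviour relevant to this bug.
public class SubscriptionOverrideModel {

  // Stands in for a template option: at graph-construction time only its name is known.
  static final class RuntimeOption {
    final String propertyName;

    RuntimeOption(String propertyName) {
      this.propertyName = propertyName;
    }
  }

  // Stands in for NestedValueProvider: a runtime option plus a translator function.
  static final class Nested {
    final RuntimeOption base;
    final Function<String, String> translator;

    Nested(RuntimeOption base, Function<String, String> translator) {
      this.base = base;
      this.translator = translator;
    }

    // Assumed behaviour of propertyName(): report the base option's name only,
    // so the translator takes no part in what the translator step records.
    String propertyName() {
      return base.propertyName;
    }

    // The value the user expects once the runtime value is known.
    String resolve(Map<String, String> runtimeOptions) {
      return translator.apply(runtimeOptions.get(base.propertyName));
    }
  }

  static final RuntimeOption PREFIX = new RuntimeOption("subscriptionPrefix");

  // One "read step" per topic, as in the loop from the report.
  static List<Nested> buildReads(List<String> topics) {
    List<Nested> reads = new ArrayList<>();
    for (String topic : topics) {
      reads.add(new Nested(PREFIX, prefix -> prefix + topic));
    }
    return reads;
  }

  // What would be recorded as the subscription override for each read step.
  static Set<String> overrideProperties(List<Nested> reads) {
    Set<String> names = new LinkedHashSet<>();
    for (Nested read : reads) {
      names.add(read.propertyName());
    }
    return names;
  }

  // The subscriptions the user actually intended, one per read step.
  static Set<String> intendedSubscriptions(List<Nested> reads, Map<String, String> runtime) {
    Set<String> subs = new LinkedHashSet<>();
    for (Nested read : reads) {
      subs.add(read.resolve(runtime));
    }
    return subs;
  }

  public static void main(String[] args) {
    List<Nested> reads = buildReads(Arrays.asList("orders", "payments", "refunds"));
    Map<String, String> runtime =
        Collections.singletonMap("subscriptionPrefix", "projects/p/subscriptions/sub-");
    // A single property name: every read step points at the same option.
    System.out.println("override properties: " + overrideProperties(reads));
    // Three distinct subscription paths, which is what the loop was meant to produce.
    System.out.println("intended subscriptions: " + intendedSubscriptions(reads, runtime));
  }
}
{code}

Running {{main}} prints one override property ({{[subscriptionPrefix]}}) next to three distinct intended subscriptions, which is consistent with every read stage resolving to the bare prefix and the "consumed by multiple stages" error above.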
This seems to be the same issue as the one reported here:
[https://issuetracker.google.com/u/0/issues/157584222]
was:
When deploying a streaming job to Google Dataflow as a template, I can't read from
more than one PubSub subscription whose name is determined at runtime:
{code:java}
final List<PCollection<String>> pCollections = new ArrayList<>();
for (final String topic : topics) {
  final ValueProvider<String> vpSub =
      ValueProvider.NestedValueProvider.of(options.getSubscriptionPrefix(),
          prefix -> prefix + topic);
  PCollection<String> messages =
      pipeline.apply("Sub-read-" + topic,
              PubsubIO.readStrings().fromSubscription(vpSub))
          .apply("Windowing", Window.into(duration));
  pCollections.add(messages);
}
{code}
When the job is launched, the following error appears in the logs:
{code:java}
Workflow failed. Causes: The pubsub configuration contains errors: Subscription
'projects/_project_/subscriptions/_prefix_' is consumed by multiple stages,
this will result in undefined behavior.
{code}
It seems that the SerializableFunction I pass to each NestedValueProvider is never
called.
This could be due to StreamingPubsubIOReadTranslator#translate in
[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java],
especially these lines:
{code:java}
if (overriddenTransform.getSubscriptionProvider() != null) {
  if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
    stepContext.addInput(
        PropertyNames.PUBSUB_SUBSCRIPTION,
        overriddenTransform.getSubscription().getV1Beta1Path());
  } else {
    stepContext.addInput(
        PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
        ((NestedValueProvider)
                overriddenTransform.getSubscriptionProvider()).propertyName());
  }
}
{code}
Indeed, this code:
* forces the use of a NestedValueProvider when the value is not accessible before
runtime, because of the unchecked cast;
* results in the same incorrect subscription value at runtime (only the prefix, in
my case) for all the reads in the loop, at least at validation time.
This seems to be the same issue as the one reported here:
[https://issuetracker.google.com/u/0/issues/157584222]
> Unable to subscribe to several PubSub subscriptions with a valueprovider
> ------------------------------------------------------------------------
>
> Key: BEAM-11635
> URL: https://issues.apache.org/jira/browse/BEAM-11635
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp, runner-dataflow
> Affects Versions: 2.27.0
> Environment: GCP
> Reporter: Limbourg
> Priority: P2
--
This message was sent by Atlassian Jira
(v8.3.4#803005)