Abacn commented on code in PR #29395:
URL: https://github.com/apache/beam/pull/29395#discussion_r1391637530


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java:
##########
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions {
 
   String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis";
 
+  String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment:
   The SDK core code path does not hold information about GCP IO components (such as this Pub/Sub experiment). Consider putting these literals inside PubsubIO instead?
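   A minimal sketch of what this suggestion could look like, assuming a small holder class in the GCP IO module (the name `PubsubExperimentsSketch` is hypothetical; PubsubIO itself could host the constant just as well). To stay runnable without Beam on the classpath, it takes the experiments as a plain `List<String>` and roughly mirrors the check `ExperimentalOptions.hasExperiment` performs (the experiment is present in a non-null experiments list).

```java
import java.util.List;

/**
 * Hypothetical holder for Pub/Sub experiment names, kept in the GCP IO module
 * (next to PubsubIO) rather than in sdk core's ExperimentalOptions.
 */
final class PubsubExperimentsSketch {
  // Experiment name taken from this PR; the holder class itself is illustrative only.
  static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

  private PubsubExperimentsSketch() {}

  /**
   * Stand-in for ExperimentalOptions.hasExperiment: a null experiments list
   * means no experiments are set, so the result is false.
   */
  static boolean hasExperiment(List<String> experiments, String experiment) {
    return experiments != null && experiments.contains(experiment);
  }

  public static void main(String[] args) {
    List<String> experiments = List.of("some_other_experiment", ENABLE_CUSTOM_PUBSUB_SOURCE);
    System.out.println(hasExperiment(experiments, ENABLE_CUSTOM_PUBSUB_SOURCE)); // true
    System.out.println(hasExperiment(null, ENABLE_CUSTOM_PUBSUB_SOURCE)); // false
  }
}
```

   With the constant owned by the IO module, sdk core stays unaware of Pub/Sub, and the Dataflow runner (which already depends on the GCP IO module) can import the same constant.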



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java:
##########
@@ -59,6 +60,10 @@ public String getUrn() {
     @Override
     public RunnerApi.FunctionSpec translate(
         AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) {
+      if (ExperimentalOptions.hasExperiment(

Review Comment:
   Would you mind explaining a little why the current transform override (referenced above, which is applied when the experiment is not provided) is not sufficient for Runner v2? From DataflowRunner.java, the call stack leading to this point is
   
   DataflowRunner.getOverrides() L552
   DataflowRunner.replaceV1Transforms() L1579
   DataflowRunner.run() L1193
   
   which means both Dataflow v1 and v2 job submissions get the override by default, while non-Dataflow runners always use the custom implementation (aka the Beam-provided PubsubIO).
   
   So there are two questions here:
   - Why does the current override work for Dataflow v1 (enabled by default, can be disabled with a flag) but not for v2 (presumably always enabled?)?
   - Is it possible to keep the change inside the Dataflow runner, following the existing pattern there?
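   To make the first question concrete, here is a simplified model of the decision as described in this comment: on Dataflow (v1 or v2 submission, since both go through `replaceV1Transforms()`), the native Pub/Sub source replaces PubsubIO unless the `enable_custom_pubsub_source` experiment is set, and every other runner always uses the Beam-provided PubsubIO. The class and enum names are hypothetical and this is not the actual DataflowRunner code; it only illustrates that, under this model, a single decision point already covers both Dataflow paths.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of which Pub/Sub read implementation a
 * pipeline ends up with. Not the DataflowRunner implementation.
 */
final class PubsubReadChoiceSketch {
  static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

  /** The two possible outcomes in this model. */
  enum ReadImpl { DATAFLOW_NATIVE, BEAM_PUBSUBIO }

  private PubsubReadChoiceSketch() {}

  /**
   * On Dataflow (v1 or v2 submission alike), the override swaps in the native
   * source unless the experiment opts out of it. Other runners never apply the
   * override, so they always use the Beam-provided PubsubIO.
   */
  static ReadImpl choose(boolean isDataflow, List<String> experiments) {
    boolean customRequested =
        experiments != null && experiments.contains(ENABLE_CUSTOM_PUBSUB_SOURCE);
    if (isDataflow && !customRequested) {
      return ReadImpl.DATAFLOW_NATIVE;
    }
    return ReadImpl.BEAM_PUBSUBIO;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, List.of())); // DATAFLOW_NATIVE
    System.out.println(choose(true, List.of(ENABLE_CUSTOM_PUBSUB_SOURCE))); // BEAM_PUBSUBIO
    System.out.println(choose(false, null)); // BEAM_PUBSUBIO
  }
}
```

   If this model matches the runner's behavior, the open question is which case an additional experiment check in `PubSubPayloadTranslation.translate` covers that this decision point does not.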



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java:
##########
@@ -35,6 +35,10 @@ public interface ExperimentalOptions extends PipelineOptions {
 
   String STATE_SAMPLING_PERIOD_MILLIS = "state_sampling_period_millis";
 
+  String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

Review Comment:
   Also, these need to stay in sync with https://github.com/apache/beam/blob/618d7a8f2c6520bfae1ca84dced7fd39d7af45f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L552 and https://github.com/apache/beam/blob/618d7a8f2c6520bfae1ca84dced7fd39d7af45f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L558. If a constant is created, reference it there as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.