robertwb commented on code in PR #30032:
URL: https://github.com/apache/beam/pull/30032#discussion_r1458184814


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java:
##########
@@ -204,11 +211,22 @@ RunnerApi.Pipeline updateTransformViaTransformService(
 
     ExpansionApi.ExpansionRequest.Builder requestBuilder =
         ExpansionApi.ExpansionRequest.newBuilder();
+
+    // Creating a clone here so that we can set properties without modifying 
the original
+    // PipelineOptions object.
+    PipelineOptions optionsClone =
+        
PipelineOptionsTranslation.fromProto(PipelineOptionsTranslation.toProto(options));
+    // Setting the option 'updateCompatibilityVersion' to the current SDK 
version so that the

Review Comment:
   Maybe only set it if it was not explicitly set by the user (e.g. to an even 
older version)? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -695,7 +697,25 @@ public Write<?> fromConfigRow(Row configRow) {
         if (maxBytesPerPartition != null) {
           builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
         }
-        Duration triggeringFrequency = 
configRow.getValue("triggering_frequency");
+
+        String updateCompatibilityBeamVersion =
+            options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+
+        // We need to update the 'triggerring_frequency' field name for 
pipelines that are upgraded
+        // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
+        // We need to set a default 'updateCompatibilityBeamVersion' here 
since this PipelineOption
+        // is not correctly passed in for pipelines that use Beam 2.53.0.
+        // Both above issues are fixed for Beam 2.54.0 and later.
+        updateCompatibilityBeamVersion =
+            (updateCompatibilityBeamVersion != null) ? 
updateCompatibilityBeamVersion : "2.53.0";
+
+        String triggeringFrequencyFieldName =
+            (updateCompatibilityBeamVersion != null
+                    && updateCompatibilityBeamVersion.equals("2.53.0"))

Review Comment:
   Should this be any version less than 2.53, rather than exactly 2.53? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to