chamikaramj commented on code in PR #30032:
URL: https://github.com/apache/beam/pull/30032#discussion_r1463784005
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -695,7 +697,25 @@ public Write<?> fromConfigRow(Row configRow) {
if (maxBytesPerPartition != null) {
builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
}
- Duration triggeringFrequency =
configRow.getValue("triggering_frequency");
+
+ String updateCompatibilityBeamVersion =
+ options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+
+ // We need to update the 'triggerring_frequency' field name for
pipelines that are upgraded
+ // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
+ // We need to set a default 'updateCompatibilityBeamVersion' here
since this PipelineOption
+ // is not correctly passed in for pipelines that use Beam 2.53.0.
+ // Both above issues are fixed for Beam 2.54.0 and later.
+ updateCompatibilityBeamVersion =
+ (updateCompatibilityBeamVersion != null) ?
updateCompatibilityBeamVersion : "2.53.0";
+
+ String triggeringFrequencyFieldName =
+ (updateCompatibilityBeamVersion != null
Review Comment:
Yeah, probably we can add some utils around updating the schema for
different Beam version ranges etc.
I'll get this in since this fixes some plumbing we needed for schema changes
anyways and also unblocks the release :).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]