This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 93339bcf061 Fixes an upgrade compatibility breakage for the BQ write 
transform (#30032)
93339bcf061 is described below

commit 93339bcf0612681cb5d421e90e0e97f6d5a42ce4
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Tue Jan 23 10:28:46 2024 -0800

    Fixes an upgrade compatibility breakage for the BQ write transform (#30032)
    
    * Fixes an upgrade compatibility breakage for the BQ write transform
    
    * Addressing reviewer comments
    
    * Resolve a conflcit
---
 .../core/construction/PTransformTranslation.java   |  3 +-
 .../core/construction/PipelineTranslation.java     |  7 ++---
 .../core/construction/TransformUpgrader.java       | 35 +++++++++++++++++-----
 .../core/construction/TransformUpgraderTest.java   |  3 +-
 .../sdk/expansion/service/ExpansionService.java    | 12 ++++----
 .../ExpansionServiceSchemaTransformProvider.java   |  3 +-
 .../service/JavaClassLookupTransformProvider.java  |  3 +-
 ...xpansionServiceSchemaTransformProviderTest.java |  8 +++--
 .../expansion/service/ExpansionServiceTest.java    |  2 +-
 .../beam/sdk/expansion/service/ExternalTest.java   | 30 +++++++++----------
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 26 ++++++++++++++--
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 14 +++++++--
 .../sdk/io/kafka/upgrade/KafkaIOTranslation.java   |  5 ++--
 .../io/kafka/upgrade/KafkaIOTranslationTest.java   |  5 ++--
 .../testing/expansion/TestExpansionService.java    |  5 ++--
 15 files changed, 111 insertions(+), 50 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 6829e0d6b23..6bdd0fc3739 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -40,6 +40,7 @@ import 
org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslato
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -609,7 +610,7 @@ public class PTransformTranslation {
      *     {@link #toConfigRow(PTransform)} method.
      * @return a transform represented by the current {@code 
TransformPayloadTranslator}.
      */
-    default T fromConfigRow(Row configRow) {
+    default T fromConfigRow(Row configRow, PipelineOptions options) {
       throw new UnsupportedOperationException("Not implemented");
     }
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 4433b4b0475..688d7a80864 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -111,14 +111,13 @@ public class PipelineTranslation {
       res = elideDeprecatedViews(res);
     }
 
-    ExternalTranslationOptions externalTranslationOptions =
-        pipeline.getOptions().as(ExternalTranslationOptions.class);
-    List<String> urnsToOverride = 
externalTranslationOptions.getTransformsToOverride();
+    List<String> urnsToOverride =
+        
pipeline.getOptions().as(ExternalTranslationOptions.class).getTransformsToOverride();
     if (urnsToOverride.size() > 0 && upgradeTransforms) {
       try (TransformUpgrader upgrader = TransformUpgrader.of()) {
         res =
             upgrader.upgradeTransformsViaTransformService(
-                res, urnsToOverride, externalTranslationOptions);
+                res, urnsToOverride, pipeline.getOptions());
       } catch (Exception e) {
         throw new RuntimeException(
             "Could not override the transforms with URNs " + urnsToOverride, 
e);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
index 4f1a02165d2..f07df605215 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
@@ -41,8 +41,11 @@ import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.SchemaApi;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
@@ -97,7 +100,7 @@ public class TransformUpgrader implements AutoCloseable {
    * @throws Exception
    */
   public RunnerApi.Pipeline upgradeTransformsViaTransformService(
-      RunnerApi.Pipeline pipeline, List<String> urnsToOverride, 
ExternalTranslationOptions options)
+      RunnerApi.Pipeline pipeline, List<String> urnsToOverride, 
PipelineOptions options)
       throws IOException, TimeoutException {
     List<String> transformsToOverride =
         pipeline.getComponents().getTransformsMap().entrySet().stream()
@@ -127,13 +130,15 @@ public class TransformUpgrader implements AutoCloseable {
     String serviceAddress;
     TransformServiceLauncher service = null;
 
-    if (options.getTransformServiceAddress() != null) {
-      serviceAddress = options.getTransformServiceAddress();
-    } else if (options.getTransformServiceBeamVersion() != null) {
+    ExternalTranslationOptions externalTranslationOptions =
+        options.as(ExternalTranslationOptions.class);
+    if (externalTranslationOptions.getTransformServiceAddress() != null) {
+      serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+    } else if (externalTranslationOptions.getTransformServiceBeamVersion() != 
null) {
       String projectName = UUID.randomUUID().toString();
       int port = findAvailablePort();
       service = TransformServiceLauncher.forProject(projectName, port, null);
-      service.setBeamVersion(options.getTransformServiceBeamVersion());
+      
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
 
       // Starting the transform service.
       service.start();
@@ -169,7 +174,7 @@ public class TransformUpgrader implements AutoCloseable {
           RunnerApi.Pipeline runnerAPIpipeline,
           String transformId,
           Endpoints.ApiServiceDescriptor transformServiceEndpoint,
-          ExternalTranslationOptions options)
+          PipelineOptions options)
           throws IOException {
     RunnerApi.PTransform transformToUpgrade =
         runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
@@ -207,11 +212,26 @@ public class TransformUpgrader implements AutoCloseable {
 
     ExpansionApi.ExpansionRequest.Builder requestBuilder =
         ExpansionApi.ExpansionRequest.newBuilder();
+
+    // Creating a clone here so that we can set properties without modifying 
the original
+    // PipelineOptions object.
+    PipelineOptions optionsClone =
+        
PipelineOptionsTranslation.fromProto(PipelineOptionsTranslation.toProto(options));
+    String updateCompatibilityVersion =
+        
optionsClone.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+    if (updateCompatibilityVersion == null || 
updateCompatibilityVersion.isEmpty()) {
+      // Setting the option 'updateCompatibilityVersion' to the current SDK 
version so that the
+      // TransformService uses a compatible schema.
+      optionsClone
+          .as(StreamingOptions.class)
+          
.setUpdateCompatibilityVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
+    }
     ExpansionApi.ExpansionRequest request =
         requestBuilder
             .setComponents(runnerAPIpipeline.getComponents())
             .setTransform(ptransformBuilder.build())
             .setNamespace(UPGRADE_NAMESPACE)
+            
.setPipelineOptions(PipelineOptionsTranslation.toProto(optionsClone))
             .addAllRequirements(runnerAPIpipeline.getRequirementsList())
             .build();
 
@@ -242,7 +262,8 @@ public class TransformUpgrader implements AutoCloseable {
 
     // Adds an annotation that denotes the Beam version the transform was 
upgraded to.
     RunnerApi.PTransform.Builder expandedTransformBuilder = 
expandedTransform.toBuilder();
-    String transformServiceVersion = options.getTransformServiceBeamVersion();
+    String transformServiceVersion =
+        
options.as(ExternalTranslationOptions.class).getTransformServiceBeamVersion();
     if (transformServiceVersion == null || transformServiceVersion.isEmpty()) {
       transformServiceVersion = "unknown";
     }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
index e14fa556dd9..2b01bf70246 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.Schema;
@@ -93,7 +94,7 @@ public class TransformUpgraderTest {
     }
 
     @Override
-    public TestTransform fromConfigRow(Row configRow) {
+    public TestTransform fromConfigRow(Row configRow, PipelineOptions options) 
{
       return new TestTransform(configRow.getInt32("multiplier"));
     }
 
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index 7760cab64ac..5d46100fe65 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -152,7 +152,8 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
           TransformProvider transformProvider =
               new TransformProvider() {
                 @Override
-                public PTransform getTransform(RunnerApi.FunctionSpec spec) {
+                public PTransform getTransform(
+                    RunnerApi.FunctionSpec spec, PipelineOptions options) {
                   try {
                     Class configClass = getConfigClass(builderInstance);
                     return builderInstance.buildExternal(
@@ -222,14 +223,14 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
           }
           final String finalUrn = urn;
           TransformProvider transformProvider =
-              spec -> {
+              (spec, options) -> {
                 try {
                   ExternalConfigurationPayload payload =
                       
ExternalConfigurationPayload.parseFrom(spec.getPayload());
                   Row configRow =
                       
RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
                           .decode(new 
ByteArrayInputStream(payload.getPayload().toByteArray()));
-                  PTransform transformFromRow = 
translator.fromConfigRow(configRow);
+                  PTransform transformFromRow = 
translator.fromConfigRow(configRow, options);
                   if (transformFromRow != null) {
                     return transformFromRow;
                   } else {
@@ -441,7 +442,7 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
       }
     }
 
-    PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);
+    PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec, 
PipelineOptions options);
 
     default Map<String, PCollection<?>> extractOutputs(OutputT output) {
       if (output instanceof PDone) {
@@ -485,7 +486,8 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
     default Map<String, PCollection<?>> apply(
         Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, 
PCollection<?>> inputs) {
       return extractOutputs(
-          Pipeline.applyTransform(name, createInput(p, inputs), 
getTransform(spec)));
+          Pipeline.applyTransform(
+              name, createInput(p, inputs), getTransform(spec, 
p.getOptions())));
     }
 
     default String getTransformUniqueID(RunnerApi.FunctionSpec spec) {
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
index ead1fa67dc9..0bd85fb79e6 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
@@ -29,6 +29,7 @@ import 
org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.RowCoder;
 import 
org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -91,7 +92,7 @@ public class ExpansionServiceSchemaTransformProvider
   }
 
   @Override
-  public PTransform getTransform(FunctionSpec spec) {
+  public PTransform getTransform(FunctionSpec spec, PipelineOptions options) {
     SchemaTransformPayload payload;
     try {
       payload = SchemaTransformPayload.parseFrom(spec.getPayload());
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
index 9f982f0cd01..96697c070be 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
@@ -46,6 +46,7 @@ import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
 import org.apache.beam.sdk.coders.RowCoder;
 import 
org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
@@ -90,7 +91,7 @@ class JavaClassLookupTransformProvider<InputT extends PInput, 
OutputT extends PO
 
   @SuppressWarnings("argument")
   @Override
-  public PTransform<PInput, POutput> getTransform(FunctionSpec spec) {
+  public PTransform<PInput, POutput> getTransform(FunctionSpec spec, 
PipelineOptions options) {
     JavaClassLookupPayload payload;
     try {
       payload = JavaClassLookupPayload.parseFrom(spec.getPayload());
diff --git 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
index 696fed0f8ff..3e6451b131d 100644
--- 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
+++ 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
@@ -34,6 +34,7 @@ import 
org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -441,10 +442,13 @@ public class ExpansionServiceSchemaTransformProviderTest {
     assertNotEquals(spec.getPayload(), equivalentSpec.getPayload());
 
     TestSchemaTransform transform =
-        (TestSchemaTransform) 
ExpansionServiceSchemaTransformProvider.of().getTransform(spec);
+        (TestSchemaTransform)
+            ExpansionServiceSchemaTransformProvider.of()
+                .getTransform(spec, PipelineOptionsFactory.create());
     TestSchemaTransform equivalentTransform =
         (TestSchemaTransform)
-            
ExpansionServiceSchemaTransformProvider.of().getTransform(equivalentSpec);
+            ExpansionServiceSchemaTransformProvider.of()
+                .getTransform(equivalentSpec, PipelineOptionsFactory.create());
 
     assertEquals(transform.int1, equivalentTransform.int1);
     assertEquals(transform.int2, equivalentTransform.int2);
diff --git 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
index 618fa333309..b79d91bf628 100644
--- 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
+++ 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
@@ -94,7 +94,7 @@ public class ExpansionServiceTest {
 
     @Override
     public Map<String, ExpansionService.TransformProvider> knownTransforms() {
-      return ImmutableMap.of(TEST_URN, spec -> Count.perElement());
+      return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement());
     }
   }
 
diff --git 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
index 4b949a597f0..d2363559473 100644
--- 
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
+++ 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
@@ -167,23 +167,23 @@ public class ExternalTest implements Serializable {
     public Map<String, ExpansionService.TransformProvider> knownTransforms() {
       return ImmutableMap.of(
           TEST_URN_SIMPLE,
-              spec -> MapElements.into(TypeDescriptors.strings()).via((String 
x) -> x + x),
+          (spec, options) -> 
MapElements.into(TypeDescriptors.strings()).via((String x) -> x + x),
           TEST_URN_LE,
-              spec -> 
Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
+          (spec, options) -> 
Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
           TEST_URN_MULTI,
-              spec ->
-                  ParDo.of(
-                          new DoFn<Integer, Integer>() {
-                            @ProcessElement
-                            public void processElement(ProcessContext c) {
-                              if (c.element() % 2 == 0) {
-                                c.output(c.element());
-                              } else {
-                                c.output(odd, c.element());
-                              }
-                            }
-                          })
-                      .withOutputTags(even, TupleTagList.of(odd)));
+          (spec, options) ->
+              ParDo.of(
+                      new DoFn<Integer, Integer>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) {
+                          if (c.element() % 2 == 0) {
+                            c.output(c.element());
+                          } else {
+                            c.output(odd, c.element());
+                          }
+                        }
+                      })
+                  .withOutputTags(even, TupleTagList.of(odd)));
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index a3a270a315b..b2d533f69fb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -50,6 +50,8 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import 
org.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory.AvroRowWriterFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.Schema;
@@ -188,7 +190,7 @@ public class BigQueryIOTranslation {
     }
 
     @Override
-    public TypedRead<?> fromConfigRow(Row configRow) {
+    public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
       try {
         BigQueryIO.TypedRead.Builder builder = new 
AutoValue_BigQueryIO_TypedRead.Builder<>();
 
@@ -552,7 +554,7 @@ public class BigQueryIOTranslation {
     }
 
     @Override
-    public Write<?> fromConfigRow(Row configRow) {
+    public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
       try {
         BigQueryIO.Write.Builder builder = new 
AutoValue_BigQueryIO_Write.Builder<>();
 
@@ -695,7 +697,25 @@ public class BigQueryIOTranslation {
         if (maxBytesPerPartition != null) {
           builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
         }
-        Duration triggeringFrequency = 
configRow.getValue("triggering_frequency");
+
+        String updateCompatibilityBeamVersion =
+            options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+
+        // We need to update the 'triggerring_frequency' field name for 
pipelines that are upgraded
+        // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
+        // We need to set a default 'updateCompatibilityBeamVersion' here 
since this PipelineOption
+        // is not correctly passed in for pipelines that use Beam 2.53.0.
+        // Both above issues are fixed for Beam 2.54.0 and later.
+        updateCompatibilityBeamVersion =
+            (updateCompatibilityBeamVersion != null) ? 
updateCompatibilityBeamVersion : "2.53.0";
+
+        String triggeringFrequencyFieldName =
+            (updateCompatibilityBeamVersion != null
+                    && updateCompatibilityBeamVersion.equals("2.53.0"))
+                ? "triggerring_frequency"
+                : "triggering_frequency";
+
+        Duration triggeringFrequency = 
configRow.getValue(triggeringFrequencyFieldName);
         if (triggeringFrequency != null) {
           builder =
               builder.setTriggeringFrequency(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index c46d382bb29..668f4eef4d8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -31,6 +31,9 @@ import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -142,7 +145,8 @@ public class BigQueryIOTranslationTest {
     Row row = translator.toConfigRow(readTransform);
 
     BigQueryIO.TypedRead<TableRow> readTransformFromRow =
-        (BigQueryIO.TypedRead<TableRow>) translator.fromConfigRow(row);
+        (BigQueryIO.TypedRead<TableRow>)
+            translator.fromConfigRow(row, PipelineOptionsFactory.create());
     assertNotNull(readTransformFromRow.getTable());
     assertEquals("dummyproject", 
readTransformFromRow.getTable().getProjectId());
     assertEquals("dummydataset", 
readTransformFromRow.getTable().getDatasetId());
@@ -172,7 +176,8 @@ public class BigQueryIOTranslationTest {
         new BigQueryIOTranslation.BigQueryIOReadTranslator();
     Row row = translator.toConfigRow(readTransform);
 
-    BigQueryIO.TypedRead<?> readTransformFromRow = 
translator.fromConfigRow(row);
+    BigQueryIO.TypedRead<?> readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
     assertEquals("dummyquery", readTransformFromRow.getQuery().get());
     assertNotNull(readTransformFromRow.getParseFn());
     assertTrue(readTransformFromRow.getParseFn() instanceof DummyParseFn);
@@ -241,7 +246,10 @@ public class BigQueryIOTranslationTest {
         new BigQueryIOTranslation.BigQueryIOWriteTranslator();
     Row row = translator.toConfigRow(writeTransform);
 
-    BigQueryIO.Write<?> writeTransformFromRow = (BigQueryIO.Write<?>) 
translator.fromConfigRow(row);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0");
+    BigQueryIO.Write<?> writeTransformFromRow =
+        (BigQueryIO.Write<?>) translator.fromConfigRow(row, options);
     assertNotNull(writeTransformFromRow.getTable());
     assertEquals("dummyproject", 
writeTransformFromRow.getTable().get().getProjectId());
     assertEquals("dummydataset", 
writeTransformFromRow.getTable().get().getDatasetId());
diff --git 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index eedd2282b1f..a76507a285b 100644
--- 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ 
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
 import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
 import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -215,7 +216,7 @@ public class KafkaIOTranslation {
     }
 
     @Override
-    public Read<?, ?> fromConfigRow(Row configRow) {
+    public Read<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
       try {
         Read<?, ?> transform = KafkaIO.read();
 
@@ -511,7 +512,7 @@ public class KafkaIOTranslation {
     }
 
     @Override
-    public Write<?, ?> fromConfigRow(Row configRow) {
+    public Write<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
       try {
         Write<?, ?> transform = KafkaIO.write();
 
diff --git 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index be54d7830d5..a94491d8513 100644
--- 
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ 
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
 import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
 import 
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOReadWithMetadataTranslator;
 import 
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOWriteTranslator;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -111,7 +112,7 @@ public class KafkaIOTranslationTest {
     Row row = translator.toConfigRow(readTransform);
 
     Read<String, Integer> readTransformFromRow =
-        (Read<String, Integer>) translator.fromConfigRow(row);
+        (Read<String, Integer>) translator.fromConfigRow(row, 
PipelineOptionsFactory.create());
     assertNotNull(
         
readTransformFromRow.getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
     assertEquals(
@@ -178,7 +179,7 @@ public class KafkaIOTranslationTest {
     Row row = translator.toConfigRow(writeTransform);
 
     Write<String, Integer> writeTransformFromRow =
-        (Write<String, Integer>) translator.fromConfigRow(row);
+        (Write<String, Integer>) translator.fromConfigRow(row, 
PipelineOptionsFactory.create());
     WriteRecords<String, Integer> writeRecordsTransform =
         writeTransformFromRow.getWriteRecordsTransform();
     assertNotNull(
diff --git 
a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
 
b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
index b45c922eae7..9becaf980b8 100644
--- 
a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
+++ 
b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
@@ -137,7 +138,7 @@ public class TestExpansionService {
 
       @Override
       public PTransform<KeyedPCollectionTuple<Long>, PCollection<KV<Long, 
Iterable<String>>>>
-          getTransform(RunnerApi.FunctionSpec spec) {
+          getTransform(RunnerApi.FunctionSpec spec, PipelineOptions options) {
         return new TestCoGroupByKeyTransform();
       }
     }
@@ -155,7 +156,7 @@ public class TestExpansionService {
 
       @Override
       public PTransform<PCollectionList<Long>, PCollection<Long>> getTransform(
-          RunnerApi.FunctionSpec spec) {
+          RunnerApi.FunctionSpec spec, PipelineOptions options) {
         return Flatten.pCollections();
       }
     }

Reply via email to