This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 93339bcf061 Fixes an upgrade compatibility breakage for the BQ write
transform (#30032)
93339bcf061 is described below
commit 93339bcf0612681cb5d421e90e0e97f6d5a42ce4
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Tue Jan 23 10:28:46 2024 -0800
Fixes an upgrade compatibility breakage for the BQ write transform (#30032)
* Fixes an upgrade compatibility breakage for the BQ write transform
* Addressing reviewer comments
* Resolve a conflict
---
.../core/construction/PTransformTranslation.java | 3 +-
.../core/construction/PipelineTranslation.java | 7 ++---
.../core/construction/TransformUpgrader.java | 35 +++++++++++++++++-----
.../core/construction/TransformUpgraderTest.java | 3 +-
.../sdk/expansion/service/ExpansionService.java | 12 ++++----
.../ExpansionServiceSchemaTransformProvider.java | 3 +-
.../service/JavaClassLookupTransformProvider.java | 3 +-
...xpansionServiceSchemaTransformProviderTest.java | 8 +++--
.../expansion/service/ExpansionServiceTest.java | 2 +-
.../beam/sdk/expansion/service/ExternalTest.java | 30 +++++++++----------
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 26 ++++++++++++++--
.../io/gcp/bigquery/BigQueryIOTranslationTest.java | 14 +++++++--
.../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 5 ++--
.../io/kafka/upgrade/KafkaIOTranslationTest.java | 5 ++--
.../testing/expansion/TestExpansionService.java | 5 ++--
15 files changed, 111 insertions(+), 50 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 6829e0d6b23..6bdd0fc3739 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -40,6 +40,7 @@ import
org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslato
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
@@ -609,7 +610,7 @@ public class PTransformTranslation {
* {@link #toConfigRow(PTransform)} method.
* @return a transform represented by the current {@code
TransformPayloadTranslator}.
*/
- default T fromConfigRow(Row configRow) {
+ default T fromConfigRow(Row configRow, PipelineOptions options) {
throw new UnsupportedOperationException("Not implemented");
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index 4433b4b0475..688d7a80864 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -111,14 +111,13 @@ public class PipelineTranslation {
res = elideDeprecatedViews(res);
}
- ExternalTranslationOptions externalTranslationOptions =
- pipeline.getOptions().as(ExternalTranslationOptions.class);
- List<String> urnsToOverride =
externalTranslationOptions.getTransformsToOverride();
+ List<String> urnsToOverride =
+
pipeline.getOptions().as(ExternalTranslationOptions.class).getTransformsToOverride();
if (urnsToOverride.size() > 0 && upgradeTransforms) {
try (TransformUpgrader upgrader = TransformUpgrader.of()) {
res =
upgrader.upgradeTransformsViaTransformService(
- res, urnsToOverride, externalTranslationOptions);
+ res, urnsToOverride, pipeline.getOptions());
} catch (Exception e) {
throw new RuntimeException(
"Could not override the transforms with URNs " + urnsToOverride,
e);
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
index 4f1a02165d2..f07df605215 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java
@@ -41,8 +41,11 @@ import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
@@ -97,7 +100,7 @@ public class TransformUpgrader implements AutoCloseable {
* @throws Exception
*/
public RunnerApi.Pipeline upgradeTransformsViaTransformService(
- RunnerApi.Pipeline pipeline, List<String> urnsToOverride,
ExternalTranslationOptions options)
+ RunnerApi.Pipeline pipeline, List<String> urnsToOverride,
PipelineOptions options)
throws IOException, TimeoutException {
List<String> transformsToOverride =
pipeline.getComponents().getTransformsMap().entrySet().stream()
@@ -127,13 +130,15 @@ public class TransformUpgrader implements AutoCloseable {
String serviceAddress;
TransformServiceLauncher service = null;
- if (options.getTransformServiceAddress() != null) {
- serviceAddress = options.getTransformServiceAddress();
- } else if (options.getTransformServiceBeamVersion() != null) {
+ ExternalTranslationOptions externalTranslationOptions =
+ options.as(ExternalTranslationOptions.class);
+ if (externalTranslationOptions.getTransformServiceAddress() != null) {
+ serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+ } else if (externalTranslationOptions.getTransformServiceBeamVersion() !=
null) {
String projectName = UUID.randomUUID().toString();
int port = findAvailablePort();
service = TransformServiceLauncher.forProject(projectName, port, null);
- service.setBeamVersion(options.getTransformServiceBeamVersion());
+
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
// Starting the transform service.
service.start();
@@ -169,7 +174,7 @@ public class TransformUpgrader implements AutoCloseable {
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint,
- ExternalTranslationOptions options)
+ PipelineOptions options)
throws IOException {
RunnerApi.PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
@@ -207,11 +212,26 @@ public class TransformUpgrader implements AutoCloseable {
ExpansionApi.ExpansionRequest.Builder requestBuilder =
ExpansionApi.ExpansionRequest.newBuilder();
+
+ // Creating a clone here so that we can set properties without modifying
the original
+ // PipelineOptions object.
+ PipelineOptions optionsClone =
+
PipelineOptionsTranslation.fromProto(PipelineOptionsTranslation.toProto(options));
+ String updateCompatibilityVersion =
+
optionsClone.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+ if (updateCompatibilityVersion == null ||
updateCompatibilityVersion.isEmpty()) {
+ // Setting the option 'updateCompatibilityVersion' to the current SDK
version so that the
+ // TransformService uses a compatible schema.
+ optionsClone
+ .as(StreamingOptions.class)
+
.setUpdateCompatibilityVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
+ }
ExpansionApi.ExpansionRequest request =
requestBuilder
.setComponents(runnerAPIpipeline.getComponents())
.setTransform(ptransformBuilder.build())
.setNamespace(UPGRADE_NAMESPACE)
+
.setPipelineOptions(PipelineOptionsTranslation.toProto(optionsClone))
.addAllRequirements(runnerAPIpipeline.getRequirementsList())
.build();
@@ -242,7 +262,8 @@ public class TransformUpgrader implements AutoCloseable {
// Adds an annotation that denotes the Beam version the transform was
upgraded to.
RunnerApi.PTransform.Builder expandedTransformBuilder =
expandedTransform.toBuilder();
- String transformServiceVersion = options.getTransformServiceBeamVersion();
+ String transformServiceVersion =
+
options.as(ExternalTranslationOptions.class).getTransformServiceBeamVersion();
if (transformServiceVersion == null || transformServiceVersion.isEmpty()) {
transformServiceVersion = "unknown";
}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
index e14fa556dd9..2b01bf70246 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
@@ -93,7 +94,7 @@ public class TransformUpgraderTest {
}
@Override
- public TestTransform fromConfigRow(Row configRow) {
+ public TestTransform fromConfigRow(Row configRow, PipelineOptions options)
{
return new TestTransform(configRow.getInt32("multiplier"));
}
diff --git
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index 7760cab64ac..5d46100fe65 100644
---
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -152,7 +152,8 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
TransformProvider transformProvider =
new TransformProvider() {
@Override
- public PTransform getTransform(RunnerApi.FunctionSpec spec) {
+ public PTransform getTransform(
+ RunnerApi.FunctionSpec spec, PipelineOptions options) {
try {
Class configClass = getConfigClass(builderInstance);
return builderInstance.buildExternal(
@@ -222,14 +223,14 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
}
final String finalUrn = urn;
TransformProvider transformProvider =
- spec -> {
+ (spec, options) -> {
try {
ExternalConfigurationPayload payload =
ExternalConfigurationPayload.parseFrom(spec.getPayload());
Row configRow =
RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
.decode(new
ByteArrayInputStream(payload.getPayload().toByteArray()));
- PTransform transformFromRow =
translator.fromConfigRow(configRow);
+ PTransform transformFromRow =
translator.fromConfigRow(configRow, options);
if (transformFromRow != null) {
return transformFromRow;
} else {
@@ -441,7 +442,7 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
}
}
- PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);
+ PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec,
PipelineOptions options);
default Map<String, PCollection<?>> extractOutputs(OutputT output) {
if (output instanceof PDone) {
@@ -485,7 +486,8 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
default Map<String, PCollection<?>> apply(
Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String,
PCollection<?>> inputs) {
return extractOutputs(
- Pipeline.applyTransform(name, createInput(p, inputs),
getTransform(spec)));
+ Pipeline.applyTransform(
+ name, createInput(p, inputs), getTransform(spec,
p.getOptions())));
}
default String getTransformUniqueID(RunnerApi.FunctionSpec spec) {
diff --git
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
index ead1fa67dc9..0bd85fb79e6 100644
---
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
+++
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
@@ -29,6 +29,7 @@ import
org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import
org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
@@ -91,7 +92,7 @@ public class ExpansionServiceSchemaTransformProvider
}
@Override
- public PTransform getTransform(FunctionSpec spec) {
+ public PTransform getTransform(FunctionSpec spec, PipelineOptions options) {
SchemaTransformPayload payload;
try {
payload = SchemaTransformPayload.parseFrom(spec.getPayload());
diff --git
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
index 9f982f0cd01..96697c070be 100644
---
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
+++
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
@@ -46,6 +46,7 @@ import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
import org.apache.beam.sdk.coders.RowCoder;
import
org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
@@ -90,7 +91,7 @@ class JavaClassLookupTransformProvider<InputT extends PInput,
OutputT extends PO
@SuppressWarnings("argument")
@Override
- public PTransform<PInput, POutput> getTransform(FunctionSpec spec) {
+ public PTransform<PInput, POutput> getTransform(FunctionSpec spec,
PipelineOptions options) {
JavaClassLookupPayload payload;
try {
payload = JavaClassLookupPayload.parseFrom(spec.getPayload());
diff --git
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
index 696fed0f8ff..3e6451b131d 100644
---
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
+++
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java
@@ -34,6 +34,7 @@ import
org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -441,10 +442,13 @@ public class ExpansionServiceSchemaTransformProviderTest {
assertNotEquals(spec.getPayload(), equivalentSpec.getPayload());
TestSchemaTransform transform =
- (TestSchemaTransform)
ExpansionServiceSchemaTransformProvider.of().getTransform(spec);
+ (TestSchemaTransform)
+ ExpansionServiceSchemaTransformProvider.of()
+ .getTransform(spec, PipelineOptionsFactory.create());
TestSchemaTransform equivalentTransform =
(TestSchemaTransform)
-
ExpansionServiceSchemaTransformProvider.of().getTransform(equivalentSpec);
+ ExpansionServiceSchemaTransformProvider.of()
+ .getTransform(equivalentSpec, PipelineOptionsFactory.create());
assertEquals(transform.int1, equivalentTransform.int1);
assertEquals(transform.int2, equivalentTransform.int2);
diff --git
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
index 618fa333309..b79d91bf628 100644
---
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
+++
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
@@ -94,7 +94,7 @@ public class ExpansionServiceTest {
@Override
public Map<String, ExpansionService.TransformProvider> knownTransforms() {
- return ImmutableMap.of(TEST_URN, spec -> Count.perElement());
+ return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement());
}
}
diff --git
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
index 4b949a597f0..d2363559473 100644
---
a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
+++
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExternalTest.java
@@ -167,23 +167,23 @@ public class ExternalTest implements Serializable {
public Map<String, ExpansionService.TransformProvider> knownTransforms() {
return ImmutableMap.of(
TEST_URN_SIMPLE,
- spec -> MapElements.into(TypeDescriptors.strings()).via((String
x) -> x + x),
+ (spec, options) ->
MapElements.into(TypeDescriptors.strings()).via((String x) -> x + x),
TEST_URN_LE,
- spec ->
Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
+ (spec, options) ->
Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
TEST_URN_MULTI,
- spec ->
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element() % 2 == 0) {
- c.output(c.element());
- } else {
- c.output(odd, c.element());
- }
- }
- })
- .withOutputTags(even, TupleTagList.of(odd)));
+ (spec, options) ->
+ ParDo.of(
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ if (c.element() % 2 == 0) {
+ c.output(c.element());
+ } else {
+ c.output(odd, c.element());
+ }
+ }
+ })
+ .withOutputTags(even, TupleTagList.of(odd)));
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index a3a270a315b..b2d533f69fb 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -50,6 +50,8 @@ import
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import
org.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory.AvroRowWriterFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
@@ -188,7 +190,7 @@ public class BigQueryIOTranslation {
}
@Override
- public TypedRead<?> fromConfigRow(Row configRow) {
+ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
BigQueryIO.TypedRead.Builder builder = new
AutoValue_BigQueryIO_TypedRead.Builder<>();
@@ -552,7 +554,7 @@ public class BigQueryIOTranslation {
}
@Override
- public Write<?> fromConfigRow(Row configRow) {
+ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
BigQueryIO.Write.Builder builder = new
AutoValue_BigQueryIO_Write.Builder<>();
@@ -695,7 +697,25 @@ public class BigQueryIOTranslation {
if (maxBytesPerPartition != null) {
builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
}
- Duration triggeringFrequency =
configRow.getValue("triggering_frequency");
+
+ String updateCompatibilityBeamVersion =
+ options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+
+ // We need to update the 'triggerring_frequency' field name for
pipelines that are upgraded
+ // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
+ // We need to set a default 'updateCompatibilityBeamVersion' here
since this PipelineOption
+ // is not correctly passed in for pipelines that use Beam 2.53.0.
+ // Both above issues are fixed for Beam 2.54.0 and later.
+ updateCompatibilityBeamVersion =
+ (updateCompatibilityBeamVersion != null) ?
updateCompatibilityBeamVersion : "2.53.0";
+
+ String triggeringFrequencyFieldName =
+ (updateCompatibilityBeamVersion != null
+ && updateCompatibilityBeamVersion.equals("2.53.0"))
+ ? "triggerring_frequency"
+ : "triggering_frequency";
+
+ Duration triggeringFrequency =
configRow.getValue(triggeringFrequencyFieldName);
if (triggeringFrequency != null) {
builder =
builder.setTriggeringFrequency(
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index c46d382bb29..668f4eef4d8 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -31,6 +31,9 @@ import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -142,7 +145,8 @@ public class BigQueryIOTranslationTest {
Row row = translator.toConfigRow(readTransform);
BigQueryIO.TypedRead<TableRow> readTransformFromRow =
- (BigQueryIO.TypedRead<TableRow>) translator.fromConfigRow(row);
+ (BigQueryIO.TypedRead<TableRow>)
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
assertNotNull(readTransformFromRow.getTable());
assertEquals("dummyproject",
readTransformFromRow.getTable().getProjectId());
assertEquals("dummydataset",
readTransformFromRow.getTable().getDatasetId());
@@ -172,7 +176,8 @@ public class BigQueryIOTranslationTest {
new BigQueryIOTranslation.BigQueryIOReadTranslator();
Row row = translator.toConfigRow(readTransform);
- BigQueryIO.TypedRead<?> readTransformFromRow =
translator.fromConfigRow(row);
+ BigQueryIO.TypedRead<?> readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
assertEquals("dummyquery", readTransformFromRow.getQuery().get());
assertNotNull(readTransformFromRow.getParseFn());
assertTrue(readTransformFromRow.getParseFn() instanceof DummyParseFn);
@@ -241,7 +246,10 @@ public class BigQueryIOTranslationTest {
new BigQueryIOTranslation.BigQueryIOWriteTranslator();
Row row = translator.toConfigRow(writeTransform);
- BigQueryIO.Write<?> writeTransformFromRow = (BigQueryIO.Write<?>)
translator.fromConfigRow(row);
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0");
+ BigQueryIO.Write<?> writeTransformFromRow =
+ (BigQueryIO.Write<?>) translator.fromConfigRow(row, options);
assertNotNull(writeTransformFromRow.getTable());
assertEquals("dummyproject",
writeTransformFromRow.getTable().get().getProjectId());
assertEquals("dummydataset",
writeTransformFromRow.getTable().get().getDatasetId());
diff --git
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index eedd2282b1f..a76507a285b 100644
---
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++
b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -215,7 +216,7 @@ public class KafkaIOTranslation {
}
@Override
- public Read<?, ?> fromConfigRow(Row configRow) {
+ public Read<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
Read<?, ?> transform = KafkaIO.read();
@@ -511,7 +512,7 @@ public class KafkaIOTranslation {
}
@Override
- public Write<?, ?> fromConfigRow(Row configRow) {
+ public Write<?, ?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
Write<?, ?> transform = KafkaIO.write();
diff --git
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index be54d7830d5..a94491d8513 100644
---
a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++
b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIO.Write;
import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOReadWithMetadataTranslator;
import
org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation.KafkaIOWriteTranslator;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -111,7 +112,7 @@ public class KafkaIOTranslationTest {
Row row = translator.toConfigRow(readTransform);
Read<String, Integer> readTransformFromRow =
- (Read<String, Integer>) translator.fromConfigRow(row);
+ (Read<String, Integer>) translator.fromConfigRow(row,
PipelineOptionsFactory.create());
assertNotNull(
readTransformFromRow.getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
assertEquals(
@@ -178,7 +179,7 @@ public class KafkaIOTranslationTest {
Row row = translator.toConfigRow(writeTransform);
Write<String, Integer> writeTransformFromRow =
- (Write<String, Integer>) translator.fromConfigRow(row);
+ (Write<String, Integer>) translator.fromConfigRow(row,
PipelineOptionsFactory.create());
WriteRecords<String, Integer> writeRecordsTransform =
writeTransformFromRow.getWriteRecordsTransform();
assertNotNull(
diff --git
a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
index b45c922eae7..9becaf980b8 100644
---
a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
+++
b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
@@ -137,7 +138,7 @@ public class TestExpansionService {
@Override
public PTransform<KeyedPCollectionTuple<Long>, PCollection<KV<Long,
Iterable<String>>>>
- getTransform(RunnerApi.FunctionSpec spec) {
+ getTransform(RunnerApi.FunctionSpec spec, PipelineOptions options) {
return new TestCoGroupByKeyTransform();
}
}
@@ -155,7 +156,7 @@ public class TestExpansionService {
@Override
public PTransform<PCollectionList<Long>, PCollection<Long>> getTransform(
- RunnerApi.FunctionSpec spec) {
+ RunnerApi.FunctionSpec spec, PipelineOptions options) {
return Flatten.pCollections();
}
}