ahmedabu98 commented on code in PR #31451:
URL: https://github.com/apache/beam/pull/31451#discussion_r1621280060


##########
sdks/java/io/iceberg/build.gradle:
##########
@@ -57,8 +57,8 @@ dependencies {
 
     testImplementation library.java.hadoop_client
     testImplementation library.java.bigdataoss_gcsio
-    testImplementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16"
-    testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16"
+    testImplementation library.java.bigdataoss_gcs_connector
+    testImplementation library.java.bigdataoss_util_hadoop

Review Comment:
   Can we trigger the Iceberg integration tests? (Create an `IO_Iceberg_Integration_Tests.json` file in `.github/trigger_files/`.)
   
   When I was writing those tests, I remember the GCS write didn't work with the standard bigdataoss version we have in BeamModulePlugin (`2.2.16`). It did work with `hadoop2-2.2.16`, though.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java:
##########
@@ -184,26 +203,33 @@ RunnerApi.Pipeline updateTransformViaTransformService(
     if (transformToUpgrade == null) {
       throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
     }
-    ByteString configRowBytes =
-        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
-    ByteString configRowSchemaBytes =
-        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
-    SchemaApi.Schema configRowSchemaProto =
-        SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
-
-    ExternalTransforms.ExternalConfigurationPayload payload =
-        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
-            .setSchema(configRowSchemaProto)
-            .setPayload(configRowBytes)
-            .build();
+
+    byte[] payloadBytes = null;
+
+    if (!transformToUpgrade.getSpec().getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+      ByteString configRowBytes =
+          transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
+      ByteString configRowSchemaBytes =
+          transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
+      SchemaApi.Schema configRowSchemaProto =
+          SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
+      payloadBytes =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(configRowSchemaProto)
+              .setPayload(configRowBytes)
+              .build()
+              .toByteArray();
+    } else {
+      payloadBytes = transformToUpgrade.getSpec().getPayload().toByteArray();
+    }
 
     RunnerApi.PTransform.Builder ptransformBuilder =
         RunnerApi.PTransform.newBuilder()
             .setUniqueName(transformToUpgrade.getUniqueName() + "_external")
             .setSpec(
                 RunnerApi.FunctionSpec.newBuilder()
                     .setUrn(transformToUpgrade.getSpec().getUrn())
-                    .setPayload(ByteString.copyFrom(payload.toByteArray()))
+                    .setPayload(ByteString.copyFrom(payloadBytes))

Review Comment:
   This payload could be either `SchemaTransformPayload` or 
`ExternalConfigurationPayload`. Does the consumer of this transform need to 
know which one it is?
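
To make the question concrete: since the URN is copied onto the new spec unchanged, a consumer could in principle infer the payload type from it, mirroring the branch in the diff above. The following is only a rough sketch under that assumption. `PayloadKindSketch`, `Kind`, and `kindFor` are hypothetical names, not Beam APIs, and the URN string is assumed to be what `BeamUrns.getUrn(SCHEMA_TRANSFORM)` resolves to.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Beam): infers the payload type from the spec's URN. */
final class PayloadKindSketch {

  /** The two payload shapes the upgraded transform's spec may carry. */
  enum Kind {
    SCHEMA_TRANSFORM_PAYLOAD,
    EXTERNAL_CONFIGURATION_PAYLOAD
  }

  // Assumption: the value that BeamUrns.getUrn(SCHEMA_TRANSFORM) resolves to.
  static final String SCHEMA_TRANSFORM_URN = "beam:expansion:payload:schematransform:v1";

  private PayloadKindSketch() {}

  /**
   * Mirrors the branch in the diff: a schema-transform URN means the payload was
   * forwarded as-is; any other URN means it was rebuilt from the config-row annotations
   * as an ExternalConfigurationPayload.
   */
  static Kind kindFor(String urn) {
    Objects.requireNonNull(urn, "urn");
    return SCHEMA_TRANSFORM_URN.equals(urn)
        ? Kind.SCHEMA_TRANSFORM_PAYLOAD
        : Kind.EXTERNAL_CONFIGURATION_PAYLOAD;
  }
}
```

If the consumer already dispatches on the URN like this, no extra marker is needed. Otherwise it may be worth documenting the contract next to `setPayload`.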


