ahmedabu98 commented on code in PR #31451:
URL: https://github.com/apache/beam/pull/31451#discussion_r1621280060
##########
sdks/java/io/iceberg/build.gradle:
##########
@@ -57,8 +57,8 @@ dependencies {
     testImplementation library.java.hadoop_client
     testImplementation library.java.bigdataoss_gcsio
-    testImplementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16"
-    testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16"
+    testImplementation library.java.bigdataoss_gcs_connector
+    testImplementation library.java.bigdataoss_util_hadoop

Review Comment:
Can we trigger the Iceberg integration tests? (Create an `IO_Iceberg_Integration_Tests.json` file in `.github/trigger_files/`.)

When I was writing those tests, I remember GCS writes didn't work with the standard bigdataoss version we have in BeamModulePlugin (`2.2.16`). They did work with `hadoop2-2.2.16`, though.


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java:
##########
@@ -184,26 +203,33 @@ RunnerApi.Pipeline updateTransformViaTransformService(
     if (transformToUpgrade == null) {
       throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
     }
-    ByteString configRowBytes =
-        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
-    ByteString configRowSchemaBytes =
-        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
-    SchemaApi.Schema configRowSchemaProto =
-        SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
-
-    ExternalTransforms.ExternalConfigurationPayload payload =
-        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
-            .setSchema(configRowSchemaProto)
-            .setPayload(configRowBytes)
-            .build();
+
+    byte[] payloadBytes = null;
+
+    if (!transformToUpgrade.getSpec().getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+      ByteString configRowBytes =
+          transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
+      ByteString configRowSchemaBytes =
+          transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
+      SchemaApi.Schema configRowSchemaProto =
+          SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
+      payloadBytes =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(configRowSchemaProto)
+              .setPayload(configRowBytes)
+              .build()
+              .toByteArray();
+    } else {
+      payloadBytes = transformToUpgrade.getSpec().getPayload().toByteArray();
+    }
     RunnerApi.PTransform.Builder ptransformBuilder =
         RunnerApi.PTransform.newBuilder()
             .setUniqueName(transformToUpgrade.getUniqueName() + "_external")
             .setSpec(
                 RunnerApi.FunctionSpec.newBuilder()
                     .setUrn(transformToUpgrade.getSpec().getUrn())
-                    .setPayload(ByteString.copyFrom(payload.toByteArray()))
+                    .setPayload(ByteString.copyFrom(payloadBytes))

Review Comment:
This payload could be either a `SchemaTransformPayload` or an `ExternalConfigurationPayload`. Does the consumer of this transform need to know which one it is?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
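On the payload question in the TransformUpgrader diff: the rebuilt `FunctionSpec` keeps the original URN (`.setUrn(transformToUpgrade.getSpec().getUrn())`), and the branch picks the payload encoding from that same URN. One possible reading is therefore that a consumer could recover the payload kind from the URN alone, without inspecting the bytes. The following is a minimal sketch under that assumption. `PayloadKindSketch`, `SCHEMA_TRANSFORM_URN`, `PayloadKind`, and `payloadKindFor` are hypothetical names, not Beam APIs. The URN string is a placeholder, not the real value of `BeamUrns.getUrn(SCHEMA_TRANSFORM)`.

```java
import java.util.Objects;

public class PayloadKindSketch {
  // Hypothetical placeholder standing in for BeamUrns.getUrn(SCHEMA_TRANSFORM);
  // the real URN string is not taken from Beam here.
  static final String SCHEMA_TRANSFORM_URN = "hypothetical:schematransform:urn";

  // Hypothetical tags for the two payload encodings the diff can produce.
  enum PayloadKind {
    SCHEMA_TRANSFORM_PAYLOAD,
    EXTERNAL_CONFIGURATION_PAYLOAD
  }

  // Mirrors the diff's branch: the schema-transform URN keeps the spec's existing
  // payload bytes, and every other URN gets an ExternalConfigurationPayload.
  // Assumes the URN is the only discriminator a consumer needs.
  static PayloadKind payloadKindFor(String urn) {
    Objects.requireNonNull(urn, "urn");
    return SCHEMA_TRANSFORM_URN.equals(urn)
        ? PayloadKind.SCHEMA_TRANSFORM_PAYLOAD
        : PayloadKind.EXTERNAL_CONFIGURATION_PAYLOAD;
  }

  public static void main(String[] args) {
    System.out.println(payloadKindFor(SCHEMA_TRANSFORM_URN)); // prints SCHEMA_TRANSFORM_PAYLOAD
    System.out.println(payloadKindFor("beam:transform:some_other:v1")); // prints EXTERNAL_CONFIGURATION_PAYLOAD
  }
}
```

If the URN really is enough, the consumer needs no extra marker. If some transform could reach this code path with a URN that does not match its encoding, an explicit tag would be safer than this URN-based dispatch.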