robertwb commented on code in PR #28210:
URL: https://github.com/apache/beam/pull/28210#discussion_r1311827147


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -84,6 +91,16 @@ public FunctionSpec translate(
       }
     }
 
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {

Review Comment:
   I wonder if these should be in an optional interface, or if default null-returning implementations should be provided. (We could argue that implementing them should be strongly encouraged.)
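
A minimal sketch contrasting the two options. All names here are hypothetical, and a `Map<String, Object>` stands in for Beam's `Row`; it only shows how a caller would detect config-row support under each design.

```java
import java.util.Map;

// Option 1 (hypothetical names): a default implementation that returns null.
// Existing translators keep compiling; callers treat null as "no config-row support".
interface DefaultingTranslator<T> {
  String getUrn(T transform);

  default Map<String, Object> toConfigRow(T transform) {
    return null;
  }
}

// Option 2 (hypothetical names): an optional interface that only translators
// with config-row support implement.
interface ConfigRowSupport<T> {
  Map<String, Object> toConfigRow(T transform);
}

// A translator written before toConfigRow existed: it inherits the null default.
class LegacyTranslator implements DefaultingTranslator<String> {
  @Override
  public String getUrn(String transform) {
    return "beam:transform:legacy:v1";
  }
}

// A translator that opts in to option 2.
class RowAwareTranslator implements ConfigRowSupport<String> {
  @Override
  public Map<String, Object> toConfigRow(String transform) {
    return Map.of("name", transform);
  }
}

class TranslatorChecks {
  // Option 1: support is discovered by calling the method and checking for null.
  static boolean supportsViaDefault(DefaultingTranslator<String> translator, String transform) {
    return translator.toConfigRow(transform) != null;
  }

  // Option 2: support is a type test, so no null contract is needed.
  static boolean supportsViaInterface(Object translator) {
    return translator instanceof ConfigRowSupport;
  }
}
```

With a default method, null becomes part of the contract every caller must check; with an optional interface, support is visible in the type system instead.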



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}
 
     @Override
     public String getUrn(Combine.PerKey<?, ?, ?> transform) {
+      return getUrn();

Review Comment:
   Should this be in the base class?
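
A minimal sketch of hoisting the delegation into the shared type via a Java default method, assuming translators with a fixed URN override only the no-argument `getUrn()`. `UrnTranslator` and `CombinePerKeyLikeTranslator` are hypothetical names, and the URN string is illustrative.

```java
// Hypothetical sketch: the names mirror the diff, but this is not Beam's actual interface.
interface UrnTranslator<T> {
  // Transform-independent URN, or null when the URN depends on the transform instance.
  default String getUrn() {
    return null;
  }

  // Hoisted delegation: implementors with a fixed URN override only getUrn().
  default String getUrn(T transform) {
    return getUrn();
  }
}

class CombinePerKeyLikeTranslator implements UrnTranslator<Object> {
  @Override
  public String getUrn() {
    return "beam:transform:combine_per_key:v1";
  }
}
```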



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);
+
+        PCollectionTuple input = PCollectionTuple.empty(originalAppliedPTransform.getPipeline());
+        for (TupleTag<?> tag : originalAppliedPTransform.getInputs().keySet()) {
+          input = input.and(tag.getId(), originalAppliedPTransform.getInputs().get(tag));
+        }
+        externalTransform.expand(input);
+
+        return externalTransform.getExpandedTransform().getSpec();

Review Comment:
   How do the outputs get wired up correctly?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {

Review Comment:
   Why would this happen? This feels exceptional enough that it should propagate up.
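
A minimal sketch of letting the failure propagate instead of logging and skipping. It assumes the lookup can be modeled as a `Supplier<String>`; `UrnLookup` and `urnOrFail` are hypothetical. Deleting the `try`/`catch` would also propagate the error; wrapping only adds the translator's identity to it.

```java
import java.util.function.Supplier;

class UrnLookup {
  // Hypothetical helper: rather than logging and skipping, surface the failure with
  // enough context to identify the misbehaving translator.
  static String urnOrFail(String translatorName, Supplier<String> getUrn) {
    try {
      return getUrn.get();
    } catch (RuntimeException e) {
      // Keep the original exception as the cause so its stack trace is not lost.
      throw new IllegalStateException(
          "TransformPayloadTranslator " + translatorName + " failed to report its URN", e);
    }
  }
}
```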



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {

Review Comment:
   When would this happen?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {
+            LOG.info(
+                "Could not load the TransformPayloadTranslator "
+                    + translator
+                    + " to the Expansion Service.");
+            continue;
+          }
+
+          if (deprecatedTransformURNs.contains(urn)) {
+            continue;
+          }
+          final String finalUrn = urn;
+          TransformProvider transformProvider =
+              spec -> {
+                try {
+                  ExternalConfigurationPayload payload =
+                      ExternalConfigurationPayload.parseFrom(spec.getPayload());
+                  Row configRow =
+                      RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
+                          .decode(new ByteArrayInputStream(payload.getPayload().toByteArray()));
+                  @Nullable PTransform transformFromRow = translator.fromConfigRow(configRow);
+                  if (transformFromRow != null) {

Review Comment:
   Should we prefer making this non-nullable and throwing a NotImplementedError, rather than having to both check for null and catch the exception?
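
Java has no built-in `NotImplementedError`; the closest standard analogue is `UnsupportedOperationException`. A minimal sketch of the non-nullable contract, with hypothetical names and a `Map<String, Object>` standing in for Beam's `Row`:

```java
import java.util.Map;

// Hypothetical stand-in for the translator contract; a Map models Beam's Row.
interface FromRowTranslator<T> {
  // Never returns null: translators without row support throw instead.
  default T fromConfigRow(Map<String, Object> configRow) {
    throw new UnsupportedOperationException(
        getClass().getName() + " does not support fromConfigRow");
  }
}

class FromRowCaller {
  // One failure path: a single catch, with no separate null check.
  static <T> String describe(FromRowTranslator<T> translator, Map<String, Object> configRow) {
    try {
      return "built " + translator.fromConfigRow(configRow);
    } catch (UnsupportedOperationException e) {
      return "unsupported";
    }
  }
}
```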



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);

Review Comment:
   serviceAddress might be unset: the TransformServiceBeamVersion branch never assigns it. (Perhaps don't initialize it to null above, so the compiler can check this for you.)
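
In the diff, the `getTransformServiceBeamVersion()` branch starts a service but never assigns `serviceAddress`, so `External.of` would receive null. Declaring the variable without an initializer turns that into a compile error. A minimal sketch; `ServiceAddressResolver`, `Launcher`, and the `localhost:<port>` address format are assumptions, not the transform service's API:

```java
class ServiceAddressResolver {
  // Hypothetical stand-in for starting a local transform service; returns its port.
  interface Launcher {
    int start(String beamVersion);
  }

  static String resolve(String configuredAddress, String beamVersion, Launcher launcher) {
    // No initializer: javac rejects any path that reads serviceAddress before assigning it.
    final String serviceAddress;
    if (configuredAddress != null) {
      serviceAddress = configuredAddress;
    } else if (beamVersion != null) {
      int port = launcher.start(beamVersion);
      // Dropping this line is now a compile error rather than a null at runtime.
      serviceAddress = "localhost:" + port; // Assumed address format.
    } else {
      throw new IllegalArgumentException(
          "Either TransformServiceAddress or TransformServiceBeamVersion must be set");
    }
    return serviceAddress;
  }
}
```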



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {
+        // Expand using the transform service.
+        spec = getSpecViaTransformService(appliedPTransform, payloadTranslator);
+      } else {
+        // Expand locally.

Review Comment:
   This isn't an expansion. Maybe "translate directly."



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {

Review Comment:
   I have to say it does feel a bit odd to do this substitution at translation time. Did you consider doing it as a separate post-translation pass rather than intertwining the two?
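
A minimal sketch of what a separate pass could look like, over a deliberately simplified model: `Spec` stands in for `RunnerApi.FunctionSpec`, a map keyed by transform id stands in for the translated pipeline proto, and `reExpand` stands in for the transform-service call. All names are hypothetical, and a real pass would also have to reconcile inputs, outputs, and components, which this sketch ignores.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for RunnerApi.FunctionSpec.
final class Spec {
  final String urn;
  final String payload;

  Spec(String urn, String payload) {
    this.urn = urn;
    this.payload = payload;
  }
}

class OverridePass {
  // Runs after translation finishes: translation itself stays override-free, and
  // this pass rewrites only the transforms whose URN was requested.
  static Map<String, Spec> apply(
      Map<String, Spec> translated, Set<String> urnsToOverride, UnaryOperator<Spec> reExpand) {
    Map<String, Spec> result = new LinkedHashMap<>();
    for (Map.Entry<String, Spec> entry : translated.entrySet()) {
      Spec spec = entry.getValue();
      result.put(entry.getKey(), urnsToOverride.contains(spec.urn) ? reExpand.apply(spec) : spec);
    }
    return result;
  }
}
```

Keeping the override as its own step means translation can be tested without a transform service, and the override can be applied or skipped independently.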



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java:
##########
@@ -46,6 +54,22 @@ public FunctionSpec translate(
         AppliedPTransform<?, ?, Impulse> application, SdkComponents components) throws IOException {
       return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
     }
+
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
+      Impulse impulse = (Impulse) pTransform;
+      System.out.println("Found impulse transform: " + impulse);

Review Comment:
   Remove debugging.



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(

Review Comment:
   Maybe the message should be softer, e.g. saying that this translator does not have a transform-independent URN?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;

Review Comment:
   I think we can leave this uninitialized; the compiler is smart enough to see that every path either assigns it or continues.
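
Java's definite-assignment rules back this up: a variable counts as definitely assigned after a `continue`, so a `catch` block that ends in `continue` does not leave `urn` unassigned after the `try` statement. A minimal sketch with a `Supplier<String>` standing in for `translator.getUrn()` (hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class UrnCollector {
  static List<String> collectUrns(List<Supplier<String>> urnSources) {
    List<String> urns = new ArrayList<>();
    for (Supplier<String> source : urnSources) {
      String urn; // No initializer: every path below either assigns urn or continues.
      try {
        urn = source.get();
        if (urn == null) {
          continue; // No transform-independent URN.
        }
      } catch (RuntimeException e) {
        continue;
      }
      urns.add(urn); // Compiles: urn is definitely assigned here.
    }
    return urns;
  }
}
```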


