robertwb commented on code in PR #28210:
URL: https://github.com/apache/beam/pull/28210#discussion_r1311827147
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -84,6 +91,16 @@ public FunctionSpec translate(
}
}
+ @Override
+ public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Review Comment:
I wonder if these should be in an optional interface, or if default
null-returning implementations should be provided. (We could argue that
implementing them should be strongly encouraged.)
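
For illustration, a minimal sketch of the second option (default null-returning implementations). `SketchTranslator`, `UrnOnlyTranslator`, the `Map`-based config and the URN string are hypothetical stand-ins, not Beam's actual `TransformPayloadTranslator` or `Row`:

```java
import java.util.Map;

// Hypothetical, simplified stand-in for a payload translator interface.
interface SketchTranslator<T> {
  String getUrn();

  // Default implementations: translators that do not support config
  // round-tripping do not have to override these.
  default Map<String, Object> toConfigRow(T transform) {
    return null;
  }

  default T fromConfigRow(Map<String, Object> configRow) {
    return null;
  }
}

// A translator that opts out; only getUrn() is required.
class UrnOnlyTranslator implements SketchTranslator<String> {
  @Override
  public String getUrn() {
    return "example:urn:v1"; // made-up URN for the sketch
  }
}
```

With defaults like these, existing translators would keep compiling unchanged, at the cost of callers having to handle null.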
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}
@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
+ return getUrn();
Review Comment:
Should this be in the base class?
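
Roughly what that could look like, as a sketch only. `UrnTranslator` and `FixedUrnTranslator` are hypothetical names and the URN is made up; this is not a claim about how Beam's base type is actually declared:

```java
// Hypothetical, simplified stand-in for the translator base type.
interface UrnTranslator<T> {
  // Transform-independent URN.
  String getUrn();

  // Per-transform URN; defaults to the transform-independent one so that
  // implementations do not each repeat `return getUrn();`.
  default String getUrn(T transform) {
    return getUrn();
  }
}

// Only the no-argument overload needs to be implemented.
class FixedUrnTranslator implements UrnTranslator<Object> {
  @Override
  public String getUrn() {
    return "example:fixed:v1"; // made-up URN for the sketch
  }
}
```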
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
return
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
}
+ private int findAvailablePort() throws IOException {
+ ServerSocket s = new ServerSocket(0);
+ try {
+ return s.getLocalPort();
+ } finally {
+ s.close();
+ try {
+ // Some systems don't free the port for future use immediately.
+ Thread.sleep(100);
+ } catch (InterruptedException exn) {
+ // ignore
+ }
+ }
+ }
+
+ public FunctionSpec getSpecViaTransformService(
+ AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+ TransformPayloadTranslator originalPayloadTranslator)
+ throws IOException {
+
+ ExternalTranslationOptions externalTranslationOptions =
+
originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+ Row configRow =
+
originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
+ try {
+ RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ String urn =
originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+ ExternalTransforms.ExternalConfigurationPayload payload =
+ ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+
.setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+ .setPayload(outputStream.toByteString())
+ .build();
+
+ String serviceAddress = null;
+ TransformServiceLauncher service = null;
+
+ try {
+ if (externalTranslationOptions.getTransformServiceAddress() != null) {
+ serviceAddress =
externalTranslationOptions.getTransformServiceAddress();
+ } else if (externalTranslationOptions.getTransformServiceBeamVersion()
!= null) {
+ String projectName = UUID.randomUUID().toString();
+ service = TransformServiceLauncher.forProject(projectName,
this.findAvailablePort());
+
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+ // Starting the transform service.
+ service.start();
+ // Waiting the service to be ready.
+ service.waitTillUp(15000);
+ } else {
+ throw new IllegalArgumentException(
+ "Either option TransformServiceAddress or option
TransformServiceBeamVersion should be provided to override a transform using
the transform service");
+ }
+
+ ExpandableTransform externalTransform =
+ External.of(urn, payload.toByteArray(), serviceAddress);
+
+ PCollectionTuple input =
PCollectionTuple.empty(originalAppliedPTransform.getPipeline());
+ for (TupleTag<?> tag : originalAppliedPTransform.getInputs().keySet())
{
+ input = input.and(tag.getId(),
originalAppliedPTransform.getInputs().get(tag));
+ }
+ externalTransform.expand(input);
+
+ return externalTransform.getExpandedTransform().getSpec();
Review Comment:
How do the outputs get wired up correctly?
##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
}
}
+ List<String> deprecatedTransformURNs =
ImmutableList.of(READ_TRANSFORM_URN);
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ for (Map.Entry<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ @Initialized TransformPayloadTranslator translator =
entry.getValue();
+ if (translator == null) {
+ continue;
+ }
+
+ String urn = null;
+ try {
+ urn = translator.getUrn();
+ if (urn == null) {
+ LOG.info(
+ "Could not load the TransformPayloadTranslator "
+ + translator
+ + " to the Expansion Service.");
+ continue;
+ }
+ } catch (Exception e) {
Review Comment:
Why would this happen? This feels exceptional enough to propagate up.
##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
}
}
+ List<String> deprecatedTransformURNs =
ImmutableList.of(READ_TRANSFORM_URN);
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ for (Map.Entry<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ @Initialized TransformPayloadTranslator translator =
entry.getValue();
+ if (translator == null) {
Review Comment:
When would this happen?
##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
}
}
+ List<String> deprecatedTransformURNs =
ImmutableList.of(READ_TRANSFORM_URN);
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ for (Map.Entry<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ @Initialized TransformPayloadTranslator translator =
entry.getValue();
+ if (translator == null) {
+ continue;
+ }
+
+ String urn = null;
+ try {
+ urn = translator.getUrn();
+ if (urn == null) {
+ LOG.info(
+ "Could not load the TransformPayloadTranslator "
+ + translator
+ + " to the Expansion Service.");
+ continue;
+ }
+ } catch (Exception e) {
+ LOG.info(
+ "Could not load the TransformPayloadTranslator "
+ + translator
+ + " to the Expansion Service.");
+ continue;
+ }
+
+ if (deprecatedTransformURNs.contains(urn)) {
+ continue;
+ }
+ final String finalUrn = urn;
+ TransformProvider transformProvider =
+ spec -> {
+ try {
+ ExternalConfigurationPayload payload =
+
ExternalConfigurationPayload.parseFrom(spec.getPayload());
+ Row configRow =
+
RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
+ .decode(new
ByteArrayInputStream(payload.getPayload().toByteArray()));
+ @Nullable PTransform transformFromRow =
translator.fromConfigRow(configRow);
+ if (transformFromRow != null) {
Review Comment:
Prefer making this not nullable and throwing a NotImplementedError rather
than having to both check for null and catch the exception?
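
A sketch of the non-nullable variant. Java has no `NotImplementedError`; this assumes `UnsupportedOperationException` as the closest standard equivalent. `RowBackedTranslator`, `NoConfigTranslator` and `FromRowDemo` are hypothetical names, and a `Map` stands in for Beam's `Row`:

```java
import java.util.Map;

// Hypothetical stand-in for a translator whose fromConfigRow never returns null.
interface RowBackedTranslator<T> {
  // Default: signal "not supported" by throwing instead of returning null.
  default T fromConfigRow(Map<String, Object> configRow) {
    throw new UnsupportedOperationException("fromConfigRow is not implemented");
  }
}

// A translator that does not support building a transform from a config row.
class NoConfigTranslator implements RowBackedTranslator<String> {}

class FromRowDemo {
  // The caller has a single failure path (the exception) and no null check.
  static String describe(RowBackedTranslator<String> translator, Map<String, Object> row) {
    try {
      return "built: " + translator.fromConfigRow(row);
    } catch (UnsupportedOperationException e) {
      return "unsupported";
    }
  }
}
```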
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
return
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
}
+ private int findAvailablePort() throws IOException {
+ ServerSocket s = new ServerSocket(0);
+ try {
+ return s.getLocalPort();
+ } finally {
+ s.close();
+ try {
+ // Some systems don't free the port for future use immediately.
+ Thread.sleep(100);
+ } catch (InterruptedException exn) {
+ // ignore
+ }
+ }
+ }
+
+ public FunctionSpec getSpecViaTransformService(
+ AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+ TransformPayloadTranslator originalPayloadTranslator)
+ throws IOException {
+
+ ExternalTranslationOptions externalTranslationOptions =
+
originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+ Row configRow =
+
originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
+ try {
+ RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ String urn =
originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+ ExternalTransforms.ExternalConfigurationPayload payload =
+ ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+
.setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+ .setPayload(outputStream.toByteString())
+ .build();
+
+ String serviceAddress = null;
+ TransformServiceLauncher service = null;
+
+ try {
+ if (externalTranslationOptions.getTransformServiceAddress() != null) {
+ serviceAddress =
externalTranslationOptions.getTransformServiceAddress();
+ } else if (externalTranslationOptions.getTransformServiceBeamVersion()
!= null) {
+ String projectName = UUID.randomUUID().toString();
+ service = TransformServiceLauncher.forProject(projectName,
this.findAvailablePort());
+
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+ // Starting the transform service.
+ service.start();
+ // Waiting the service to be ready.
+ service.waitTillUp(15000);
+ } else {
+ throw new IllegalArgumentException(
+ "Either option TransformServiceAddress or option
TransformServiceBeamVersion should be provided to override a transform using
the transform service");
+ }
+
+ ExpandableTransform externalTransform =
+ External.of(urn, payload.toByteArray(), serviceAddress);
Review Comment:
serviceAddress might be unset. (Perhaps don't set it to null above to let
the compiler check this for you.)
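
To illustrate the compiler check: a minimal sketch using Java's definite-assignment rules. `ServiceAddressDemo`, `resolve` and the placeholder address are hypothetical; the real code would take the address from the launched service:

```java
class ServiceAddressDemo {
  // Returns the address to use, or throws if neither option is provided.
  static String resolve(String explicitAddress, String beamVersion) {
    // No initializer: the compiler rejects any read of serviceAddress on a
    // path where it has not been assigned.
    final String serviceAddress;
    if (explicitAddress != null) {
      serviceAddress = explicitAddress;
    } else if (beamVersion != null) {
      // Placeholder for "start a service and use its address".
      serviceAddress = "localhost:0";
    } else {
      throw new IllegalArgumentException("No address or version provided");
    }
    return serviceAddress;
  }
}
```

If the assignment in the middle branch were removed, the `return` would fail to compile because the variable might not have been initialized.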
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
RunnerApi.PTransform.Builder transformBuilder =
translateAppliedPTransform(appliedPTransform, subtransforms,
components);
- FunctionSpec spec =
- KNOWN_PAYLOAD_TRANSLATORS
- .get(appliedPTransform.getTransform().getClass())
- .translate(appliedPTransform, components);
+ TransformPayloadTranslator payloadTranslator =
+
KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+ ExternalTranslationOptions externalTranslationOptions =
+
appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+ List<String> urnsToOverride =
externalTranslationOptions.getTransformsToOverride();
+
+ PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+ FunctionSpec spec = null;
+ if (getUrn(transform) != null &&
urnsToOverride.contains(getUrn(transform))) {
+ // Expand using the transform service.
+ spec = getSpecViaTransformService(appliedPTransform,
payloadTranslator);
+ } else {
+ // Expand locally.
Review Comment:
This isn't an expansion. Maybe "translate directly."
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
RunnerApi.PTransform.Builder transformBuilder =
translateAppliedPTransform(appliedPTransform, subtransforms,
components);
- FunctionSpec spec =
- KNOWN_PAYLOAD_TRANSLATORS
- .get(appliedPTransform.getTransform().getClass())
- .translate(appliedPTransform, components);
+ TransformPayloadTranslator payloadTranslator =
+
KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+ ExternalTranslationOptions externalTranslationOptions =
+
appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+ List<String> urnsToOverride =
externalTranslationOptions.getTransformsToOverride();
+
+ PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+ FunctionSpec spec = null;
+ if (getUrn(transform) != null &&
urnsToOverride.contains(getUrn(transform))) {
Review Comment:
I have to say it does feel a bit odd to do this substitution at translation
time. Did you consider doing this as a separate pass post-translation rather
than intertwining the two?
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java:
##########
@@ -46,6 +54,22 @@ public FunctionSpec translate(
AppliedPTransform<?, ?, Impulse> application, SdkComponents
components) throws IOException {
return
FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
}
+
+ @Override
+ public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
+ Impulse impulse = (Impulse) pTransform;
+ System.out.println("Found impulse transform: " + impulse);
Review Comment:
Remove debugging output.
##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
}
}
+ List<String> deprecatedTransformURNs =
ImmutableList.of(READ_TRANSFORM_URN);
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ for (Map.Entry<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ @Initialized TransformPayloadTranslator translator =
entry.getValue();
+ if (translator == null) {
+ continue;
+ }
+
+ String urn = null;
+ try {
+ urn = translator.getUrn();
+ if (urn == null) {
+ LOG.info(
Review Comment:
Maybe the warning should be softer, that this translator does not have a
transform-independent URN?
##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
}
}
+ List<String> deprecatedTransformURNs =
ImmutableList.of(READ_TRANSFORM_URN);
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+ for (Map.Entry<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ @Initialized TransformPayloadTranslator translator =
entry.getValue();
+ if (translator == null) {
+ continue;
+ }
+
+ String urn = null;
Review Comment:
I think we can leave this unset and the compiler is smart enough to see the
assignment or continue.
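
A small sketch of the same loop shape with the variable left unset. `UrnLoopDemo` and the `Supplier`-based sources are hypothetical stand-ins for the translators; the catch-and-skip behaviour simply mirrors the diff:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class UrnLoopDemo {
  // Collects the non-null URNs, skipping sources that return null or throw.
  static List<String> collectUrns(List<Supplier<String>> urnSources) {
    List<String> urns = new ArrayList<>();
    for (Supplier<String> source : urnSources) {
      String urn; // deliberately left unassigned
      try {
        urn = source.get();
        if (urn == null) {
          continue;
        }
      } catch (RuntimeException e) {
        continue;
      }
      // Every path that reaches this line has assigned urn, so it compiles.
      urns.add(urn);
    }
    return urns;
  }
}
```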