[
https://issues.apache.org/jira/browse/BEAM-14343?focusedWorklogId=761320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761320
]
ASF GitHub Bot logged work on BEAM-14343:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/22 17:36
Start Date: 23/Apr/22 17:36
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r856943846
##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -65,8 +66,10 @@
private @Nullable Object @NonNull [] argsArray;
private @Nullable Row providedKwargsRow;
- private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
+ private ExternalPythonTransform(
+ String fullyQualifiedName, String expansionHost, int expansionPort) {
Review Comment:
Still unsure why the API is different from Python, where we take a single
"expansion_service" address:
https://github.com/apache/beam/blob/3f2e3c7c9eccb9d40370cbc70e9a451a4b5573f5/sdks/python/apache_beam/transforms/external.py#L417
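To make the comment above concrete, here is a minimal sketch of how a single "host:port" address string could be split into the host and port that the new constructor takes separately. The class name `ExpansionAddress` and its `parse` method are hypothetical and are not part of the Beam API or of this PR; the sketch uses only the Java standard library and assumes a plain host name or IPv4 address.

```java
// Hypothetical helper, not part of Beam: splits "host:port" into its parts.
final class ExpansionAddress {
  final String host;
  final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Assumes the last ':' separates the host from a numeric port.
  static ExpansionAddress parse(String address) {
    int sep = address.lastIndexOf(':');
    if (sep <= 0 || sep == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port, got: " + address);
    }
    String host = address.substring(0, sep);
    int port = Integer.parseInt(address.substring(sep + 1));
    return new ExpansionAddress(host, port);
  }
}
```

With a helper along these lines, a caller could pass one string such as "localhost:8097" instead of two separate arguments.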
##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -268,20 +268,31 @@ public OutputT expand(InputT input) {
Row argsRow = buildOrGetArgsRow();
Row kwargsRow = buildOrGetKwargsRow();
try {
- Schema payloadSchema =
- Schema.of(
- Schema.Field.of("constructor", Schema.FieldType.STRING),
- Schema.Field.of("args", Schema.FieldType.row(argsRow.getSchema())),
- Schema.Field.of("kwargs", Schema.FieldType.row(kwargsRow.getSchema())));
+ Schema.Builder schemaBuilder = Schema.builder();
+ schemaBuilder.addStringField("constructor");
+ if (argsRow.getValues().size() > 0) {
+ schemaBuilder.addRowField("args", argsRow.getSchema());
+ }
+ if (kwargsRow.getValues().size() > 0) {
+ schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
+ }
+ Schema payloadSchema = schemaBuilder.build();
payloadSchema.setUUID(UUID.randomUUID());
- Row payloadRow =
- Row.withSchema(payloadSchema).addValues(fullyQualifiedName, argsRow, kwargsRow).build();
+ Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+ payloadRowBuilder.addValue(fullyQualifiedName);
+ if (argsRow.getValues().size() > 0) {
Review Comment:
Please add a comment and a unit test so that this doesn't get broken by a
future change.
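As a rough illustration of the behavior such a unit test could pin down, the sketch below mirrors the conditional logic in the diff using plain Java collections instead of Beam's `Schema.Builder` and `Row.Builder`. The class `PayloadFieldsSketch` and its method are hypothetical stand-ins, not code from the PR; they only show which top-level fields end up in the payload for each combination of empty and non-empty arguments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the payload schema logic; not Beam code.
final class PayloadFieldsSketch {
  // Returns the payload field names in order: "constructor" is always
  // present, while "args" and "kwargs" are added only when non-empty.
  static List<String> payloadFieldNames(List<Object> args, Map<String, Object> kwargs) {
    List<String> names = new ArrayList<>();
    names.add("constructor");
    if (!args.isEmpty()) {
      names.add("args");
    }
    if (!kwargs.isEmpty()) {
      names.add("kwargs");
    }
    return names;
  }
}
```

A real test would presumably assert the same thing against the schema of the payload row built in `expand`, covering the four cases of empty and non-empty args and kwargs.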
Issue Time Tracking
-------------------
Worklog Id: (was: 761320)
Time Spent: 1h 10m (was: 1h)
> Allow expansion service override in ExternalPythonTransform
> -----------------------------------------------------------
>
> Key: BEAM-14343
> URL: https://issues.apache.org/jira/browse/BEAM-14343
> Project: Beam
> Issue Type: Bug
> Components: cross-language, sdk-java-core
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Allow expansion service override in ExternalPythonTransform