chamikaramj commented on code in PR #22991:
URL: https://github.com/apache/beam/pull/22991#discussion_r961072911


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java:
##########
@@ -418,13 +433,24 @@ public OutputT expand(InputT input) {
         return apply(input, expansionService, payload);
       } else {
         int port = PythonService.findAvailablePort();
+        ImmutableList.Builder<String> args = ImmutableList.builder();
+        args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+        if (!extraPackages.isEmpty()) {
+          File requirementsFile = File.createTempFile("requirements", ".txt");
+          requirementsFile.deleteOnExit();
+          try (FileWriter fout =
+              new FileWriter(requirementsFile.getAbsolutePath(), Charsets.UTF_8)) {
+            for (String pkg : extraPackages) {
+              fout.write(pkg);
+              fout.write('\n');
+            }
+          }
+          args.add("--requirements_file=" + requirementsFile.getAbsolutePath());

Review Comment:
   I see that we pass through PipelineOptions to the expansion service but I 
don't see the requirements file being used when adding dependencies to the 
default environment:
   
https://github.com/apache/beam/blob/ac3766ab4847963a52d5fd5ad0c34456dbf54c65/sdks/python/apache_beam/runners/portability/portable_runner.py#L267
   
   I might be missing something.
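
   To make the question concrete: as far as this hunk shows, the Java side only writes one package specifier per line into a temp file and hands its path to the expansion service via `--requirements_file`. A minimal standalone sketch of that step follows. The class and method names are made up for illustration, and it uses `java.nio.file.Files` instead of the `FileWriter` in the PR. Whether the Python side then feeds that file into the default environment is the open part.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the PR: mirrors what the hunk above does.
class RequirementsFileSketch {

  /** Writes one package specifier per line to a temp file and returns its path. */
  static Path writeRequirements(List<String> extraPackages) throws IOException {
    Path file = Files.createTempFile("requirements", ".txt");
    file.toFile().deleteOnExit();
    // Files.write terminates each element with the platform line separator.
    Files.write(file, extraPackages, StandardCharsets.UTF_8);
    return file;
  }

  public static void main(String[] args) throws IOException {
    Path file = writeRequirements(Arrays.asList("numpy", "pandas==1.4.3"));
    // Same flag shape as in the hunk above.
    System.out.println("--requirements_file=" + file.toAbsolutePath());
    System.out.println(Files.readAllLines(file, StandardCharsets.UTF_8));
  }
}
```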



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java:
##########
@@ -40,25 +40,43 @@ public class PythonService {
 
   private final String module;
   private final List<String> args;
+  private final List<String> extraPackages;
 
-  public PythonService(String module, List<String> args) {
+  public PythonService(String module, List<String> args, List<String> extraPackages) {
     this.module = module;
     this.args = args;
+    this.extraPackages = extraPackages;
+  }
+
+  public PythonService(String module, List<String> args) {
+    this(module, args, ImmutableList.of());
   }
 
   public PythonService(String module, String... args) {
     this(module, Arrays.asList(args));
   }
 
+  public PythonService withExtraPackages(List<String> extraPackages) {

Review Comment:
   Also add a Javadoc (it can be brief, since this is mostly an internal 
utility).



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java:
##########
@@ -418,13 +433,24 @@ public OutputT expand(InputT input) {
         return apply(input, expansionService, payload);
       } else {
         int port = PythonService.findAvailablePort();
+        ImmutableList.Builder<String> args = ImmutableList.builder();
+        args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+        if (!extraPackages.isEmpty()) {
+          File requirementsFile = File.createTempFile("requirements", ".txt");
+          requirementsFile.deleteOnExit();
+          try (FileWriter fout =
+              new FileWriter(requirementsFile.getAbsolutePath(), Charsets.UTF_8)) {
+            for (String pkg : extraPackages) {
+              fout.write(pkg);
+              fout.write('\n');
+            }
+          }
+          args.add("--requirements_file=" + requirementsFile.getAbsolutePath());

Review Comment:
   Also, we should add a Python-side unit test for this.



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java:
##########
@@ -266,6 +277,14 @@ public PythonExternalTransform<InputT, OutputT> withOutputCoder(Coder<?> outputC
     return this;
   }
 
+  public PythonExternalTransform<InputT, OutputT> withExtraPackages(List<String> extraPackages) {

Review Comment:
   Please add a Javadoc (for example, we should mention that each package 
entry can also include the version).
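
   Something along these lines, perhaps. This is only a sketch of the wording: `TransformSketch` is a hypothetical stand-in for `PythonExternalTransform` so the snippet compiles on its own, and the method body (replacing the list) is an assumption for illustration, not what the PR does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in class; only the Javadoc wording is the point here.
class TransformSketch {
  private List<String> extraPackages = new ArrayList<>();

  /**
   * Specifies extra Python packages that the transform depends on.
   *
   * <p>Each entry is written as one line of a pip requirements file, so it may be a bare
   * package name such as {@code "numpy"} or include a version, for example
   * {@code "pandas==1.4.3"}.
   *
   * @param extraPackages package specifiers, one per required package
   * @return this transform, for chaining
   */
  TransformSketch withExtraPackages(List<String> extraPackages) {
    // Assumption: the new list replaces any previously set packages.
    this.extraPackages = new ArrayList<>(extraPackages);
    return this;
  }

  List<String> getExtraPackages() {
    return extraPackages;
  }

  public static void main(String[] args) {
    TransformSketch t =
        new TransformSketch().withExtraPackages(Arrays.asList("numpy", "pandas==1.4.3"));
    System.out.println(t.getExtraPackages());
  }
}
```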


