dianfu commented on a change in pull request #10017: [FLINK-14019][python] add 
support for managing environment and dependencies of Python UDF in Flink Python 
API
URL: https://github.com/apache/flink/pull/10017#discussion_r349910702
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/python/ProcessEnvironmentManager.java
 ##########
 @@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.python.util.ResourceUtil;
+import org.apache.flink.python.util.UnzipUtil;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * The ProcessEnvironmentManager is used to prepare the working directory of the Python UDF worker
+ * and to create the ProcessEnvironment object of the Beam Fn API. It will be created if the Python
+ * function runner is configured to run Python UDFs in process mode.
+ */
+@Internal
+public final class ProcessEnvironmentManager implements 
PythonEnvironmentManager {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEnvironmentManager.class);
+
+       public static final String PYTHON_REQUIREMENTS_FILE = 
"_PYTHON_REQUIREMENTS_FILE";
+       public static final String PYTHON_REQUIREMENTS_CACHE = 
"_PYTHON_REQUIREMENTS_CACHE";
+       public static final String PYTHON_REQUIREMENTS_INSTALL_DIR = 
"_PYTHON_REQUIREMENTS_INSTALL_DIR";
+       public static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";
+
+       private static final String PYTHON_TMP_DIR_PREFIX = "python_dist_";
+       static final String PYTHON_REQUIREMENTS_DIR = 
"python_requirements_target";
+       static final String PYTHON_ARCHIVES_DIR = "python_archives";
+       static final String PYTHON_FILES_DIR = "python_files_dir";
+
+       private final PythonDependencyManager dependencyManager;
+       private final String baseDirectory;
+       private final String requirementsDirectory;
+       private final String archivesDirectory;
+       private final String filesDirectory;
+       private final Map<String, String> systemEnv;
+       @Nullable private final String logDirectory;
+       private Thread shutdownHook;
+
/**
 * Creates a ProcessEnvironmentManager.
 *
 * @param dependencyManager manager holding the registered Python dependencies (files, archives, requirements).
 * @param baseDirectory root of the working directory prepared for the Python UDF worker.
 * @param filesDirectory directory under which plain Python files are placed.
 * @param archivesDirectory directory under which Python archives are extracted.
 * @param requirementsDirectory target directory for requirements installation.
 * @param logDirectory directory for worker logs, or {@code null} if not configured.
 * @param systemEnv snapshot of the environment variables to base the worker environment on.
 */
private ProcessEnvironmentManager(
		PythonDependencyManager dependencyManager,
		String baseDirectory,
		String filesDirectory,
		String archivesDirectory,
		String requirementsDirectory,
		@Nullable String logDirectory,
		Map<String, String> systemEnv) {
	this.dependencyManager = dependencyManager;
	this.baseDirectory = baseDirectory;
	this.filesDirectory = filesDirectory;
	this.archivesDirectory = archivesDirectory;
	this.requirementsDirectory = requirementsDirectory;
	this.logDirectory = logDirectory;
	this.systemEnv = systemEnv;
}
+
+       @Override
+       public void open() throws Exception {
+               File baseDirectoryFile = new File(baseDirectory);
+               if (!baseDirectoryFile.exists() && !baseDirectoryFile.mkdir()) {
+                       throw new IOException(
+                               "Can not create the base directory: " + 
baseDirectory + "of python process environment manager!");
+               }
+               shutdownHook = new Thread(() -> 
FileUtils.deleteDirectoryQuietly(new File(baseDirectory)));
+               ShutdownHookUtil.addShutdownHookThread(shutdownHook, 
ProcessEnvironmentManager.class.getSimpleName(), LOG);
+       }
+
+       @Override
+       public void close() {
+               if (shutdownHook != null) {
+                       shutdownHook.run();
+                       ShutdownHookUtil.removeShutdownHook(shutdownHook, 
ProcessEnvironmentManager.class.getSimpleName(), LOG);
+                       shutdownHook = null;
+               }
+       }
+
+       @Override
+       public RunnerApi.Environment createEnvironment() throws IOException {
+               prepareWorkingDir();
+               Map<String, String> generatedEnv = 
generateEnvironmentVariables();
+               String pythonWorkerCommand = String.join(File.separator, 
baseDirectory, "pyflink-udf-runner.sh");
+
+               return Environments.createProcessEnvironment(
+                       "",
+                       "",
+                       pythonWorkerCommand,
+                       generatedEnv);
+       }
+
+       /**
+        * Just return a empty RetrievalToken because no files will be transmit 
via ArtifactService in process mode.
+        *
+        * @return The path of empty RetrievalToken.
+        */
+       @Override
+       public String createRetrievalToken() throws IOException {
+               File retrievalToken = new File(baseDirectory,
+                       "retrieval_token_" + UUID.randomUUID().toString() + 
".json");
+               if (!retrievalToken.getParentFile().exists() && 
!retrievalToken.getParentFile().mkdir()) {
+                       throw new IOException(
+                               "Could not create the parent directory of 
RetrievalToken file: " +
+                                       retrievalToken.getAbsolutePath());
+               }
+               if (retrievalToken.createNewFile()) {
+                       final DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(retrievalToken));
+                       dos.writeBytes("{\"manifest\": {}}");
+                       dos.flush();
+                       dos.close();
+                       return retrievalToken.getAbsolutePath();
+               } else {
+                       throw new IOException(
+                               "Could not create the RetrievalToken file: " + 
retrievalToken.getAbsolutePath());
+               }
+       }
+
+       /**
+        * Generates the environment variables used to create the launcher process of the Python UDF worker.
+        *
+        * <p>To avoid unnecessary IO usage, when running in process mode no artifacts will be transmitted
+        * via the ArtifactService of Beam. Instead, the paths of the artifacts will be transmitted to the
+        * launcher of the Python UDF worker via environment variables.
+        *
+        * @return Environment variable map containing the paths of the Python dependencies.
+        */
+       @VisibleForTesting
+       Map<String, String> generateEnvironmentVariables() {
+               Map<String, String> systemEnv = new HashMap<>(this.systemEnv);
+
+               // add pyflink, py4j and cloudpickle to PYTHONPATH
+               String internalLibs = 
Arrays.stream(ResourceUtil.PYTHON_BASIC_DEPENDENCIES)
+                       .filter(file -> file.endsWith(".zip"))
+                       .map(file -> String.join(File.separator, baseDirectory, 
file))
+                       .collect(Collectors.joining(File.pathSeparator));
+               appendToPythonPath(systemEnv, internalLibs);
+
+               // if the log directory exists, transmit it to the worker
+               if (logDirectory != null && !logDirectory.isEmpty()) {
+                       systemEnv.put("FLINK_LOG_DIR", logDirectory);
+               }
+
+               // generate the PYTHONPATH of udf worker.
+               if (!dependencyManager.getFilesInPythonPath().isEmpty()) {
+                       List<String> filesList = new ArrayList<>();
+                       for (Map.Entry<String, String> entry : 
dependencyManager.getFilesInPythonPath().entrySet()) {
+                               // The original file name will be wiped during the transmission in the Flink
+                               // Distributed Cache and replaced by an unreadable character sequence;
+                               // restore the original name here. Every Python file will be placed at:
+                               // ${baseDirectory}/python_files_dir/${distributedCacheFileName}/${originFileName}
+                               String distributedCacheFileName = new 
File(entry.getKey()).getName();
+                               String actualFileName = entry.getValue();
+                               File file = new File(entry.getKey());
+                               String pathForSearching;
+                               if (file.isFile() && 
actualFileName.endsWith(".py")) {
+                                       // If the file is single py file, use 
its parent directory as PYTHONPATH.
+                                       pathForSearching = 
String.join(File.separator, filesDirectory, distributedCacheFileName);
+                               } else {
+                                       pathForSearching = 
String.join(File.separator, filesDirectory, distributedCacheFileName, 
actualFileName);
+                               }
+                               filesList.add(pathForSearching);
+                       }
+                       appendToPythonPath(systemEnv, 
String.join(File.pathSeparator, filesList));
+               }
+               LOG.info("PYTHONPATH of python worker: {}", 
systemEnv.get("PYTHONPATH"));
+
+               // To support set python interpreter path in archives, the 
archives directory should be used as
+               // working directory of udf worker.
+               if (!dependencyManager.getArchives().isEmpty()) {
+                       systemEnv.put(PYTHON_WORKING_DIR, archivesDirectory);
+                       LOG.info("python working dir of python worker: {}", 
archivesDirectory);
+               }
+
+               // The requirements will be installed by a bootstrap script, 
here just transmit the necessary information
+               // to the script via environment variable.
+               if (dependencyManager.getRequirementsFilePath().isPresent()) {
+                       systemEnv.put(PYTHON_REQUIREMENTS_FILE, 
dependencyManager.getRequirementsFilePath().get());
+                       LOG.info("requirements.txt of python worker: {}", 
dependencyManager.getRequirementsFilePath().get());
+
+                       if 
(dependencyManager.getRequirementsCacheDir().isPresent()) {
+                               systemEnv.put(PYTHON_REQUIREMENTS_CACHE, 
dependencyManager.getRequirementsCacheDir().get());
+                               LOG.info("requirements cached dir of python 
worker: {}",
+                                       
dependencyManager.getRequirementsCacheDir().get());
+                       }
+
+                       systemEnv.put(PYTHON_REQUIREMENTS_INSTALL_DIR, 
requirementsDirectory);
+                       LOG.info("requirements install directory of python 
worker: {}", requirementsDirectory);
+               }
+
+               // Transmit the path of python interpreter to bootstrap script.
 
 Review comment:
   bootstrap script -> the boot script

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to