Repository: flink
Updated Branches:
  refs/heads/master 44c603d2b -> c4107d4c3
[FLINK-8123][py] Bundle python scripts in jar Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4107d4c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4107d4c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4107d4c Branch: refs/heads/master Commit: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc Parents: 44c603d Author: zentol <[email protected]> Authored: Tue Nov 21 14:15:24 2017 +0100 Committer: zentol <[email protected]> Committed: Tue Nov 21 22:11:20 2017 +0100 ---------------------------------------------------------------------- flink-dist/src/main/assemblies/bin.xml | 9 --- flink-libraries/flink-python/pom.xml | 66 +++++++++++++++----- .../flink-python/src/assembly/python.xml | 37 +++++++++++ .../flink/python/api/PythonPlanBinder.java | 55 +++++++++++----- 4 files changed, 128 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-dist/src/main/assemblies/bin.xml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index eb6867d..4415d25 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -193,15 +193,6 @@ under the License. 
</includes> </fileSet> - <!-- copy python package --> - <fileSet> - <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory> - <outputDirectory>resources/python</outputDirectory> - <fileMode>0755</fileMode> - <excludes> - <exclude>**/example/**</exclude> - </excludes> - </fileSet> <!-- copy python example to examples of dist --> <fileSet> <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory> http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml index 310be72..eaa5b87 100644 --- a/flink-libraries/flink-python/pom.xml +++ b/flink-libraries/flink-python/pom.xml @@ -30,24 +30,58 @@ under the License. <artifactId>flink-python_${scala.binary.version}</artifactId> <name>flink-python</name> <packaging>jar</packaging> + + <build> + <resources> + <resource> + <!-- include the zip generated by the assembly-plugin in the jar as a resource --> + <directory>target</directory> + <includes> + <include>python-source.zip</include> + </includes> + </resource> + </resources> - <build> <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - <archive> - <manifest> - <addClasspath>true</addClasspath> - <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass> - </manifest> - </archive> - </configuration> - </plugin> + <plugin> + <!-- generate zip containing the flink python library --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/assembly/python.xml</descriptor> + <finalName>python-source</finalName> + 
<appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <phase>generate-resources</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/assembly/python.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/assembly/python.xml b/flink-libraries/flink-python/src/assembly/python.xml new file mode 100644 index 0000000..4487b7c --- /dev/null +++ b/flink-libraries/flink-python/src/assembly/python.xml @@ -0,0 +1,37 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>python</id> + <formats> + <format>zip</format> + </formats> + + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>src/main/python/org/apache/flink/python/api/flink</directory> + <outputDirectory>flink</outputDirectory> + </fileSet> + </fileSets> + +</assembly> http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index e0c8215..e4aa518 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint; @@ -45,6 +46,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap; import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; import org.apache.flink.python.api.util.SetCache; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.util.IOUtils; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE; @@ -72,12 +76,8 @@ public class PythonPlanBinder { public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments"; - private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; - private final Configuration operatorConfig; - private final String pythonLibraryPath; - private final String tmpPlanFilesDir; private Path tmpDistributedDir; @@ -110,13 +110,6 @@ public class PythonPlanBinder { tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR)); - String flinkRootDir = System.getenv("FLINK_ROOT_DIR"); - pythonLibraryPath = flinkRootDir != null - //command-line - ? 
flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH - //testing - : new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath(); - operatorConfig = new Configuration(); operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH)); String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR); @@ -163,10 +156,16 @@ public class PythonPlanBinder { } } - // copy flink library, plan file and additional files to temporary location + // setup temporary local directory for flink python library and user files + Path targetDir = new Path(tmpPlanFilesDir); + deleteIfExists(targetDir); + targetDir.getFileSystem().mkdirs(targetDir); + + // extract and unzip flink library to temporary location + unzipPythonLibrary(new Path(tmpPlanFilesDir)); + + // copy user files to temporary location Path tmpPlanFilesPath = new Path(tmpPlanFilesDir); - deleteIfExists(tmpPlanFilesPath); - FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false); copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME); for (String file : filesToCopy) { Path source = new Path(file); @@ -213,6 +212,34 @@ public class PythonPlanBinder { } } + private static void unzipPythonLibrary(Path targetDir) throws IOException { + FileSystem targetFs = targetDir.getFileSystem(); + ClassLoader classLoader = PythonPlanBinder.class.getClassLoader(); + ZipInputStream zis = new ZipInputStream(classLoader.getResourceAsStream("python-source.zip")); + ZipEntry entry = zis.getNextEntry(); + while (entry != null) { + String fileName = entry.getName(); + Path newFile = new Path(targetDir, fileName); + if (entry.isDirectory()) { + targetFs.mkdirs(newFile); + } else { + try { + LOG.debug("Unzipping to {}.", newFile); + FSDataOutputStream fsDataOutputStream = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE); + IOUtils.copyBytes(zis, fsDataOutputStream, false); + } catch (Exception e) { + 
zis.closeEntry(); + zis.close(); + throw new IOException("Failed to unzip flink python library.", e); + } + } + + zis.closeEntry(); + entry = zis.getNextEntry(); + } + zis.closeEntry(); + } + //=====Setup======================================================================================================== private static void deleteIfExists(Path path) throws IOException {
