Repository: flink
Updated Branches:
  refs/heads/master 44c603d2b -> c4107d4c3


[FLINK-8123][py] Bundle python scripts in jar


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4107d4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4107d4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4107d4c

Branch: refs/heads/master
Commit: c4107d4c336ed8dbadc03a7018eb255f4df3d1cc
Parents: 44c603d
Author: zentol <[email protected]>
Authored: Tue Nov 21 14:15:24 2017 +0100
Committer: zentol <[email protected]>
Committed: Tue Nov 21 22:11:20 2017 +0100

----------------------------------------------------------------------
 flink-dist/src/main/assemblies/bin.xml          |  9 ---
 flink-libraries/flink-python/pom.xml            | 66 +++++++++++++++-----
 .../flink-python/src/assembly/python.xml        | 37 +++++++++++
 .../flink/python/api/PythonPlanBinder.java      | 55 +++++++++++-----
 4 files changed, 128 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index eb6867d..4415d25 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -193,15 +193,6 @@ under the License.
                        </includes>
                </fileSet>
 
-               <!-- copy python package -->
-               <fileSet>
-                       
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
-                       <outputDirectory>resources/python</outputDirectory>
-                       <fileMode>0755</fileMode>
-                       <excludes>
-                               <exclude>**/example/**</exclude>
-                       </excludes>
-               </fileSet>
                <!-- copy python example to examples of dist -->
                <fileSet>
                        
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml 
b/flink-libraries/flink-python/pom.xml
index 310be72..eaa5b87 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -30,24 +30,58 @@ under the License.
     <artifactId>flink-python_${scala.binary.version}</artifactId>
     <name>flink-python</name>
     <packaging>jar</packaging>
+       
+       <build>
+               <resources>
+                       <resource>
+                               <!-- include the zip generated by the 
assembly-plugin in the jar as a resource -->
+                               <directory>target</directory>
+                               <includes>
+                                       <include>python-source.zip</include>
+                               </includes>
+                       </resource>
+               </resources>
 
-    <build>
         <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <archive>
-                        <manifest>
-                            <addClasspath>true</addClasspath>
-                            
<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-            </plugin>
+                       <plugin>
+                               <!-- generate zip containing the flink python 
library -->
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <configuration>
+                                       
<descriptor>src/assembly/python.xml</descriptor>
+                                       <finalName>python-source</finalName>
+                                       
<appendAssemblyId>false</appendAssemblyId>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               
<phase>generate-resources</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/assembly/python.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/assembly/python.xml 
b/flink-libraries/flink-python/src/assembly/python.xml
new file mode 100644
index 0000000..4487b7c
--- /dev/null
+++ b/flink-libraries/flink-python/src/assembly/python.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly
+       xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+       <id>python</id>
+       <formats>
+               <format>zip</format>
+       </formats>
+
+       <!-- no extra wrapping directory: the archive root directly contains the "flink" directory below -->
+       <includeBaseDirectory>false</includeBaseDirectory>
+
+       <fileSets>
+               <!-- zip the flink python library sources under the top-level directory "flink" -->
+               <fileSet>
+                       <directory>src/main/python/org/apache/flink/python/api/flink</directory>
+                       <outputDirectory>flink</outputDirectory>
+               </fileSet>
+       </fileSets>
+
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/c4107d4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index e0c8215..e4aa518 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
@@ -45,6 +46,7 @@ import 
org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.python.api.util.SetCache;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +56,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import static 
org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
 import static 
org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
@@ -72,12 +76,8 @@ public class PythonPlanBinder {
        public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = 
"PLANBINDER_BCVAR_";
        public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
 
-       private static final String FLINK_PYTHON_REL_LOCAL_PATH = 
File.separator + "resources" + File.separator + "python";
-
        private final Configuration operatorConfig;
 
-       private final String pythonLibraryPath;
-
        private final String tmpPlanFilesDir;
        private Path tmpDistributedDir;
 
@@ -110,13 +110,6 @@ public class PythonPlanBinder {
 
                tmpDistributedDir = new 
Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
 
-               String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
-               pythonLibraryPath = flinkRootDir != null
-                               //command-line
-                               ? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
-                               //testing
-                               : new File(System.getProperty("user.dir"), 
"src/main/python/org/apache/flink/python/api").getAbsolutePath();
-
                operatorConfig = new Configuration();
                operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, 
globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
                String configuredTmpDataDir = 
globalConfig.getString(PythonOptions.DATA_TMP_DIR);
@@ -163,10 +156,16 @@ public class PythonPlanBinder {
                                }
                        }
 
-                       // copy flink library, plan file and additional files 
to temporary location
+                       // setup temporary local directory for flink python 
library and user files
+                       Path targetDir = new Path(tmpPlanFilesDir);
+                       deleteIfExists(targetDir);
+                       targetDir.getFileSystem().mkdirs(targetDir);
+
+                       // extract and unzip flink library to temporary location
+                       unzipPythonLibrary(new Path(tmpPlanFilesDir));
+
+                       // copy user files to temporary location
                        Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
-                       deleteIfExists(tmpPlanFilesPath);
-                       FileCache.copy(new Path(pythonLibraryPath), 
tmpPlanFilesPath, false);
                        copyFile(planPath, tmpPlanFilesPath, 
FLINK_PYTHON_PLAN_NAME);
                        for (String file : filesToCopy) {
                                Path source = new Path(file);
@@ -213,6 +212,34 @@ public class PythonPlanBinder {
                }
        }
 
+       /**
+        * Extracts the flink python library, which is bundled in this jar as the classpath resource
+        * {@code python-source.zip}, into the given directory.
+        *
+        * <p>Directory entries are created with {@link FileSystem#mkdirs}; file entries are created with
+        * {@link FileSystem.WriteMode#NO_OVERWRITE}, so existing files are not overwritten.
+        *
+        * @param targetDir directory into which the archive contents are extracted
+        * @throws IOException if the archive is not on the classpath or extraction fails
+        */
+       private static void unzipPythonLibrary(Path targetDir) throws IOException {
+               FileSystem targetFs = targetDir.getFileSystem();
+               ClassLoader classLoader = PythonPlanBinder.class.getClassLoader();
+               java.io.InputStream libraryArchive = classLoader.getResourceAsStream("python-source.zip");
+               if (libraryArchive == null) {
+                       // fail with a descriptive error instead of an NPE inside the ZipInputStream constructor
+                       throw new IOException("Failed to unzip flink python library: resource python-source.zip not found on the classpath.");
+               }
+               // try-with-resources closes the archive on both the success and the failure path
+               try (ZipInputStream zis = new ZipInputStream(libraryArchive)) {
+                       ZipEntry entry = zis.getNextEntry();
+                       while (entry != null) {
+                               Path newFile = new Path(targetDir, entry.getName());
+                               if (entry.isDirectory()) {
+                                       targetFs.mkdirs(newFile);
+                               } else {
+                                       LOG.debug("Unzipping to {}.", newFile);
+                                       // copyBytes must not close its streams (zis is still needed for the next
+                                       // entry), so the output stream is closed here explicitly
+                                       try (FSDataOutputStream out = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+                                               IOUtils.copyBytes(zis, out, false);
+                                       }
+                               }
+                               zis.closeEntry();
+                               entry = zis.getNextEntry();
+                       }
+               } catch (IOException e) {
+                       throw new IOException("Failed to unzip flink python library.", e);
+               }
+       }
+
        
//=====Setup========================================================================================================
 
        private static void deleteIfExists(Path path) throws IOException {

Reply via email to