Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361477
  
    --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---
    @@ -0,0 +1,376 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
    + * License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + */
    +package org.apache.flink.languagebinding.api.java.python;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.operators.GroupReduceOperator;
    +import org.apache.flink.api.java.operators.SortedGrouping;
    +import org.apache.flink.api.java.operators.UnsortedGrouping;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.languagebinding.api.java.common.PlanBinder;
    +import org.apache.flink.languagebinding.api.java.common.OperationInfo;
    +import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
    +//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
    +import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
    +import org.apache.flink.languagebinding.api.java.python.functions.*;
    +//CHECKSTYLE.ON: AvoidStarImport
    +import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
    +import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
    +import org.apache.flink.runtime.filecache.FileCache;
    +
    +/**
    + * This class allows the execution of a Flink plan written in python.
    + */
    +public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
    +   public static final String FLINK_PYTHON_ID = "flink";
    +   public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
    +   public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
    +
    +   private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
    +   private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
    +   private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    +
    +   private Process process;
    +
    +   /**
    +    * Entry point for the execution of a python plan.
    +    *
    +    * @param args planPath [package1[packageX[|parameter1[parameterX]]]]
    +    * @throws Exception
    +    */
    +   public static void main(String[] args) throws Exception {
    +           PythonPlanBinder binder = new PythonPlanBinder();
    +           binder.go(args);
    +   }
    +
    +   private void go(String[] args) throws Exception {
    +           env = ExecutionEnvironment.getExecutionEnvironment();
    +
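    +           // arguments are split on "|": everything before it is files to ship (the plan and packages),
    +           // everything after it is passed through to the python plan as parameters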
    +           int split = 0;
    +           for (int x = 0; x < args.length; x++) {
    +                   if (args[x].compareTo("|") == 0) {
    +                           split = x;
    +                   }
    +           }
    +
    +           prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
    +           startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
    +           receivePlan();
    +           distributeFiles(env);
    +
    +           env.execute();
    +           close();
    +   }
    +
    +   //=====Setup========================================================================================================
    +   private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
    +           prepareFlinkPythonPackage();
    +
    +           String planPath = filePaths[0];
    +           if (planPath.endsWith("/")) {
    +                   planPath = planPath.substring(0, planPath.length() - 1);
    +           }
    +           String tmpPlanPath = FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME;
    +           clearPath(tmpPlanPath);
    +           FileCache.copy(new Path(planPath), new Path(tmpPlanPath), false);
    +
    +           for (int x = 1; x < filePaths.length; x++) {
    +                   copyFile(filePaths[x]);
    +           }
    +   }
    +
    +   private void startPython(String[] args) throws IOException {
    +           sets = new HashMap();
    +           StringBuilder argsBuilder = new StringBuilder();
    +           for (String arg : args) {
    +                   argsBuilder.append(" ").append(arg);
    +           }
    +           receiver = new Receiver(null);
    +           receiver.open(null);
    +           process = Runtime.getRuntime().exec("python -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
    +
    +           new StreamPrinter(process.getInputStream()).start();
    +           new StreamPrinter(process.getErrorStream()).start();
    +   }
    +
    +   private void close() throws IOException, URISyntaxException {
    +           FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
    +           hdfs.delete(new Path(FLINK_HDFS_PATH), true);
    +
    +           FileSystem local = FileSystem.getLocalFileSystem();
    +           local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
    +           local.delete(new Path(FLINK_TMP_DATA_DIR), true);
    +
    +           if (receiver != null) {
    +                   receiver.close();
    +           }
    +           process.destroy();
    +   }
    +
    +   public static void prepareFlinkPythonPackage() throws IOException, URISyntaxException {
    +           String originalFilePath = FLINK_DIR.substring(0, FLINK_DIR.length() - 7) + FLINK_PYTHON_REL_LOCAL_PATH;
    +           String tempFilePath = FLINK_PYTHON_FILE_PATH;
    +           clearPath(tempFilePath);
    +           FileCache.copy(new Path(originalFilePath), new Path(tempFilePath), false);
    +   }
    +
    +   public static void prepareFlinkPythonPackage(String path) throws IOException {
    +           FileCache.copy(new Path(path), new Path(FLINK_PYTHON_FILE_PATH), true);
    +   }
    +
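    +   /**
    +    * Ships the python package to the distributed file system and registers it with the
    +    * environment's distributed cache under FLINK_PYTHON_ID.
    +    */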
    +   public static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
    +           clearPath(FLINK_HDFS_PATH);
    +           FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
    +           env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_ID);
    +           clearPath(FLINK_PYTHON_FILE_PATH);
    +   }
    +
    +   private static void clearPath(String path) throws IOException, URISyntaxException {
    +           FileSystem fs = FileSystem.get(new URI(path));
    +           if (fs.exists(new Path(path))) {
    +                   fs.delete(new Path(path), true);
    +           }
    +   }
    +
    +   public static String copyFile(String path) throws IOException, URISyntaxException {
    +           if (path.endsWith("/")) {
    +                   path = path.substring(0, path.length() - 1);
    +           }
    +           String identifier = path.substring(path.lastIndexOf("/"));
    +           String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
    +           clearPath(tmpFilePath);
    +           FileCache.copy(new Path(path), new Path(tmpFilePath), true);
    +           return identifier;
    +   }
    +
    +   //=====Plan Binding=================================================================================================
    +   protected class PythonOperationInfo extends OperationInfo {
    +           protected static final int INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED = -1;
    +           protected static final int INFO_MODE_UDF_DOUBLE_KEYED_TYPED = 0;
    +           protected static final int INFO_MODE_UDF_DOUBLE_TYPED = 1;
    +           protected static final int INFO_MODE_UDF_SINGLE_TYPED = 2;
    +           protected static final int INFO_MODE_UDF_SINGLE_TYPED_COMBINE = 9;
    +           protected static final int INFO_MODE_UDF = 3;
    +           protected static final int INFO_MODE_GROUP = 4;
    +           protected static final int INFO_MODE_SORT = 5;
    +           protected static final int INFO_MODE_UNION = 6;
    +           protected static final int INFO_MODE_PROJECT = 7;
    +           protected static final int INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED = 8;
    +           protected String operator;
    +           protected String meta;
    +           protected boolean combine;
    +
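    +           /**
    +            * Reads this operation's parameters from the python process; the mode determines
    +            * which fields are transmitted after the parent/child IDs.
    +            */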
    +           protected PythonOperationInfo(int mode) throws IOException {
    +                   parentID = (Integer) receiver.getRecord();
    +                   childID = (Integer) receiver.getRecord();
    +                   switch (mode) {
    +                           case INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   keys2 = (Tuple) receiver.getRecord();
    +                                   otherID = (Integer) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   projectionKeys1 = (Tuple) receiver.getRecord();
    +                                   projectionKeys2 = (Tuple) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED:
    +                                   otherID = (Integer) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   projectionKeys1 = (Tuple) receiver.getRecord();
    +                                   projectionKeys2 = (Tuple) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_KEYED_TYPED:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   keys2 = (Tuple) receiver.getRecord();
    +                                   otherID = (Integer) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_TYPED:
    +                                   otherID = (Integer) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_SINGLE_TYPED:
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_SINGLE_TYPED_COMBINE:
    +                                   types = receiver.getRecord();
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   combine = (Boolean) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF:
    +                                   operator = (String) receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_GROUP:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_SORT:
    +                                   field = (Integer) receiver.getRecord();
    +                                   order = (Integer) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UNION:
    +                                   otherID = (Integer) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_PROJECT:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   break;
    +                   }
    +           }
    +   }
    +
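    +   /**
    +    * Maps each operation to the INFO_MODE describing which records the python process sends for it.
    +    */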
    +   @Override
    +   protected PythonOperationInfo createOperationInfo(String identifier) throws IOException {
    +           switch (Operations.valueOf(identifier.toUpperCase())) {
    +                   case COGROUP:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +                   case CROSS:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case CROSS_H:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case CROSS_T:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case FILTER:
    +                           return new PythonOperationInfo(INFO_MODE_UDF);
    +                   case FLATMAP:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +                   case GROUPREDUCE:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED_COMBINE);
    +                   case JOIN:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case JOIN_H:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case JOIN_T:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case MAP:
    +                           return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +                   case PROJECTION:
    +                           return new PythonOperationInfo(INFO_MODE_PROJECT);
    +                   case REDUCE:
    +                           return new PythonOperationInfo(INFO_MODE_UDF);
    +                   case GROUPBY:
    +                           return new PythonOperationInfo(INFO_MODE_GROUP);
    +                   case SORT:
    +                           return new PythonOperationInfo(INFO_MODE_SORT);
    +                   case UNION:
    +                           return new PythonOperationInfo(INFO_MODE_UNION);
    +           }
    +           return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +   }
    +
    +   @Override
    +   protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, PythonOperationInfo info) {
    +           return op1.coGroup(op2).where(firstKeys).equalTo(secondKeys).with(new PythonCoGroup(info.operator, info.types, info.meta));
    +   }
    +
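    +   /**
    +    * Maps every element to the constant key 0, so a join using this selector on both inputs
    +    * matches every pair of elements.
    +    */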
    +   public static class PseudoKeySelector<X> implements KeySelector<X, Integer> {
    +           @Override
    +           public Integer getKey(X value) throws Exception {
    +                   return 0;
    +           }
    +   }
    +
    +   @Override
    +   protected DataSet applyCrossOperation(DataSet op1, DataSet op2, int mode, PythonOperationInfo info) {
    +           switch (mode) {
    +                   case 0:
    +                           return op1.join(op2).where(new PseudoKeySelector()).equalTo(new PseudoKeySelector()).with(new PythonCross(info.operator, info.types, info.meta));
    --- End diff --
    
    A cross is implemented here as a join in which every pair matches, since both inputs are keyed to the same constant. I don't know the implications of doing it in such a hacky way. (The comparison overhead should be negligible considering the current performance.)
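    
    For reference, here's a minimal standalone sketch of the trick (untested; the `CrossAsJoinSketch`/`ConstantKey` names are made up for illustration). Keying both inputs to the same constant makes the join emit every pair, i.e. a full cross product:
    
        import org.apache.flink.api.common.functions.JoinFunction;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.functions.KeySelector;
    
        public class CrossAsJoinSketch {
            // every element maps to key 0, so the join matches all pairs
            public static class ConstantKey<X> implements KeySelector<X, Integer> {
                @Override
                public Integer getKey(X value) throws Exception {
                    return 0;
                }
            }
    
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                DataSet<String> left = env.fromElements("a", "b");
                DataSet<Integer> right = env.fromElements(1, 2);
    
                // semantically equivalent to left.cross(right): emits all four pairs
                left.join(right)
                        .where(new ConstantKey<String>())
                        .equalTo(new ConstantKey<Integer>())
                        .with(new JoinFunction<String, Integer, String>() {
                            @Override
                            public String join(String first, Integer second) {
                                return first + second;
                            }
                        })
                        .print();
    
                env.execute();
            }
        }
    
    The open question is what join strategy the optimizer picks when everything hashes to the same key: a repartition-based strategy would ship both inputs to a single node, whereas a real cross can broadcast the smaller side (cf. the CROSS_H/CROSS_T hints above).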

