[ 
https://issues.apache.org/jira/browse/FLINK-337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212312#comment-14212312
 ] 

ASF GitHub Bot commented on FLINK-337:
--------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361477
  
    --- Diff: 
flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
 ---
    @@ -0,0 +1,376 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the
    + * License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
    + * specific language governing permissions and limitations under the 
License.
    + */
    +package org.apache.flink.languagebinding.api.java.python;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.operators.GroupReduceOperator;
    +import org.apache.flink.api.java.operators.SortedGrouping;
    +import org.apache.flink.api.java.operators.UnsortedGrouping;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.languagebinding.api.java.common.PlanBinder;
    +import org.apache.flink.languagebinding.api.java.common.OperationInfo;
    +import 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
    +//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
    +import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
    +import org.apache.flink.languagebinding.api.java.python.functions.*;
    +//CHECKSTYLE.ON: AvoidStarImport
    +import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
    +import 
org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
    +import org.apache.flink.runtime.filecache.FileCache;
    +
    +/**
    + * This class allows the execution of a Flink plan written in python.
    + */
    +public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
    +   public static final String FLINK_PYTHON_ID = "flink";
    +   public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
    +   public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
    +
    +   private static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + "/flink_plan";
    +   private static final String FLINK_PYTHON_REL_LOCAL_PATH = 
"/resources/python";
    +   private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    +
    +   private Process process;
    +
    +   /**
    +    * Entry point for the execution of a python plan.
    +    *
    +    * @param args planPath [package1[packageX[|parameter1[parameterX]]]]
    +    * @throws Exception
    +    */
    +   public static void main(String[] args) throws Exception {
    +           PythonPlanBinder binder = new PythonPlanBinder();
    +           binder.go(args);
    +   }
    +
    +   private void go(String[] args) throws Exception {
    +           env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +           int split = 0;
    +           for (int x = 0; x < args.length; x++) {
    +                   if (args[x].compareTo("|") == 0) {
    +                           split = x;
    +                   }
    +           }
    +
    +           prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : 
split));
    +           startPython(Arrays.copyOfRange(args, split == 0 ? args.length : 
split + 1, args.length));
    +           receivePlan();
    +           distributeFiles(env);
    +
    +           env.execute();
    +           close();
    +   }
    +
    +   
//=====Setup========================================================================================================
    +   private void prepareFiles(String... filePaths) throws IOException, 
URISyntaxException {
    +           prepareFlinkPythonPackage();
    +
    +           String planPath = filePaths[0];
    +           if (planPath.endsWith("/")) {
    +                   planPath = planPath.substring(0, planPath.length() - 1);
    +           }
    +           String tmpPlanPath = FLINK_PYTHON_FILE_PATH + 
FLINK_PYTHON_PLAN_NAME;
    +           clearPath(tmpPlanPath);
    +           FileCache.copy(new Path(planPath), new Path(tmpPlanPath), 
false);
    +
    +           for (int x = 1; x < filePaths.length; x++) {
    +                   copyFile(filePaths[x]);
    +           }
    +   }
    +
    +   private void startPython(String[] args) throws IOException {
    +           sets = new HashMap();
    +           StringBuilder argsBuilder = new StringBuilder();
    +           for (String arg : args) {
    +                   argsBuilder.append(" ").append(arg);
    +           }
    +           receiver = new Receiver(null);
    +           receiver.open(null);
    +           process = Runtime.getRuntime().exec("python -B " + 
FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
    +
    +           new StreamPrinter(process.getInputStream()).start();
    +           new StreamPrinter(process.getErrorStream()).start();
    +   }
    +
    +   private void close() throws IOException, URISyntaxException {
    +           FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
    +           hdfs.delete(new Path(FLINK_HDFS_PATH), true);
    +
    +           FileSystem local = FileSystem.getLocalFileSystem();
    +           local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
    +           local.delete(new Path(FLINK_TMP_DATA_DIR), true);
    +
    +           try {
    +                   receiver.close();
    +           } catch (NullPointerException npe) {
    +           }
    +           process.destroy();
    +   }
    +
    +   public static void prepareFlinkPythonPackage() throws IOException, 
URISyntaxException {
    +           String originalFilePath = FLINK_DIR.substring(0, 
FLINK_DIR.length() - 7) + FLINK_PYTHON_REL_LOCAL_PATH;
    +           String tempFilePath = FLINK_PYTHON_FILE_PATH;
    +           clearPath(tempFilePath);
    +           FileCache.copy(new Path(originalFilePath), new 
Path(tempFilePath), false);
    +   }
    +
    +   public static void prepareFlinkPythonPackage(String path) throws 
IOException {
    +           FileCache.copy(new Path(path), new 
Path(FLINK_PYTHON_FILE_PATH), true);
    +   }
    +
    +   public static void distributeFiles(ExecutionEnvironment env) throws 
IOException, URISyntaxException {
    +           clearPath(FLINK_HDFS_PATH);
    +           FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new 
Path(FLINK_HDFS_PATH), true);
    +           env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_ID);
    +           clearPath(FLINK_PYTHON_FILE_PATH);
    +   }
    +
    +   private static void clearPath(String path) throws IOException, 
URISyntaxException {
    +           FileSystem fs = FileSystem.get(new URI(path));
    +           if (fs.exists(new Path(path))) {
    +                   fs.delete(new Path(path), true);
    +           }
    +   }
    +
    +   public static String copyFile(String path) throws IOException, 
URISyntaxException {
    +           if (path.endsWith("/")) {
    +                   path = path.substring(0, path.length() - 1);
    +           }
    +           String identifier = path.substring(path.lastIndexOf("/"));
    +           String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
    +           clearPath(tmpFilePath);
    +           FileCache.copy(new Path(path), new Path(tmpFilePath), true);
    +           return identifier;
    +   }
    +
    +   //=====Plan 
Binding=================================================================================================
    +   protected class PythonOperationInfo extends OperationInfo {
    +           protected static final int 
INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED = -1;
    +           protected static final int INFO_MODE_UDF_DOUBLE_KEYED_TYPED = 0;
    +           protected static final int INFO_MODE_UDF_DOUBLE_TYPED = 1;
    +           protected static final int INFO_MODE_UDF_SINGLE_TYPED = 2;
    +           protected static final int INFO_MODE_UDF_SINGLE_TYPED_COMBINE = 
9;
    +           protected static final int INFO_MODE_UDF = 3;
    +           protected static final int INFO_MODE_GROUP = 4;
    +           protected static final int INFO_MODE_SORT = 5;
    +           protected static final int INFO_MODE_UNION = 6;
    +           protected static final int INFO_MODE_PROJECT = 7;
    +           protected static final int 
INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED = 8;
    +           protected String operator;
    +           protected String meta;
    +           protected boolean combine;
    +
    +           protected PythonOperationInfo(int mode) throws IOException {
    +                   parentID = (Integer) receiver.getRecord();
    +                   childID = (Integer) receiver.getRecord();
    +                   switch (mode) {
    +                           case 
INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   keys2 = (Tuple) receiver.getRecord();
    +                                   otherID = (Integer) 
receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   projectionKeys1 = (Tuple) 
receiver.getRecord();
    +                                   projectionKeys2 = (Tuple) 
receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED:
    +                                   otherID = (Integer) 
receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   projectionKeys1 = (Tuple) 
receiver.getRecord();
    +                                   projectionKeys2 = (Tuple) 
receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_KEYED_TYPED:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   keys2 = (Tuple) receiver.getRecord();
    +                                   otherID = (Integer) 
receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_DOUBLE_TYPED:
    +                                   otherID = (Integer) 
receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_SINGLE_TYPED:
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF_SINGLE_TYPED_COMBINE:
    +                                   types = receiver.getRecord();
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   combine = (Boolean) 
receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UDF:
    +                                   operator = (String) 
receiver.getRecord();
    +                                   meta = (String) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_GROUP:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_SORT:
    +                                   field = (Integer) receiver.getRecord();
    +                                   order = (Integer) receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_UNION:
    +                                   otherID = (Integer) 
receiver.getRecord();
    +                                   break;
    +                           case INFO_MODE_PROJECT:
    +                                   keys1 = (Tuple) receiver.getRecord();
    +                                   types = receiver.getRecord();
    +                                   break;
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   protected PythonOperationInfo createOperationInfo(String identifier) 
throws IOException {
    +           switch (Operations.valueOf(identifier.toUpperCase())) {
    +                   case COGROUP:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +                   case CROSS:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case CROSS_H:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case CROSS_T:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +                   case FILTER:
    +                           return new PythonOperationInfo(INFO_MODE_UDF);
    +                   case FLATMAP:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +                   case GROUPREDUCE:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED_COMBINE);
    +                   case JOIN:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case JOIN_H:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case JOIN_T:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +                   case MAP:
    +                           return new 
PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +                   case PROJECTION:
    +                           return new 
PythonOperationInfo(INFO_MODE_PROJECT);
    +                   case REDUCE:
    +                           return new PythonOperationInfo(INFO_MODE_UDF);
    +                   case GROUPBY:
    +                           return new PythonOperationInfo(INFO_MODE_GROUP);
    +                   case SORT:
    +                           return new PythonOperationInfo(INFO_MODE_SORT);
    +                   case UNION:
    +                           return new PythonOperationInfo(INFO_MODE_UNION);
    +           }
    +           return new 
PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +   }
    +
    +   @Override
    +   protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] 
firstKeys, int[] secondKeys, PythonOperationInfo info) {
    +           return 
op1.coGroup(op2).where(firstKeys).equalTo(secondKeys).with(new 
PythonCoGroup(info.operator, info.types, info.meta));
    +   }
    +
    +   public static class PseudoKeySelector<X> implements KeySelector<X, 
Integer> {
    +           @Override
    +           public Integer getKey(X value) throws Exception {
    +                   return 0;
    +           }
    +   }
    +
    +   @Override
    +   protected DataSet applyCrossOperation(DataSet op1, DataSet op2, int 
mode, PythonOperationInfo info) {
    +           switch (mode) {
    +                   case 0:
    +                           return op1.join(op2).where(new 
PseudoKeySelector()).equalTo(new PseudoKeySelector()).with(new 
PythonCross(info.operator, info.types, info.meta));
    --- End diff --
    
    A Cross is implemented as a join where every pair matches. I don't know the 
implications of doing it in such a hacky way. (The comparison overhead should 
be negligible considering the current performance.)


> Fixed all JavaDoc warnings in pact-common.
> ------------------------------------------
>
>                 Key: FLINK-337
>                 URL: https://issues.apache.org/jira/browse/FLINK-337
>             Project: Flink
>          Issue Type: Bug
>            Reporter: GitHub Import
>              Labels: github-import
>             Fix For: pre-apache
>
>         Attachments: pull-request-337-8997712741697485992.patch
>
>
> Fixed all JavaDoc warnings in pact-common. 
> I focused on fixing the build warnings but also added and updated some 
> JavaDoc on the way.
> Addresses issue 
> ([#330|https://github.com/stratosphere/stratosphere/issues/330] | 
> [FLINK-330|https://issues.apache.org/jira/browse/FLINK-330]).
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/pull/337
> Created by: [fhueske|https://github.com/fhueske]
> Labels: 
> Created at: Wed Dec 11 17:50:03 CET 2013
> State: closed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to