[ 
https://issues.apache.org/jira/browse/FLINK-1319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569271#comment-14569271
 ] 

ASF GitHub Bot commented on FLINK-1319:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/729#discussion_r31535397
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
 ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.flink.api.common.UdfAnalysisMode;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.api.common.functions.InvalidTypesException;
    +import org.apache.flink.api.common.operators.DualInputSemanticProperties;
    +import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
    +import org.apache.flink.api.java.sca.UdfAnalyzer;
    +import org.apache.flink.api.java.sca.UdfAnalyzerException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public abstract class UdfOperatorUtils {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(UdfOperatorUtils.class);
    +
    +   public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, 
?> operator, Class<?> udfBaseClass,
    +                   Function udf, Keys<?> key) {
    +           final UdfAnalysisMode mode = 
operator.getExecutionEnvironment().getConfig().getUdfAnalysisMode();
    +           if (mode != UdfAnalysisMode.DISABLED) {
    +                   try {
    +                           final UdfAnalyzer analyzer = new 
UdfAnalyzer(udfBaseClass, udf.getClass(), operator.getInputType(), null,
    +                                           operator.getResultType(), key, 
null, mode == UdfAnalysisMode.OPTIMIZING_ENABLED);
    +                           final boolean success = analyzer.analyze();
    +                           if (success) {
    +                                   if (mode == 
UdfAnalysisMode.OPTIMIZING_ENABLED
    +                                                   && 
!operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
    +                                           
operator.setSemanticProperties((SingleInputSemanticProperties) 
analyzer.getSemanticProperties());
    +                                           
operator.setAnalyzedUdfSemanticsFlag();
    +                                   }
    +                                   else if (mode == 
UdfAnalysisMode.HINTING_ENABLED) {
    +                                           
analyzer.addSemanticPropertiesHints();
    +                                   }
    +                                   LOG.info(analyzer.getHintsString());
    +                           }
    +                   }
    +                   catch (InvalidTypesException e) {
    +                           LOG.debug("Unable to do UDF analysis due to 
missing type information.", e);
    +                   }
    +                   catch (UdfAnalyzerException e) {
    +                           LOG.debug("UDF analysis failed.", e);
    +                   }
    +           }
    +   }
    +
    +   public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> 
operator, Class<?> udfBaseClass,
    +                   Function udf, Keys<?> key1, Keys<?> key2) {
    +           final UdfAnalysisMode mode = 
operator.getExecutionEnvironment().getConfig().getUdfAnalysisMode();
    +           if (mode != UdfAnalysisMode.DISABLED) {
    --- End diff --
    
    We could log that the analysis is disabled as well.


> Add static code analysis for UDFs
> ---------------------------------
>
>                 Key: FLINK-1319
>                 URL: https://issues.apache.org/jira/browse/FLINK-1319
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Stephan Ewen
>            Assignee: Timo Walther
>            Priority: Minor
>
> Flink's Optimizer takes information that tells it for UDFs which fields of 
> the input elements are accessed, modified, or frwarded/copied. This 
> information frequently helps to reuse partitionings, sorts, etc. It may speed 
> up programs significantly, as it can frequently eliminate sorts and shuffles, 
> which are costly.
> Right now, users can add lightweight annotations to UDFs to provide this 
> information (such as adding {{@ConstandFields("0->3, 1, 2->1")}}.
> We worked with static code analysis of UDFs before, to determine this 
> information automatically. This is an incredible feature, as it "magically" 
> makes programs faster.
> For record-at-a-time operations (Map, Reduce, FlatMap, Join, Cross), this 
> works surprisingly well in many cases. We used the "Soot" toolkit for the 
> static code analysis. Unfortunately, Soot is LGPL licensed and thus we did 
> not include any of the code so far.
> I propose to add this functionality to Flink, in the form of a drop-in 
> addition, to work around the LGPL incompatibility with ALS 2.0. Users could 
> simply download a special "flink-code-analysis.jar" and drop it into the 
> "lib" folder to enable this functionality. We may even add a script to 
> "tools" that downloads that library automatically into the lib folder. This 
> should be legally fine, since we do not redistribute LGPL code and only 
> dynamically link it (the incompatibility with ASL 2.0 is mainly in the 
> patentability, if I remember correctly).
> Prior work on this has been done by [~aljoscha] and [~skunert], which could 
> provide a code base to start with.
> *Appendix*
> Hompage to Soot static analysis toolkit: http://www.sable.mcgill.ca/soot/
> Papers on static analysis and for optimization: 
> http://stratosphere.eu/assets/papers/EnablingOperatorReorderingSCA_12.pdf and 
> http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> Quick introduction to the Optimizer: 
> http://stratosphere.eu/assets/papers/2014-VLDBJ_Stratosphere_Overview.pdf 
> (Section 6)
> Optimizer for Iterations: 
> http://stratosphere.eu/assets/papers/spinningFastIterativeDataFlows_12.pdf 
> (Sections 4.3 and 5.3)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to