[ 
https://issues.apache.org/jira/browse/FLINK-1319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375955#comment-14375955
 ] 

Timo Walther commented on FLINK-1319:
-------------------------------------

Hey everybody,
I have implemented a prototype that analyzes UDFs for ForwardedFields. 

You can have a look at my current implementation at 
https://github.com/apache/flink/compare/master...twalthr:sca#files_bucket 

I have added a lot of test cases (currently 76) in order to be as robust as 
possible.

Here are two examples of what the analyzer is currently able to do (but it can 
do much more ^^ ):

{code}
// analyzer returns @ForwardedFields("1")
public static class Map34 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private Tuple2<Long, Long> t;
        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
                if (value == null) {
                        return value;
                }
                else if (value.f0 != null) {
                        t = value;
                        t.f0 = 23L;
                        return t;
                }
                return new Tuple2<>(value.f0, value.f1);
        }
}
{code}

{code}
// analyzer returns @ForwardedFields("*")
public static class Map23 implements MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
        @Override
        public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
                if (value.f0.equals(23)) {
                        return new Tuple1<Integer>(value.<Integer> getField(0));
                } else if (value.f0.equals(22)) {
                        Tuple1<Integer> inputContainer = new Tuple1<Integer>();
                        inputContainer.f0 = value.f0;
                        return new Tuple1<Integer>(inputContainer.<Integer> getField(0));
                } else {
                        return value;
                }
        }
}
{code}
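To make the property the analyzer has to prove more tangible: a field is "forwarded" if the output always carries the input's value at the declared position, unchanged, on every code path. The following is only an illustrative, hypothetical helper (the prototype works statically on the bytecode; this is a dynamic spot check of the same semantics, with plain Object arrays standing in for tuples):

```java
import java.util.function.UnaryOperator;

// Hypothetical helper, NOT part of the prototype: the prototype analyzes
// bytecode statically; this merely spot-checks the forwarded-field property
// on sample inputs to illustrate what the annotation promises.
public class ForwardedFieldCheck {

    // Returns true if, for all sample inputs, the UDF's output carries the
    // value of input position `from` at output position `to` unchanged.
    static boolean forwards(UnaryOperator<Object[]> udf, int from, int to, Object[]... samples) {
        for (Object[] input : samples) {
            Object[] output = udf.apply(input.clone()); // clone: the UDF may mutate its input
            if (!output[to].equals(input[from])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mimics Map34 above: overwrites field 0 with 23L, forwards field 1.
        UnaryOperator<Object[]> map34 = t -> { t[0] = 23L; return t; };
        System.out.println(forwards(map34, 1, 1, new Object[]{1L, 2L}, new Object[]{5L, 7L})); // true
        System.out.println(forwards(map34, 0, 0, new Object[]{1L, 2L})); // false
    }
}
```

Of course, a dynamic check like this can only falsify the property on the given samples; the whole point of the static analysis is to prove it for all inputs.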

The question is how I should proceed now:

- Where should I put my code?
Stephan mentioned a separate Maven module for that a while ago...
- Do we want to enable it by default immediately?
My suggestion would be to first only print "recommendations" to the log, so 
that the user can then add forwarded fields if he/she wants to.
- What else can we do, if we are going through the UDF code anyway?
Maybe it also makes sense to introduce an ".optimizeUdf()" method in the Java 
API, so the user can explicitly state whether he/she wants such optimizations. 
In a next step we could also do other UDF optimizations, such as moving object 
creation ("new XYZ()") into instance variables where possible, checking that 
objects in a FilterFunction are not modified, deciding where to enable/disable 
object-reuse mode, or skipping deserialization of fields that are not used 
within the UDF. A lot of ideas, but I don't know which of them make sense.
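To make the "object creation into instance variables" idea concrete, here is a sketch of the before/after rewrite such an optimizer could suggest. The class names are made up, and plain arrays stand in for Flink tuples:

```java
// Illustrative sketch only; class names are hypothetical and plain long[]
// arrays stand in for Tuple2<Long, Long>.

// Before: allocates a fresh result object for every record.
class WastefulMapper {
    long[] map(long[] value) {
        return new long[]{23L, value[1]};
    }
}

// After: reuses a single instance variable across calls, so no allocation
// happens per record. Only safe if the runtime does not buffer returned
// objects -- which is exactly the object-reuse-mode question above.
class ReusingMapper {
    private final long[] result = new long[2]; // created once per task, not per record
    long[] map(long[] value) {
        result[0] = 23L;
        result[1] = value[1];
        return result; // same object instance on every call
    }
}
```

The tricky part is of course proving that the reuse is safe, which is again a static-analysis question.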


> Add static code analysis for UDFs
> ---------------------------------
>
>                 Key: FLINK-1319
>                 URL: https://issues.apache.org/jira/browse/FLINK-1319
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Stephan Ewen
>            Assignee: Timo Walther
>            Priority: Minor
>
> Flink's Optimizer takes information that tells it for UDFs which fields of 
> the input elements are accessed, modified, or forwarded/copied. This 
> information frequently helps to reuse partitionings, sorts, etc. It may speed 
> up programs significantly, as it can frequently eliminate sorts and shuffles, 
> which are costly.
> Right now, users can add lightweight annotations to UDFs to provide this 
> information (such as adding {{@ConstantFields("0->3, 1, 2->1")}}).
> We worked with static code analysis of UDFs before, to determine this 
> information automatically. This is an incredible feature, as it "magically" 
> makes programs faster.
> For record-at-a-time operations (Map, Reduce, FlatMap, Join, Cross), this 
> works surprisingly well in many cases. We used the "Soot" toolkit for the 
> static code analysis. Unfortunately, Soot is LGPL licensed and thus we did 
> not include any of the code so far.
> I propose to add this functionality to Flink, in the form of a drop-in 
> addition, to work around the LGPL incompatibility with ASL 2.0. Users could 
> simply download a special "flink-code-analysis.jar" and drop it into the 
> "lib" folder to enable this functionality. We may even add a script to 
> "tools" that downloads that library automatically into the lib folder. This 
> should be legally fine, since we do not redistribute LGPL code and only 
> dynamically link it (the incompatibility with ASL 2.0 is mainly in the 
> patentability, if I remember correctly).
> Prior work on this has been done by [~aljoscha] and [~skunert], which could 
> provide a code base to start with.
> *Appendix*
> Homepage of the Soot static analysis toolkit: http://www.sable.mcgill.ca/soot/
> Papers on static analysis and for optimization: 
> http://stratosphere.eu/assets/papers/EnablingOperatorReorderingSCA_12.pdf and 
> http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> Quick introduction to the Optimizer: 
> http://stratosphere.eu/assets/papers/2014-VLDBJ_Stratosphere_Overview.pdf 
> (Section 6)
> Optimizer for Iterations: 
> http://stratosphere.eu/assets/papers/spinningFastIterativeDataFlows_12.pdf 
> (Sections 4.3 and 5.3)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)