Hi Stephan,
Thank you for fixing this so fast (given that you are very busy preparing your first release).
Maybe I should explain why I need to work on GenericTypes.
I am trying to make Apache MRQL run on Flink. MRQL is a query processing and optimization system for large-scale, distributed data analysis, built on top of Apache Hadoop, Hama, and Spark. MRQL queries are SQL-like but not SQL. They can work on complex nested data (JSON, XML, etc.) and can express complex queries (PageRank, matrix factorization, etc.).
Let me make this clear first: Flink doesn't really need a query language. Flink programs are like queries because operations are collected and optimized. This gives Flink an edge over Spark. The reason I want to port MRQL to Flink is for the benefit of our project only: we want our queries to run on multiple platforms so that users can play and experiment with these systems without having to learn their APIs and without changing the queries. So I am not interested in the Flink optimizations (which is a shame, I know), since our system has its own optimizer (which is currently not cost-based).
To make a long story short, the MRQL data model is like Avro, since it must support complex types. So the getKey methods must map an Avro-like object to another Avro-like object (the key). That doesn't mean that the key is the same as the value. It's fully understandable (and expected) that I will not be able to use the benefits of the Flink optimizer much on GenericTypes.
Anyway, I am in the process of learning Flink and I will probably bother you with more questions later (but I will wait for the first Flink release, since this will keep you busy for a while).
Thanks for your help
Leonidas



On 07/30/2014 08:09 PM, Stephan Ewen wrote:
Addendum: I made a quick fix for the POJOs to support the expression keys for reducers. The example from the above mail works with the code in this branch: https://github.com/StephanEwen/incubator-flink/commits/pojofix


On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen wrote:

    Yep, there is a flaw in the POJO code: it incorrectly replaces the
    GenericTypes.

    @Leonidas: To explain what is going on:

      - Flink determines the types of the functions via reflection and
    builds up its own model for representing them at runtime. It handles
    basic types (int, String, double, ...), arrays, and tuples in a
    special way; the rest is treated in a generic fashion. The FData
    class is such a "generic" type in Flink.

      - We recently added experimental code to analyze those types and
    represent their contained fields in a transparent way. Those are
    the POJO types; the code at the bottom illustrates what they can do.
      They replace the "generic types" at several levels, but their
    implementation is currently incomplete.
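
    To make the distinction above more concrete, here is a minimal
    plain-Java sketch of reflection-based type classification. It is not
    Flink's actual type extraction code: the TypeSketch class, the Kind
    enum, and both helper methods are hypothetical names made up for
    illustration, and tuples are left out to keep it short.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class TypeSketch {

    /** Hypothetical categories, loosely following the description above. */
    enum Kind { BASIC, ARRAY, GENERIC }

    /** Classifies a class as a basic type, an array, or a "generic" type. */
    static Kind classify(Class<?> c) {
        if (c.isPrimitive() || c == String.class || c == Boolean.class
                || c == Character.class || Number.class.isAssignableFrom(c)) {
            return Kind.BASIC;
        }
        if (c.isArray()) {
            return Kind.ARRAY;
        }
        return Kind.GENERIC;
    }

    /** Lists the public, non-static fields that a POJO-style analysis could expose. */
    static List<String> publicFields(Class<?> c) {
        List<String> names = new ArrayList<>();
        for (Field f : c.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName() + ":" + f.getType().getSimpleName());
            }
        }
        return names;
    }

    /** Stand-in for the FData class from the example in this mail. */
    public static class FData {
        public String theString = "";
    }

    public static void main(String[] args) {
        System.out.println(classify(int.class));
        System.out.println(classify(String[].class));
        System.out.println(classify(FData.class));
        System.out.println(publicFields(FData.class));
    }
}
```

    The point of the sketch is only that a class seen as one opaque
    "generic" value can, with a little reflection, be seen as a set of
    named fields that could then be addressed individually.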


    A few remarks on how to best use the system:
      - Flink always keeps data in a serialized form. That allows it
    to operate very robustly with respect to memory pressure, spilling, etc.

      - One way in which we gain efficiency is to access only what is
    really necessary in that binary data (only the parts that make up
    the key, for example).

      - If you define the entire object to be the key, you prevent the
    system from doing these optimizations. If you can actually define
    which part is the key, you allow it to operate more efficiently.
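
    The remarks above can be illustrated with a small plain-Java sketch.
    It does not use Flink's serializers or comparators: the
    SerializedKeySketch class, its methods, and the "key first, payload
    second" binary layout are all assumptions made up for this example.
    It only shows the idea of comparing records by reading the key bytes
    and never deserializing the rest.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializedKeySketch {

    /** Serializes a record with an assumed layout: key first, then payload. */
    static byte[] serialize(String key, String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(key);
            out.writeUTF(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads only the key; the payload bytes are never touched. */
    static String readKey(byte[] record) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compares two serialized records by key without deserializing the payloads. */
    static int compareByKey(byte[] a, byte[] b) {
        return readKey(a).compareTo(readKey(b));
    }

    public static void main(String[] args) {
        byte[] first = serialize("apple", "a large nested payload");
        byte[] second = serialize("pear", "another large nested payload");
        System.out.println(readKey(first));
        System.out.println(compareByKey(first, second) < 0);
    }
}
```

    If the whole object were the key, compareByKey would have to read
    every byte of both records, which is the cost the last remark warns
    about.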



    ===================================

    Example of POJOs and Expression Fields

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<FData> data = env.fromElements(
            new FData("some"), new FData("test"), new FData("POJOs"));
        data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();

        env.execute();
    }

    public static class FData implements Serializable, Comparable<FData> {
        public String theString;

        public FData() {
            theString = "";
        }

        public FData(String data) {
            this.theString = data;
        }

        @Override
        public int compareTo(FData x) {
            return theString.compareTo(x.theString);
        }

        @Override
        public String toString() {
            return theString;
        }
    }

    public static final class GroupbyReducer extends GroupReduceFunction<FData, FData> {
        @Override
        public void reduce(Iterator<FData> values, Collector<FData> out) {
            while (values.hasNext()) {
                out.collect(values.next());
            }
        }
    }


