Hi Stephan,
Thank you for fixing this so quickly, especially since you are very busy
preparing your first release.
Maybe I should explain why I need to work on GenericTypes.
I am trying to make Apache MRQL run on Flink. MRQL is a query processing
and optimization system for large-scale, distributed data analysis,
built on top of Apache Hadoop, Hama, and Spark. MRQL queries are
SQL-like but not SQL. They can work on complex nested data (JSON, XML,
etc.) and can express complex queries (PageRank, matrix factorization,
etc.). Let me make one thing clear first: Flink doesn't really need a
query language. Flink programs already behave like queries, because
their operations are collected into a plan and optimized before they
run. This gives Flink an edge over Spark. I want to port MRQL to Flink
purely for the benefit of our own project: we want our queries to run
on multiple platforms so that users can play and experiment with these
systems without having to learn their APIs and without changing the
queries. So I am not interested in the Flink optimizations (which is a
shame, I know), since our system has its own optimizer (which is
currently not cost-based). To make a long story short, the MRQL data
model is similar to Avro, since it must support complex types. So the
getKey methods must map an Avro-like object to another Avro-like object
(the key). That doesn't mean that the key is the same as the value. I
fully understand (and expect) that I will not get much benefit from the
Flink optimizer on GenericTypes.
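To make the point about keys concrete, here is a minimal plain-Java sketch of a getKey-style function. `GenericRecord`, `KeyExtraction`, and the field names are hypothetical stand-ins for an Avro-like data model, not MRQL's actual classes and not Flink's API. The key is a new record built from parts of the value, so two different values can share the same key.

```java
import java.util.Map;

// Hypothetical stand-in for an Avro-like value (NOT MRQL's real classes):
// a record maps field names to values, which may be atomic values
// (String, Integer, ...) or nested records.
final class GenericRecord {
    private final Map<String, Object> fields;

    GenericRecord(Map<String, Object> fields) {
        // Immutable defensive copy (Java 10+); rejects null names and values.
        this.fields = Map.copyOf(fields);
    }

    Object get(String name) {
        return fields.get(name);
    }

    // Value-based equality, so that two distinct key objects built from
    // the same data compare as equal when used for grouping.
    @Override
    public boolean equals(Object o) {
        return o instanceof GenericRecord
                && fields.equals(((GenericRecord) o).fields);
    }

    @Override
    public int hashCode() {
        return fields.hashCode();
    }
}

final class KeyExtraction {
    // A getKey-style function: builds a *new* Avro-like record (the key)
    // from selected parts of the value, here the nested "dept" record's
    // "name" plus the top-level "year". The key is derived from the value
    // but is not the value itself.
    static GenericRecord getKey(GenericRecord value) {
        GenericRecord dept = (GenericRecord) value.get("dept");
        return new GenericRecord(Map.of(
                "deptName", dept.get("name"),
                "year", value.get("year")));
    }
}
```

Two employees in the same department and year yield equal keys, even though the records themselves differ.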
Anyway, I am in the process of learning Flink and will probably bother
you with more questions later (but I will wait until the first Flink
release is out, since it will keep you busy for a while).
Thanks for your help
Leonidas
On 07/30/2014 08:09 PM, Stephan Ewen wrote:
Addendum: I made a quick fix so that POJOs support expression keys for
reducers. The example from my earlier mail works with the code in this
branch:
https://github.com/StephanEwen/incubator-flink/commits/pojofix
On Thu, Jul 31, 2014 at 3:06 AM, Stephan Ewen <[email protected]> wrote:
Yep, there is a flaw in the POJO code: it incorrectly replaces the
GenericTypes.
@Leonidas: To explain what is going on:
- Flink determines the types of the functions via reflection and
builds up its own model for representing them at runtime. It handles
basic types (int, String, double, ...), arrays, and tuples in a
special way; everything else is treated generically. The FData
class is such a "generic" type in Flink.
- We recently added experimental code to analyze those types and
represent their fields transparently. Those are the POJO types; the
code at the bottom illustrates what they can do. They replace the
"generic types" at several levels, but their implementation is
currently incomplete.
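The classification described above can be pictured with a small reflection-based sketch. This is not Flink's type extraction code: the `Kind` categories and the POJO rules (public class, public no-argument constructor, all instance fields public) are assumptions chosen for illustration, and tuples are left out because they are Flink-specific classes. Under these assumed rules, a class shaped like FData would be classified as a POJO, while a class with private fields and no no-argument constructor would fall back to a generic type.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Rough sketch of reflection-based type classification; the categories
// and POJO rules below are illustrative assumptions, not Flink's code.
final class TypeClassifier {
    enum Kind { BASIC, ARRAY, POJO, GENERIC }

    static Kind classify(Class<?> c) {
        if (c.isPrimitive() || c == String.class || c == Boolean.class
                || c == Character.class || Number.class.isAssignableFrom(c)) {
            return Kind.BASIC;
        }
        if (c.isArray()) {
            return Kind.ARRAY;
        }
        return isPojo(c) ? Kind.POJO : Kind.GENERIC;
    }

    // Assumed POJO rules: a public class with a public no-arg constructor
    // whose instance fields are all public (and there is at least one).
    static boolean isPojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) {
            return false;
        }
        try {
            c.getConstructor(); // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        boolean hasField = false;
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue; // not part of the object's serialized state
            }
            if (!Modifier.isPublic(m)) {
                return false; // hidden field: treat the whole class generically
            }
            hasField = true;
        }
        return hasField;
    }
}

// Hypothetical sample types for trying out the classifier.
final class SampleTypes {
    // Shaped like FData: public field and an implicit public no-arg constructor.
    public static class Visible {
        public String theString = "";
    }

    // Private field and no no-arg constructor.
    public static class Opaque {
        private final String s;

        public Opaque(String s) {
            this.s = s;
        }
    }
}
```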
A few remarks on how to best use the system:
- Flink always keeps data in serialized form. That allows it to operate
very robustly with respect to memory pressure, spilling, etc.
- One way we gain efficiency is by accessing only the parts of that
binary data that are really necessary (for example, only the parts that
make up the key).
- If you define the entire object to be the key, you prevent the
system from doing these optimizations. If you can define which part
of the object is actually the key, you allow it to operate more
efficiently.
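The remarks above can be illustrated with a toy binary layout. In this sketch (hypothetical, not Flink's actual serialization format) each record is stored as a 4-byte key length, the key bytes, and then the payload. `compareKeys` orders two serialized records by reading only the length prefix and the key bytes, never decoding the payload; if the whole object were the key, every byte of both records would have to take part in the comparison. The range overload of `Arrays.compareUnsigned` requires Java 9+.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class BinaryKeys {
    // Hypothetical layout: [4-byte big-endian key length][key bytes][payload bytes].
    static byte[] serialize(String key, String payload) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length + p.length)
                .putInt(k.length)
                .put(k)
                .put(p)
                .array();
    }

    // Compares two serialized records by key only: reads the length prefix
    // and the key bytes; the payload bytes are never touched or decoded.
    static int compareKeys(byte[] a, byte[] b) {
        int lenA = ByteBuffer.wrap(a).getInt(0);
        int lenB = ByteBuffer.wrap(b).getInt(0);
        return Arrays.compareUnsigned(a, 4, 4 + lenA, b, 4, 4 + lenB);
    }
}
```

For ASCII keys, unsigned byte order matches the usual string order, so "apple" sorts before "banana" regardless of how large the payloads are.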
===================================
Example of POJOs and Expression Keys
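public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<FData> data = env.fromElements(
            new FData("some"), new FData("test"), new FData("POJOs"));

    // Expression key: group by the public field "theString" instead of
    // using the whole FData object as the key.
    data.groupBy("theString").reduceGroup(new GroupbyReducer()).print();

    // In this version of the API, print() only adds a data sink;
    // execute() actually runs the program.
    env.execute();
}

// With the POJO support, the public field "theString" of FData can be
// referenced by name as a grouping key.
public static class FData implements Serializable, Comparable<FData> {
    public String theString;

    public FData() {
        theString = "";
    }

    public FData(String data) {
        this.theString = data;
    }

    @Override
    public int compareTo(FData x) {
        return theString.compareTo(x.theString);
    }

    @Override
    public String toString() {
        return theString;
    }
}

// Identity group reducer: emits every element of each group unchanged.
// (Written against the API of mid-2014, where GroupReduceFunction is an
// abstract class and reduce() receives an Iterator; later Flink versions
// use an interface whose reduce() receives an Iterable.)
public static final class GroupbyReducer extends GroupReduceFunction<FData, FData> {
    @Override
    public void reduce(Iterator<FData> values, Collector<FData> out) {
        while (values.hasNext()) {
            out.collect(values.next());
        }
    }
}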
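For readers new to grouping, here is a plain-Java sketch of what `groupBy("theString").reduceGroup(...)` computes in the example: sort the records by the key, then hand each run of equal keys to the group function. `SortGroup.groupReduce` is a hypothetical helper written only for illustration; it is not a Flink API, and Flink performs its grouping on serialized data rather than on Java lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

final class SortGroup {
    // Sorts a copy of the input by the extracted key, then passes each run
    // of records with equal keys to groupFn and concatenates the results.
    static <T, K extends Comparable<K>, R> List<R> groupReduce(
            List<T> input, Function<T, K> key, Function<List<T>, List<R>> groupFn) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(key));

        List<R> out = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            K k = key.apply(sorted.get(start));
            // Extend the run while the next record has the same key.
            int end = start + 1;
            while (end < sorted.size() && key.apply(sorted.get(end)).compareTo(k) == 0) {
                end++;
            }
            out.addAll(groupFn.apply(sorted.subList(start, end)));
            start = end;
        }
        return out;
    }
}
```

Passing `g -> g` as the group function mirrors the identity `GroupbyReducer`: in this sketch every record is emitted, grouped and ordered by key.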