I neglected to mention that this is still a work in progress (!). It has all the necessary parts to work with Flink but still has bugs and obviously needs a lot of performance tuning. The reason I announced it early is to get feedback and hopefully bug reports from dev@flink. But I must say you have already given me a lot of encouragement. Thanks!

The major missing component is working with HDFS by default in distributed mode. Right now it uses the local file system (which is NFS shared by the workers) in both local and distributed mode, which is terribly inefficient. For local mode, I want the local working directory to be the default for relative paths (I think this works OK). For distributed mode, I want HDFS, and the user's home directory on HDFS, to be the default. I will try to fix this and have a workable system for Yarn by the end of this weekend. The local mode works fine now, I think.

It was easy to port the MRQL physical operators to Flink DataSet methods; I have done something similar for Spark. The components that took me the longest to develop were the DataSources and the DataSinks. All the other MRQL backends use Hadoop HDFS, so I had to copy some files from my core system that uses HDFS to the Flink backend, rename them, and use the Flink filesystem packages (which are very similar to Hadoop HDFS). Another problem was that I had relied heavily on Hadoop SequenceFiles to store results for the other backends, so I had to switch to Flink's BinaryOutputFormat.

The DataSinks in Flink are not very convenient. I wish there were a DataSink that contains an Iterator so that we could use the results for purposes other than storing them in files. Also, compared to Spark, there are very few ways to send results from the workers to the master node after execution.
Custom aggregators still have a bug when the aggregation result is a custom class. It is a serialization problem: the class of the deserialized result doesn't match the expected class, although they have the same name. In general, I ran into several problems with serialization: sometimes I couldn't use inner classes for the Flink functional parameters and had to define them as static classes.

Another thing that took me a couple of days to fix was dumping data from an Iterator to a Flink binary file. Dumping the iterator data into a vector first was not feasible because the data may be huge. First, I tried the fromCollection method, but it requires the Iterator to be serializable. (That doesn't make sense; how do you make an Iterator serializable?) Then I used the following hack:

 BinaryOutputFormat of = new BinaryOutputFormat();
 of.setOutputFilePath(path);
 of.open(0,2);
 ...
It took me a while to find out that I needed to call of.open(0,2) instead of of.open(0,1). Why do we need 2 tasks?

So, thanks for your encouragement. I will try to fix some of these bugs by Monday and have a system that performs well on Yarn.
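
On the inner-class problem mentioned above, here is a minimal sketch of one likely cause (the message does not confirm that this is what happened in MRQL). A non-static inner class keeps a hidden reference to its enclosing object, so Java serialization rejects it whenever the enclosing class is not serializable, while a static nested class carries only the state passed to it. The names Mapper, Planner, InnerAdd, and StaticAdd are hypothetical and are not part of Flink or MRQL; Mapper merely stands in for a serializable function interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class InnerClassSerialization {

    // Hypothetical stand-in for a serializable function interface.
    interface Mapper extends Serializable {
        int map(int x);
    }

    // Deliberately NOT Serializable, like a typical driver or planner class.
    static class Planner {
        private final int offset;

        Planner(int offset) { this.offset = offset; }

        // Non-static inner class: reads a field of the enclosing Planner,
        // so it holds a hidden reference to that Planner instance.
        class InnerAdd implements Mapper {
            public int map(int x) { return x + offset; }
        }

        // Static nested class: no hidden reference; state is copied in.
        static class StaticAdd implements Mapper {
            private final int offset;
            StaticAdd(int offset) { this.offset = offset; }
            public int map(int x) { return x + offset; }
        }

        Mapper inner() { return new InnerAdd(); }
        Mapper nested() { return new StaticAdd(offset); }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Planner p = new Planner(10);
        System.out.println("inner class serializable:  " + canSerialize(p.inner()));
        System.out.println("static class serializable: " + canSerialize(p.nested()));
    }
}
```

In this sketch the inner class fails only because Planner itself is not serializable; making the enclosing class serializable would also work, at the cost of shipping the whole enclosing object along with each function.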
Leonidas


On 08/28/2014 03:58 AM, Fabian Hueske wrote:
That's really cool!

I'm also curious about your experience with Flink. Did you find major
obstacles that you needed to overcome for the integration?
Is there some write-up / report available somewhere (maybe in JIRA) that
discusses the integration? Are you using Flink's full operator set or do
you compile everything into Map and Reduce?

Best, Fabian


2014-08-28 7:37 GMT+02:00 Aljoscha Krettek <[email protected]>:

Very nice indeed! How well is this tested? Can it already run all the
example queries you have? Can you say anything about the performance
of the different underlying execution engines?

On Thu, Aug 28, 2014 at 12:58 AM, Stephan Ewen <[email protected]> wrote:
Wow, that is impressive!


On Thu, Aug 28, 2014 at 12:06 AM, Ufuk Celebi <[email protected]> wrote:

Awesome, indeed! Looking forward to trying it out. :)


On Wed, Aug 27, 2014 at 10:52 PM, Sebastian Schelter <[email protected]>
wrote:

Awesome!


2014-08-27 13:49 GMT-07:00 Leonidas Fegaras <[email protected]>:

Hello,
I would like to let you know that Apache MRQL can now run queries on
Flink.
MRQL is a query processing and optimization system for large-scale,
distributed data analysis, built on top of Apache Hadoop/map-reduce,
Hama, Spark, and now Flink. MRQL queries are SQL-like but not SQL.
They can work on complex, user-defined data (such as JSON and XML) and
can express complex queries (such as pagerank and matrix factorization).
MRQL on Flink has been tested on local mode and on a small Yarn
cluster.
Here are the directions on how to build the latest MRQL snapshot:

git clone https://git-wip-us.apache.org/repos/asf/incubator-mrql.git mrql
cd mrql
mvn -Pyarn clean install

To make it run on your cluster, edit conf/mrql-env.sh and set the
Java, the Hadoop, and the Flink installation directories.

Here is how to run PageRank. First, you need to generate a random
graph and store it in a file using the MRQL query RMAT.mrql:

bin/mrql.flink -local queries/RMAT.mrql 1000 10000

This will create a graph with 1K nodes and 10K edges using the RMAT
algorithm, will remove duplicate edges, and will store the graph in
the binary file graph.bin. Then, run PageRank on Flink mode using:

bin/mrql.flink -local queries/pagerank.mrql

To run MRQL/Flink on a Yarn cluster, first start the Flink container
on Yarn by running the script yarn-session.sh, such as:

${FLINK_HOME}/bin/yarn-session.sh -n 8

This will print the name of the Flink JobManager, which can be used in:
export FLINK_MASTER=name-of-the-Flink-JobManager
bin/mrql.flink -dist -nodes 16 queries/RMAT.mrql 1000000 10000000

This will create a graph with 1M nodes and 10M edges using RMAT on 16
nodes (slaves). You can adjust these numbers to fit your cluster.
Then, run PageRank using:

bin/mrql.flink -dist -nodes 16 queries/pagerank.mrql

The MRQL project page is at: http://mrql.incubator.apache.org/

Let me know if you have any questions.
Leonidas Fegaras


