[
https://issues.apache.org/jira/browse/TAJO-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13861318#comment-13861318
]
Min Zhou edited comment on TAJO-472 at 1/3/14 8:13 AM:
-------------------------------------------------------
Hi Jihoon,
That's a good question. Data shuffling is definitely a big challenge for
distributed systems. Probably we are thinking about the same thing. Please
check my proposal; I am thinking about creating tables through hash
partitioning. Before diving into details, let's figure out what kinds of SQL
queries lead to data shuffling. To my knowledge, those are joins, group-by
aggregations, distinct aggregations, and window functions.
1. For table joins: data warehouses typically use a snowflake schema, which
has a fact table and several dimension tables. Domain experts can define
which key those tables will join on. If it's a big table joined with small
tables, we can use something like Pig's replicated join. If all of the tables
are big, we can partition them on the same join key before joining, so that
the same partition falls onto the same node. Since the tables are co-located,
the join runs entirely locally, which avoids data shuffling (see the first
sketch after this list). Although this approach has some limitations, in our
experience it covers most of our business cases. If you have reviewed the
proposal I submitted, you will see that I introduce a way to do this job
through CTAS.
2. Regarding group-by aggregation: because we can't predict which dimensions
of a table will be grouped by, pre-partitioning won't help. However, since we
can pre-aggregate on the map side, combining partial results, the shuffled
data is typically much smaller than for the other query types that lead to a
shuffle (see the second sketch after this list). We can use a push model
rather than a pull model, streaming data from one side to the other through
memory without touching disk. This is the way search engines do it. We
needn't worry about fault tolerance, because the query is quite fast; we can
simply relaunch it if it fails.
3. Distinct aggregation, which is mostly distinct count, is hard to solve.
The bottleneck of this kind of query is not the data shuffle but inserting a
large number of tuples into a hashmap, or looking up a key in that big
hashmap. There are several solutions, such as bitmaps or arrays for numeric
columns, but each one suits only a certain scenario. If users can accept an
approximate result, HyperLogLog would help (see the third sketch after this
list).
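To illustrate point 1, here is a minimal Java sketch (not Tajo's actual partitioner; the names are made up) showing why two tables hash-partitioned on the same join key with the same partition count end up co-located:
{code:java}
// Illustration only. Matching join keys always map to the same partition id,
// so rows from both tables land on the same node and the join needs no shuffle.
public class CoPartitionDemo {
  // The same function is applied to both tables' join keys.
  static int partition(Object joinKey, int numPartitions) {
    return (joinKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int n = 8; // partition count shared by the fact and dimension tables
    String customerId = "cust-42";
    System.out.println(partition(customerId, n)); // fact side
    System.out.println(partition(customerId, n)); // dimension side: same id
  }
}
{code}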
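For point 2, a minimal sketch of map-side pre-aggregation (again illustrative, not Tajo code): partial results are combined locally, so only one row per group key crosses the network instead of one row per input tuple:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class MapSideCombine {
  public static void main(String[] args) {
    String[] groupKeys = {"us", "cn", "us", "kr", "cn", "us"};
    Map<String, Long> partial = new HashMap<>();
    for (String key : groupKeys) {
      partial.merge(key, 1L, Long::sum); // combine before shuffling
    }
    // Only |distinct keys| entries are shuffled, not |input| tuples.
    System.out.println(partial); // {us=3, cn=2, kr=1}
  }
}
{code}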
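And for point 3, a toy HyperLogLog to show the approximate-distinct-count idea; the register layout and bias constant follow the original paper, but this is only a sketch, not what Tajo would ship:
{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class HllDemo {
  static final int B = 12;             // index bits
  static final int M = 1 << B;         // 4096 registers
  static final byte[] reg = new byte[M];

  static void add(long hash) {
    int idx = (int) (hash >>> (64 - B));                 // first B bits pick a register
    int rank = Long.numberOfLeadingZeros(hash << B) + 1; // rank of the remaining bits
    if (rank > reg[idx]) reg[idx] = (byte) rank;
  }

  static double estimate() {
    double sum = 0;
    for (byte r : reg) sum += Math.pow(2, -r);
    double alpha = 0.7213 / (1 + 1.079 / M);             // bias correction
    return alpha * M * M / sum;
  }

  public static void main(String[] args) {
    // 1M distinct random 64-bit values; the estimate lands within a few percent.
    for (int i = 0; i < 1_000_000; i++) {
      add(ThreadLocalRandom.current().nextLong());
    }
    System.out.printf("estimated distinct count: %.0f%n", estimate());
  }
}
{code}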
BTW, Tajo divides a query into a number of execution blocks, and the second
block never starts until the first one completes. Hadoop MapReduce, on the
other hand, launches reduce tasks once the fraction of finished map tasks
reaches a certain threshold, for example 80%, which significantly reduces the
time spent on data shuffling. I think we can launch the first execution block
and randomly choose a batch of workers to receive its data, with the map side
streaming data directly to those nodes. Since workers are daemons and we
needn't launch a new JVM, data shuffling is cheaper than in Hadoop MapReduce.
By the time the first block completes, the data shuffle will be finished as
well.
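As a rough sketch of that idea (purely hypothetical names, not an existing Tajo API), picking the batch of receiving workers could be as simple as:
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PickShuffleReceivers {
  // Randomly pick batchSize already-running workers as shuffle receivers,
  // so map tasks can stream their output to them immediately.
  static List<String> pickReceivers(List<String> liveWorkers, int batchSize) {
    List<String> copy = new ArrayList<>(liveWorkers);
    Collections.shuffle(copy);
    return copy.subList(0, Math.min(batchSize, copy.size()));
  }

  public static void main(String[] args) {
    List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3", "worker-4");
    // Map tasks would push partitioned output straight to the chosen workers.
    System.out.println(pickReceivers(workers, 2));
  }
}
{code}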
Regards,
Min
> Umbrella ticket for accelerating query speed through memory cached table
> ------------------------------------------------------------------------
>
> Key: TAJO-472
> URL: https://issues.apache.org/jira/browse/TAJO-472
> Project: Tajo
> Issue Type: New Feature
> Components: distributed query plan, physical operator
> Reporter: Min Zhou
> Assignee: Min Zhou
> Attachments: TAJO-472 Proposal.pdf
>
>
> Previously, I was involved as a technical expert in an in-memory database
> for on-line businesses at Alibaba Group. It was an internal project that
> could do group-by aggregation on billions of rows in less than 1 second.
> I'd like to apply this technology to Tajo and make it much faster than it is.
> From some benchmarks, we believe that Spark/Shark is currently the fastest
> solution among all the open-source interactive query systems, such as Impala,
> Presto, and Tajo. The main reason is that it benefits from in-memory data.
> I will take memory-cached tables as my first step toward accelerating the
> query speed of Tajo. Actually, this is the reason why I looked into table
> partitioning during the Xmas and New Year holidays.
> Will submit a proposal soon.
>