[ https://issues.apache.org/jira/browse/DRILL-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690149#comment-16690149 ]
Aman Sinha commented on DRILL-6839:
-----------------------------------
[~vvysotskyi] it is certainly possible to change `StreamAggPrule` to
generate a single-phase agg if, for some reason, the 2-phase agg is not
possible. It would be a workaround, but performance would be poor: after a
`NestedLoopJoin` the output row count increases substantially due to the
cross join, so ideally we should do local aggregation first, before sending
all those rows to the foreman node. With local aggregation, we send only M
rows to the foreman, where M is the number of minor fragments; otherwise we
send `O(N^2)` rows, where N is the number of input rows.
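A toy Java model of the row-count argument above. It is not Drill code: the fragments are hypothetical, and the even split of the N x N cross-join output across M minor fragments is an assumption made only for illustration. Each fragment either ships all of its joined rows to the foreman (single phase) or sends one partial count (two phase).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Drill code): M hypothetical minor fragments each hold a slice of
 * the N x N cross-join output and either ship every row or one partial COUNT.
 */
public class TwoPhaseCountSketch {

    /** Rows held by fragment f when the N*N join output is split evenly over M fragments. */
    static long rowsForFragment(long n, int m, int f) {
        long total = n * n;
        // Even split: the first (total % m) fragments get one extra row.
        return total / m + (f < total % m ? 1 : 0);
    }

    /** Single phase: every joined row travels to the foreman, which counts them. */
    static long rowsSentSinglePhase(long n, int m) {
        long sent = 0;
        for (int f = 0; f < m; f++) {
            sent += rowsForFragment(n, m, f);
        }
        return sent;
    }

    /** Two phase, step 1: each fragment computes a local COUNT (one row per fragment). */
    static List<Long> partialCounts(long n, int m) {
        List<Long> partials = new ArrayList<>();
        for (int f = 0; f < m; f++) {
            partials.add(rowsForFragment(n, m, f));
        }
        return partials;
    }

    /** Two phase, step 2: the foreman sums the M partial counts. */
    static long finalCount(List<Long> partials) {
        long sum = 0;
        for (long p : partials) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        long n = 1_000;
        int m = 8;
        List<Long> partials = partialCounts(n, m);
        System.out.println("single-phase rows sent: " + rowsSentSinglePhase(n, m));
        System.out.println("two-phase rows sent:    " + partials.size());
        System.out.println("final COUNT:            " + finalCount(partials));
    }
}
```

With N = 1,000 and M = 8, `main` reports 1000000 rows sent for the single-phase plan against 8 for the two-phase plan, and both give a final COUNT of 1000000.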
So it would be useful to determine why the 2-phase StreamingAgg was not
created in this example. Looking at the code, the rule creates the
distribution traits from the distribution traits of the child. If the
aggregation has no GROUP BY, I would expect `DrillDistributionTrait.ANY` to
be sufficient. It would be good to check whether that is happening.
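To illustrate why `ANY` should be enough when there is no GROUP BY, the sketch below (again a toy model, not Drill's planner code; the partitioning functions are made up) splits the same 100 rows across 4 hypothetical fragments in three unrelated ways, runs a local COUNT per fragment, and sums the partials on a single receiver. It covers only the no-GROUP-BY case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

/**
 * Toy model (not Drill code): for an aggregate without GROUP BY, the two-phase
 * result does not depend on how the child's rows are spread over fragments.
 */
public class AnyDistributionSketch {

    /** Assigns each row to a fragment in [0, m) using the given mapping. */
    static List<List<Integer>> partition(List<Integer> rows, int m, IntUnaryOperator fragmentOf) {
        List<List<Integer>> fragments = new ArrayList<>();
        for (int f = 0; f < m; f++) {
            fragments.add(new ArrayList<>());
        }
        for (int row : rows) {
            fragments.get(fragmentOf.applyAsInt(row)).add(row);
        }
        return fragments;
    }

    /** Phase 1: COUNT(*) per fragment. Phase 2: sum the partials on one receiver. */
    static long twoPhaseCount(List<List<Integer>> fragments) {
        long total = 0;
        for (List<Integer> fragment : fragments) {
            long partial = fragment.size(); // local aggregate, one row per fragment
            total += partial;               // final aggregate on the single receiver
        }
        return total;
    }

    /** Builds the input rows 0..count-1. */
    static List<Integer> rows(int count) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            rows.add(i);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Integer> rows = rows(100);
        int m = 4;
        // Three unrelated partitionings of the same input.
        long modulo = twoPhaseCount(partition(rows, m, v -> v % m));
        long skewed = twoPhaseCount(partition(rows, m, v -> v < 90 ? 0 : 3));
        long byRange = twoPhaseCount(partition(rows, m, v -> v / 25));
        System.out.println(modulo + " " + skewed + " " + byRange);
    }
}
```

All three partitionings print 100, so the local phase of a no-GROUP-BY aggregate gives the same answer whatever distribution the child produces.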
> Failed to plan (aggregate + Hash or NL join) when slice target is low
> ----------------------------------------------------------------------
>
> Key: DRILL-6839
> URL: https://issues.apache.org/jira/browse/DRILL-6839
> Project: Apache Drill
> Issue Type: Bug
> Reporter: Igor Guzenko
> Priority: Major
> Fix For: 1.16.0
>
>
> *Case 1.* When nested loop join is about to be used:
> - Option "_planner.enable_nljoin_for_scalar_only_" is set to false
> - Option "_planner.slice_target_" is set to a low value to imitate large
> input tables
>
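> {code:java}
> import org.apache.drill.categories.SqlTest;
> import org.apache.drill.exec.ExecConstants;
> import org.apache.drill.exec.planner.physical.PlannerSettings;
> import org.apache.drill.test.ClusterFixture;
> import org.apache.drill.test.ClusterTest;
> import org.junit.BeforeClass;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
>
> @Category(SqlTest.class)
> public class CrossJoinTest extends ClusterTest {
>
>   @BeforeClass
>   public static void setUp() throws Exception {
>     startCluster(ClusterFixture.builder(dirTestWatcher));
>   }
>
>   @Test
>   public void testCrossJoinSucceedsForLowSliceTarget() throws Exception {
>     try {
>       // Allow nested loop join for non-scalar inputs and force high parallelism
>       client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false);
>       client.alterSession(ExecConstants.SLICE_TARGET, 1);
>       queryBuilder().sql(
>           "SELECT COUNT(l.nation_id) " +
>           "FROM cp.`tpch/nation.parquet` l " +
>           ", cp.`tpch/region.parquet` r")
>           .run();
>     } finally {
>       // Restore the session defaults
>       client.resetSession(ExecConstants.SLICE_TARGET);
>       client.resetSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName());
>     }
>   }
> }
> {code}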
>
> *Case 2.* When hash join is about to be used:
> - Option "planner.enable_mergejoin" is set to false, so hash join is used
> instead
> - Option "planner.slice_target" is set to a low value to imitate large
> input tables
> - Comment out the line ruleList.add(HashJoinPrule.DIST_INSTANCE); in the
> PlannerPhase.getPhysicalRules method
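> {code:java}
> import org.apache.drill.categories.SqlTest;
> import org.apache.drill.exec.ExecConstants;
> import org.apache.drill.exec.planner.physical.PlannerSettings;
> import org.apache.drill.test.ClusterFixture;
> import org.apache.drill.test.ClusterTest;
> import org.junit.BeforeClass;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
>
> @Category(SqlTest.class)
> public class CrossJoinTest extends ClusterTest {
>
>   @BeforeClass
>   public static void setUp() throws Exception {
>     startCluster(ClusterFixture.builder(dirTestWatcher));
>   }
>
>   @Test
>   public void testInnerJoinSucceedsForLowSliceTarget() throws Exception {
>     try {
>       // Disable merge join so that hash join is chosen, and force high parallelism
>       client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false);
>       client.alterSession(ExecConstants.SLICE_TARGET, 1);
>       queryBuilder().sql(
>           "SELECT COUNT(l.nation_id) " +
>           "FROM cp.`tpch/nation.parquet` l " +
>           "INNER JOIN cp.`tpch/region.parquet` r " +
>           "ON r.nation_id = l.nation_id")
>           .run();
>     } finally {
>       // Restore the session defaults
>       client.resetSession(ExecConstants.SLICE_TARGET);
>       client.resetSession(PlannerSettings.MERGEJOIN.getOptionName());
>     }
>   }
> }
> {code}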
>
> *Workaround:* To avoid the exception, set the option
> "_planner.enable_multiphase_agg_" to false. This avoids the unsuccessful
> attempts to create a 2-phase aggregation plan in StreamAggPrule and
> guarantees that the logical aggregate is converted to a physical one.
>
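The interplay between the behavior reported in this issue, the `planner.enable_multiphase_agg` workaround, and the single-phase fallback suggested in the comment can be summarized as a small decision table. The class and method names below are hypothetical, and the logic only models the behavior as described here; it is not `StreamAggPrule` itself.

```java
import java.util.Optional;

/**
 * Toy model (hypothetical, not Drill's StreamAggPrule): with multiphase aggregation
 * enabled only a 2-phase plan is attempted, so a failed attempt yields no plan.
 */
public class MultiphaseOptionSketch {

    enum AggPlan { SINGLE_PHASE, TWO_PHASE }

    /** Behavior as reported: no fallback when the 2-phase attempt fails. */
    static Optional<AggPlan> convert(boolean multiphaseEnabled, boolean twoPhasePossible) {
        if (multiphaseEnabled) {
            return twoPhasePossible ? Optional.of(AggPlan.TWO_PHASE) : Optional.empty();
        }
        // Workaround (multiphase disabled): always a single-phase aggregate.
        return Optional.of(AggPlan.SINGLE_PHASE);
    }

    /** Alternative from the comment: fall back to single phase instead of failing. */
    static AggPlan convertWithFallback(boolean multiphaseEnabled, boolean twoPhasePossible) {
        return convert(multiphaseEnabled, twoPhasePossible).orElse(AggPlan.SINGLE_PHASE);
    }

    public static void main(String[] args) {
        for (boolean enabled : new boolean[] {true, false}) {
            for (boolean possible : new boolean[] {true, false}) {
                String current = convert(enabled, possible).map(AggPlan::name).orElse("no plan (error)");
                System.out.println("multiphase=" + enabled + ", 2-phase possible=" + possible
                        + " -> " + current + ", with fallback -> " + convertWithFallback(enabled, possible));
            }
        }
    }
}
```

Only the case with multiphase enabled and no feasible 2-phase plan produces no plan. The fallback turns it into a single-phase plan, which is correct but, as the comment points out, ships far more rows to the foreman.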