gortiz opened a new pull request, #13666:
URL: https://github.com/apache/pinot/pull/13666

   This PR fixes an issue related to colocated joins.
   
   For example, in ColocatedJoinEngineQuickStart, the following query should be executed in a colocated fashion, given that:
   1. we specify the correct `tableOptions`, and
   2. we join on the partition key.
   
   ```sql
   EXPLAIN IMPLEMENTATION PLAN FOR
   SELECT a.mySum, b.totalTrips
   FROM (select userUUID, totalTrips + daysSinceFirstTrip as mySum
         FROM userAttributes /*+ tableOptions(partition_key='userUUID', partition_size='4') */) as a
   JOIN userAttributes /*+ tableOptions(partition_key='userUUID', partition_size='4') */ as b
   ON a.userUUID = b.userUUID
   ```
   
   But the actual plan is:
   ```
   [0]@192.168.1.42:46431|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)
   ├── [1]@192.168.1.42:37333|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
   ├── [1]@192.168.1.42:42455|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
   ├── [1]@192.168.1.42:37933|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
   └── [1]@192.168.1.42:43961|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]}
       └── [1]@192.168.1.42:43961|[1] PROJECT
           └── [1]@192.168.1.42:43961|[1] JOIN
               ├── [1]@192.168.1.42:43961|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)
               │   ├── [2]@192.168.1.42:37333|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
               │   ├── [2]@192.168.1.42:37333|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
               │   ├── [2]@192.168.1.42:37933|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
               │   └── [2]@192.168.1.42:37933|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]}
               │       └── [2]@192.168.1.42:37933|[3] PROJECT
               │           └── [2]@192.168.1.42:37933|[3] TABLE SCAN (userAttributes) null
               └── [1]@192.168.1.42:43961|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)
                   ├── [3]@192.168.1.42:37333|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                   ├── [3]@192.168.1.42:37333|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                   ├── [3]@192.168.1.42:37933|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                   └── [3]@192.168.1.42:37933|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]}
                       └── [3]@192.168.1.42:37933|[3] PROJECT
                           └── [3]@192.168.1.42:37933|[3] TABLE SCAN (userAttributes) null
   ```
   
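   This plan is not colocated: the second join input is sent as `MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]`, but the first join input (the subquery with the `+` projection) is sent as a plain `MAIL_SEND(HASH_DISTRIBUTED)`, i.e. it is reshuffled.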
   
   The reason is that `PinotRelDistributionTraitRule.deriveDistribution` computes `inputRelDistribution.apply(project.getMapping())` for projects. `project.getMapping()` returns null if the project is not a mapping, where a mapping is defined as a project whose expressions are _all_ input references. In our case the project includes the partition key, but it also includes a call to `+`, so it is not a mapping and the input distribution cannot be carried through it.
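
   To make the failure concrete, here is a minimal Java sketch of the "complete mapping" rule described above. It assumes a simplified model in which each projected expression is encoded as the index of the input column it references, or a hypothetical `CALL` marker for anything else. `CompleteMappingSketch` and `completeMapping` are illustrative names, not Calcite's or Pinot's API, and the column order of `userAttributes` is assumed.

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical, simplified model of a projection's field mapping; not Calcite's or Pinot's API.
   public class CompleteMappingSketch {
       /** Marker for a projected expression that is not a plain input reference, e.g. +($1, $2). */
       static final int CALL = -1;

       /**
        * Returns output -> input column positions only when every projected expression is a
        * plain input reference; as soon as one expression is a call, returns null.
        */
       static int[] completeMapping(List<Integer> projects) {
           int[] targetToSource = new int[projects.size()];
           for (int target = 0; target < projects.size(); target++) {
               int source = projects.get(target);
               if (source == CALL) {
                   return null; // not a mapping, so no distribution can be derived from it
               }
               targetToSource[target] = source;
           }
           return targetToSource;
       }

       public static void main(String[] args) {
           // Assumed input column order: 0 = userUUID, 1 = totalTrips, 2 = daysSinceFirstTrip.
           // SELECT userUUID, totalTrips + daysSinceFirstTrip  ->  [$0, CALL]
           System.out.println(Arrays.toString(completeMapping(List.of(0, CALL)))); // null
           // SELECT userUUID, totalTrips  ->  [$0, $1]
           System.out.println(Arrays.toString(completeMapping(List.of(0, 1)))); // [0, 1]
       }
   }
   ```

   With `[$0, CALL]` the sketch returns `null`, so there is nothing through which to translate the input's hash distribution on `userUUID`, even though that column is projected unchanged.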
   
   What `PinotRelDistributionTraitRule.deriveDistribution` should do instead is similar to what Calcite does in [RelMdDistribution#L165](https://github.com/apache/calcite/blob/e371b336a8b404ed36955f517196a5e8606455d7/core/src/main/java/org/apache/calcite/rel/metadata/RelMdDistribution.java#L165C17-L165C34): ask for a partial mapping instead of a complete mapping. (In fact, we should be using that class directly, but that would require some refactoring.)
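
   The partial-mapping alternative can be sketched the same way. This is a hypothetical, simplified model, not Calcite's `Project.getPartialMapping` or `RelDistribution` API: only plain references contribute to the mapping, and a hash distribution survives the projection as long as every distribution key is carried through by such a reference. Otherwise the sketch returns `null`, standing in for "distribution unknown". The class, method names and the `userAttributes` column order are assumptions for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical, simplified model; names are illustrative, not Calcite's or Pinot's API.
   public class PartialMappingSketch {
       /** Marker for a projected expression that is not a plain input reference. */
       static final int CALL = -1;

       /** Partial mapping: input column -> output column, built only from plain references. */
       static Map<Integer, Integer> partialMapping(List<Integer> projects) {
           Map<Integer, Integer> sourceToTarget = new HashMap<>();
           for (int target = 0; target < projects.size(); target++) {
               int source = projects.get(target);
               if (source != CALL) {
                   // Keep the first output position if an input column is projected twice.
                   sourceToTarget.putIfAbsent(source, target);
               }
           }
           return sourceToTarget;
       }

       /**
        * Re-expresses hash-distribution keys in terms of output columns. Returns null
        * ("distribution unknown") if some key is not carried through by a plain reference.
        */
       static List<Integer> applyToHashKeys(List<Integer> inputKeys, Map<Integer, Integer> sourceToTarget) {
           List<Integer> outputKeys = new ArrayList<>();
           for (int key : inputKeys) {
               Integer target = sourceToTarget.get(key);
               if (target == null) {
                   return null;
               }
               outputKeys.add(target);
           }
           return outputKeys;
       }

       public static void main(String[] args) {
           // Assumed input column order: 0 = userUUID, 1 = totalTrips, 2 = daysSinceFirstTrip.
           // SELECT userUUID, totalTrips + daysSinceFirstTrip  ->  [$0, CALL]
           Map<Integer, Integer> mapping = partialMapping(List.of(0, CALL));
           // Input hashed on userUUID: the output is still hashed on output column 0.
           System.out.println(applyToHashKeys(List.of(0), mapping)); // [0]
           // A distribution on totalTrips cannot be carried through the `+` call.
           System.out.println(applyToHashKeys(List.of(1), mapping)); // null
       }
   }
   ```

   The design point is that the `+` expression no longer poisons the whole projection: it only prevents distributions keyed on columns that are consumed by the call from being propagated.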
   
   I tried that solution, but found what looks like a bug in Calcite, which I have reported on Calcite's dev mailing list (see https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv). However, there is a simple workaround that consists of creating a copy of the mapping itself, and that is what I'm adding here (including a note to remove this hack once the issue, or my lack of knowledge, is resolved).


-- 
This is an automated message from the Apache Git Service.