[
https://issues.apache.org/jira/browse/PHOENIX-5362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934023#comment-16934023
]
Chinmay Kulkarni commented on PHOENIX-5362:
-------------------------------------------
[~gjacoby] I guess regenerating the query plan in each mapper during mapper
retries would have repercussions similar to the normal case, where the query
plan differs between the driver and the mappers.
PHOENIX-5027 describes an online index rebuild, so is it possible that the
scans generated per split (to be used by each mapper) in the driver don't
cover all the data, since by the time the mappers are run/re-run and
regenerate the query plan, more data has been upserted into the base table?
I'm not entirely sure how PhoenixIndexImportDirectMapper declares success for
a map task (does success just mean that no exception was thrown?).
Overall, each mapper gets a split which is responsible for a list of scans as
decided in the driver
[[1]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java#L151].
When the mapper task is scheduled, it creates a record reader for its split
and before that, also creates a query plan
[[2]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java#L77-L80].
When the record reader for each mapper is initialized, we iterate over the
list of scans (that the split is responsible for) and create an iterator
(wrapped in another iterator :() which is finally wrapped in a result set (see
[[3]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L150]
and
[[4]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L163]).
This result set is what each mapper uses to read data
[[5]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L184].
Note that the scans we iterate over in the mapper
[[6]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L114]
and
[[7]|https://github.com/apache/phoenix/blob/c43a3e30246127bfd5139b9e556906d6be7a8c0f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java#L130]
are the ones generated by the driver. So when the mapper task is scheduled or
retried, even though it recreates the query plan, the list of scans it works
with is still the one the driver created, not a new list derived from the
recreated query plan.
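The flow above can be modeled with a toy sketch (class and method names are
illustrative, not Phoenix's actual API): the driver freezes a scan list from
plan #1, and a re-run mapper regenerates the plan but still iterates the
frozen list, so rows upserted in between are never read.

```java
import java.util.*;

// Toy model of the driver/mapper split described above. Names are
// illustrative only -- this is not Phoenix code.
public class StaleScanModel {
    // A "query plan" here is just a snapshot of the table's row keys at
    // planning time; each key stands in for one scan.
    static List<String> planScans(SortedSet<String> tableSnapshot) {
        return new ArrayList<>(tableSnapshot);
    }

    public static void main(String[] args) {
        SortedSet<String> table = new TreeSet<>(Arrays.asList("a", "b", "c"));

        // Driver side: scans/splits are generated from plan #1.
        List<String> driverScans = planScans(table);

        // Between planning and mapper launch, another row is upserted.
        table.add("d");

        // Mapper side: the plan is recreated (and now covers "d"), but the
        // mapper still iterates the driver's frozen scan list.
        List<String> recreatedPlan = planScans(table);
        List<String> rowsRead = new ArrayList<>(driverScans);

        System.out.println("recreated plan covers: " + recreatedPlan);
        System.out.println("rows actually read:    " + rowsRead);
        // rowsRead is missing "d" -- the plan/scan mismatch described above.
    }
}
```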
I guess it may be possible that the mapper's iterator does not cover all the
data, since rows may have been written to the base table between the time the
driver generated the scans/splits and the time the mapper tasks were executed?
One thing to run through the debugger and check is the key ranges of the scans
run on a mapper vs. the key range of its split. They should ideally be the
same, no? If not, that would explain the missing rows when rebuilding indexes.
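That containment check could be sketched as follows (a minimal, self-contained
version: plain strings stand in for HBase row keys, with start inclusive, stop
exclusive, and the empty string meaning unbounded -- all assumptions for
illustration, not Phoenix code):

```java
// Hypothetical sanity check for the debugger experiment above: does each
// scan's [start, stop) range fall inside its split's key range?
public class SplitScanCheck {
    // Empty string models an unbounded boundary, as in HBase key ranges.
    static boolean scanWithinSplit(String scanStart, String scanStop,
                                   String splitStart, String splitStop) {
        boolean startOk = splitStart.isEmpty()
                || scanStart.compareTo(splitStart) >= 0;
        boolean stopOk = splitStop.isEmpty()
                || (!scanStop.isEmpty() && scanStop.compareTo(splitStop) <= 0);
        return startOk && stopOk;
    }

    public static void main(String[] args) {
        // Contained: scan [b, c) lies inside split [a, d).
        System.out.println(scanWithinSplit("b", "c", "a", "d")); // true
        // Overrun: scan [b, e) spills past split [a, d) -- the kind of
        // mismatch that would explain missing rows in the rebuilt index.
        System.out.println(scanWithinSplit("b", "e", "a", "d")); // false
    }
}
```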
Are you also seeing this issue on offline index rebuilds?
> Mappers should use the queryPlan from the driver rather than regenerating the
> plan
> ----------------------------------------------------------------------------------
>
> Key: PHOENIX-5362
> URL: https://issues.apache.org/jira/browse/PHOENIX-5362
> Project: Phoenix
> Issue Type: Improvement
> Reporter: Chinmay Kulkarni
> Priority: Major
> Fix For: 4.15.1, 5.1.1
>
>
> Currently, PhoenixInputFormat#getQueryPlan already generates a queryPlan and
> we use this plan to get the scans and splits for the MR job. In
> PhoenixInputFormat#createRecordReader which is called inside each mapper, we
> again create a queryPlan and pass this to the PhoenixRecordReader instance.
> There are multiple problems with this approach:
> # The mappers already have information about the scans from the driver code.
> We potentially just need to wrap these scans in an iterator and create a
> subsequent ResultSet.
> # The mappers don't need most of the information embedded within a queryPlan,
> so they shouldn't need to regenerate the plan.
> # There are weird corner cases that can occur if we replan the query in each
> mapper. For ex: If there is an index creation or metadata change in between
> when the MR job was created, and when the mappers actually launch. In this
> case, the mappers have the scans created for the first queryPlan, but the
> mappers will use iterators created for the second queryPlan. In such cases,
> the issued scans would not match the queryPlan embedded in the mappers'
> iterators/ResultSet. We could potentially miss some scans, or look for more
> than we actually require, since the size check is done against the original
> scan list. The
> resolved table would be as per the new queryPlan, and there could be a
> mismatch here as well (considering the index creation case). There are
> potentially other repercussions in case of intermediary metadata changes as
> well.
> Serializing a subset of the information (like the projector, which iterator
> to use, etc.) of a QueryPlan and passing it from the driver to the mappers
> without having them regenerate the plans seems like the best way forward.
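The direction proposed in the description might be sketched like this (a toy
model only: a HashMap stands in for the Hadoop Configuration, String pairs
stand in for serialized Scan boundaries, and the property name is made up --
Phoenix's real serialization mechanism may differ):

```java
import java.io.*;
import java.util.*;

// Sketch: the driver serializes just the scan boundaries into the job
// configuration once; mappers deserialize them instead of replanning.
public class ScanConfigSketch {
    static String serializeScans(List<String[]> scans) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ArrayList<>(scans));
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    @SuppressWarnings("unchecked")
    static List<String[]> deserializeScans(String encoded)
            throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (List<String[]>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> jobConf = new HashMap<>(); // stands in for Configuration
        // Driver side: plan once, freeze the scan [start, stop) pairs.
        List<String[]> scans = Arrays.asList(
                new String[]{"a", "m"}, new String[]{"m", "z"});
        jobConf.put("phoenix.mr.scans", serializeScans(scans));

        // Mapper side: restore the frozen scans; no replanning needed, so
        // the scans and the "plan" the mapper works with cannot diverge.
        List<String[]> restored = deserializeScans(jobConf.get("phoenix.mr.scans"));
        System.out.println(restored.size() + " scans restored");
    }
}
```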
--
This message was sent by Atlassian Jira
(v8.3.4#803005)