GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/705
[FLINK-1954] [FLINK-1636] [runtime] Improve partition not found error
handling
**Problem**: cancelling tasks sometimes leads to misleading error messages
about "not found partitions". This is an artifact of task cancelling.
If a task (consumer) consumes data from another remote task (producer), it
sends a partition request over the network. If the producer fails concurrently
with this request, the request returns with a PartitionNotFoundException to
the consumer. If this error message is received *before* the consumer is
cancelled (as a result of the failing producer), the misleading error is
attributed to the consumer. This makes it hard to trace the root cause of the
problem (the failing producer).
**Solution**: when a consumer receives a remote PartitionNotFoundException,
it asks the central job manager whether the producer is still running or has
failed.
If the producer is still running, the partition request is sent again
(using an exponential back off with a default max back off of 3s). If the
following requests fail again, the consumer fails with a
PartitionNotFoundException.
If the producer has failed, the consumer is cancelled.
If the producer is not running and has not failed, there is a bug either in
the consumer task setup (e.g. requesting a non-existing result) or in the
network stack (e.g. unsafe publication of produced results), in which case the
error is attributed to the consumer.
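
The three cases above can be read as a small decision function. The following is a minimal sketch of that reading, not the code in this pull request: the class `PartitionStateCheck`, the enums `ProducerState` and `Action`, and the method `decide` are hypothetical names chosen for illustration.

```java
/** Illustrative sketch only; all names here are hypothetical, not Flink classes. */
final class PartitionStateCheck {

    /** Simplified producer states as reported by the job manager (assumed). */
    enum ProducerState { RUNNING, FAILED, OTHER }

    /** What the consumer does once the job manager has answered. */
    enum Action { RETRIGGER_REQUEST, CANCEL_CONSUMER, FAIL_CONSUMER }

    /** Maps the reported producer state to the consumer's reaction. */
    static Action decide(ProducerState state) {
        switch (state) {
            case RUNNING:
                // Producer is alive: send the partition request again.
                return Action.RETRIGGER_REQUEST;
            case FAILED:
                // Producer is the root cause: cancel the consumer quietly.
                return Action.CANCEL_CONSUMER;
            default:
                // Neither running nor failed: likely a setup or network stack
                // bug, so the error is attributed to the consumer.
                return Action.FAIL_CONSUMER;
        }
    }
}
```

The point of the split is that only the last case surfaces a PartitionNotFoundException as the consumer's own failure right away.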
---
The new Akka messages introduced with this change are only exchanged in
error cases and don't affect normal operation.
Normal operation (not affected by this change):
```
TM1 => TM2: request result
TM2 => TM1: result
```
Error case:
```
TM1=>TM2: request result
TM2=>TM1: PartitionNotFoundException
TM1=>JM: check partition state
JM=>TM1: retrigger request -OR- cancel consumer
```
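
When the job manager answers with "retrigger request", the consumer waits with an exponential back off before asking again. A rough sketch of such a back off follows. The class `RequestBackoff`, the 100 ms initial delay, the doubling factor, and the rule of giving up once the maximum has been used are all assumptions made for illustration; only the 3s default maximum comes from the description above.

```java
/** Illustrative sketch only; hypothetical class, not part of this pull request. */
final class RequestBackoff {
    private final int initialMs;
    private final int maxMs;
    private int currentMs; // 0 means no retry has been scheduled yet

    RequestBackoff(int initialMs, int maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    /**
     * Returns the delay in milliseconds before the next request, or -1 once
     * the maximum back off has already been used (the caller then fails).
     */
    int next() {
        if (currentMs == 0) {
            currentMs = initialMs;
            return currentMs;
        }
        if (currentMs >= maxMs) {
            return -1;
        }
        // Double the delay, capped at the maximum; long avoids int overflow.
        currentMs = (int) Math.min((long) currentMs * 2, maxMs);
        return currentMs;
    }
}
```

With the assumed values `new RequestBackoff(100, 3000)`, successive calls to `next()` yield 100, 200, 400, 800, 1600, 3000 and then -1, at which point the consumer would fail with a PartitionNotFoundException.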
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/incubator-flink partition_not_found-1636
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/705.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #705
----
commit a480a638056e4f5300573f543acf46cd239f2674
Author: Ufuk Celebi <[email protected]>
Date: 2015-05-11T14:34:55Z
[FLINK-1954] [FLINK-1636] [runtime] Improve partition not found error
handling
Problem: cancelling tasks sometimes leads to misleading error messages about
"not found partitions". This is an artifact of task cancelling. If a task
(consumer) consumes data from another remote task (producer), it sends a
partition request over the network. If the producer fails concurrently with
this request, the request returns with a PartitionNotFoundException to the
consumer.
If this error message is received *before* the consumer is cancelled (as a
result of the failing producer), the misleading error is attributed to the
consumer. This makes it hard to trace the root cause of the problem (the
failing producer).
Solution: when a consumer receives a remote PartitionNotFoundException, it
asks the central job manager whether the producer is still running or has
failed.
If the producer is still running, the partition request is sent again (using
an exponential back off). If the following requests fail again, the consumer
fails with a PartitionNotFoundException.
If the producer has failed, the consumer is cancelled.
If the producer is not running and has not failed, there is a bug either in
the consumer task setup (e.g. requesting a non-existing result) or in the
network stack (e.g. unsafe publication of produced results), in which case the
error is attributed to the consumer.
---
The new Akka messages introduced with this change are only exchanged in
error cases and don't affect normal operation.
Normal operation (not affected by this change):
- TM1=>TM2: request result
- TM2=>TM1: result
Error case:
- TM1=>TM2: request result
- TM2=>TM1: PartitionNotFoundException
- TM1=>JM: check partition state
- JM=>TM1: retrigger request -OR- cancel consumer
----