Hi Tez experts,


We are hitting some issues with the proper handling of DataMovementEvent and
InputFailedEvent, and are hoping you could provide us with some guidance.



Questions:

 1. What is the expectation for handling InputFailedEvent in
AbstractLogicalInput implementations? In what order should we expect to
receive DataMovementEvents and InputFailedEvents for a particular
targetIndex and version number?

 2. When are we expected to call inputIsReady? Should we probe all physical
inputs before signaling this?

 3. The inputIsReady method documentation says it can be called multiple
times. In what scenario?

 4. How can we be sure that there are no more events that we should wait
for? A generous timeout? How do we avoid unnecessary latency?


Some background for understanding this:



We are using the Tez DAG engine to run SCOPE[1] jobs. The operator and IO
implementation for SCOPE is in C++; let's call it scopehost.

At a high level, we use a Tez task to assemble the information
(scopeHostRunSpec) needed to run scopehost and then execute the process. The
information is very similar to a task spec: vertexid, taskid, input/output
channels.

Note that we have custom edge managers as well.

Problem details:


We are hitting a problem with handling input failed events, and are wondering
if there is something fundamental that we are missing.



Say ScopeInput extends AbstractLogicalInput.

The handleEvents logic looks like:



```
    for (final Event event : events) {
      if (event instanceof DataMovementEvent) {
        final DataMovementEvent dataMovementEvent = (DataMovementEvent) event;
        final ScopeDataMovementEventChannelDetailsPayload taskPayload =
            SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());

        // If we have already seen an InputFailedEvent for this targetIndex
        // with this version, ignore this DataMovementEvent.
        if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
            && obsoleteInputMap.get(dataMovementEvent.getTargetIndex())
               == dataMovementEvent.getVersion()) {
          continue;
        }

        // Track the latest version seen for this targetIndex.
        this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(),
            dataMovementEvent.getVersion());

        // Save the physical input path for this targetIndex.
        this.taskPhysicalInputs.put(
            dataMovementEvent.getTargetIndex(),
            taskPayload.getPhysicalInput());
      } else if (event instanceof InputFailedEvent) {
        final InputFailedEvent inputFailedEvent = (InputFailedEvent) event;
        // Keep track of the input version that failed for this targetIndex.
        obsoleteInputMap.put(inputFailedEvent.getTargetIndex(),
            inputFailedEvent.getVersion());
      }
    }

    // If we have seen all the inputs, signal the processor that this
    // logical input is ready.
    if (taskPhysicalInputs.size() == getNumPhysicalInputs()) {
      LOG.info("Got all inputs (input count = " + taskPhysicalInputs.size() + ")");
      getContext().inputIsReady();
    }
```



The same per-event logic, factored into the helper methods
handleDataMovementEvent and handleInputFailedEvent, is:



```
  private void handleInputFailedEvent(final InputFailedEvent inputFailedEvent) {
    obsoleteInputMap.put(inputFailedEvent.getTargetIndex(),
        inputFailedEvent.getVersion());
  }

  private void handleDataMovementEvent(final DataMovementEvent dataMovementEvent) {
    final ScopeDataMovementEventChannelDetailsPayload taskPayload =
        SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());

    if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
        && obsoleteInputMap.get(dataMovementEvent.getTargetIndex())
           == dataMovementEvent.getVersion()) {
      return;
    }

    this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(),
        dataMovementEvent.getVersion());
    this.taskPhysicalInputs.put(
        dataMovementEvent.getTargetIndex(),
        taskPayload.getPhysicalInput());
  }
```



In certain runs we get the following sequence of events:



2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Data movement event received for targetIndex: 0 version: 0

2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)

2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Input failed event received for targetIndex: 0 version: 0

2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)





This is a ScopeInput with only one physical input. ScopeInput::handleEvents
is called with a DataMovementEvent for version 0. At that point the code has
no way of knowing whether more events are pending.

Since we expected only one physical input, we signaled inputIsReady. We then
received an InputFailedEvent for the same version. Because inputIsReady had
already been signaled, the processor assembled scopeHostRunSpec with stale
information, causing the input read to fail again.
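
To make the sequence concrete, here is a minimal stand-alone sketch that
replays it outside of Tez. It is an illustration only: DataMovement,
InputFailed, the readySignals list and the "path-v0" value are hypothetical
stand-ins, not Tez classes or real data, and readySignals.add(...) takes the
place of getContext().inputIsReady(). The handling logic mirrors the snippet
above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ScopeInputEventOrderSketch {

  // Hypothetical stand-ins for the Tez event classes (only the fields used here).
  interface Event {}

  static final class DataMovement implements Event {
    final int targetIndex; final int version; final String path;
    DataMovement(int targetIndex, int version, String path) {
      this.targetIndex = targetIndex; this.version = version; this.path = path;
    }
  }

  static final class InputFailed implements Event {
    final int targetIndex; final int version;
    InputFailed(int targetIndex, int version) {
      this.targetIndex = targetIndex; this.version = version;
    }
  }

  private final Map<Integer, Integer> obsoleteInputMap = new HashMap<>();
  private final Map<Integer, String> taskPhysicalInputs = new HashMap<>();
  private final int numPhysicalInputs;

  // Each entry is a snapshot of the inputs at the moment "ready" was signaled.
  final List<String> readySignals = new ArrayList<>();

  ScopeInputEventOrderSketch(int numPhysicalInputs) {
    this.numPhysicalInputs = numPhysicalInputs;
  }

  void handleEvents(List<Event> events) {
    for (Event event : events) {
      if (event instanceof DataMovement) {
        DataMovement dme = (DataMovement) event;
        Integer failedVersion = obsoleteInputMap.get(dme.targetIndex);
        // Skip versions already reported as failed for this targetIndex.
        if (failedVersion != null && failedVersion == dme.version) {
          continue;
        }
        taskPhysicalInputs.put(dme.targetIndex, dme.path);
      } else if (event instanceof InputFailed) {
        InputFailed ife = (InputFailed) event;
        // Only the failed version is recorded; the saved path is left in place.
        obsoleteInputMap.put(ife.targetIndex, ife.version);
      }
    }
    if (taskPhysicalInputs.size() == numPhysicalInputs) {
      readySignals.add(taskPhysicalInputs.toString()); // stands in for inputIsReady()
    }
  }

  // Replays the logged order: DataMovement(0, v0), then InputFailed(0, v0).
  static List<String> replay() {
    ScopeInputEventOrderSketch input = new ScopeInputEventOrderSketch(1);
    input.handleEvents(Collections.<Event>singletonList(new DataMovement(0, 0, "path-v0")));
    input.handleEvents(Collections.<Event>singletonList(new InputFailed(0, 0)));
    return input.readySignals;
  }

  public static void main(String[] args) {
    System.out.println(replay());
  }
}
```

In this replay "ready" is recorded twice, matching the two "Got all inputs"
lines in the log, and both times the recorded input is still the version 0
path that was subsequently reported as failed.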





Reference:

[1] SCOPE: easy and efficient parallel processing of massive data sets.
https://dl.acm.org/citation.cfm?id=1454166
