Hi Tez experts,
We are hitting some issues with handling DataMovementEvent and InputFailedEvent correctly and are hoping you could give us some guidance.

Questions:
# What is the expectation for handling InputFailedEvent in AbstractLogicalInput implementations? In what order should we expect to receive DataMovementEvents and InputFailedEvents for a particular targetIndex and version number?
# When are we expected to call inputIsReady? Should we probe all physical inputs before signaling this?
# The inputIsReady method documentation says it can be called multiple times. In what scenario?
# How can we be sure that there are no more events that we should wait for? A generous timeout? How do we avoid unnecessary latency?

Some background for understanding this:
We are using the Tez DAG engine to run SCOPE[1] jobs. The operator and IO implementation for SCOPE is in C++; let's call it scopehost. At a high level, we use a Tez task to assemble the information (scopeHostRunSpec) needed to run scopehost and then execute the process. The information is very similar to a task spec: vertexid, taskid, input/output channels. Note that we have custom edge managers as well.

Problem details:
We are hitting a problem with handling input failed events and are wondering if there is something fundamental that we are missing. Say ScopeInput extends AbstractLogicalInput.
The handleEvents logic looks like:

```
for (final Event event : events) {
    if (event instanceof DataMovementEvent) {
        final DataMovementEvent dataMovementEvent = (DataMovementEvent) event;
        final ScopeDataMovementEventChannelDetailsPayload taskPayload =
            SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());

        // If we have already seen an InputFailedEvent for this targetIndex
        // with this version, just ignore this DataMovementEvent.
        if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
                && obsoleteInputMap.get(dataMovementEvent.getTargetIndex()) == dataMovementEvent.getVersion()) {
            continue;
        }

        // Track the current latest version for this targetIndex.
        this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion());

        // Save the physical input path for this targetIndex.
        this.taskPhysicalInputs.put(dataMovementEvent.getTargetIndex(), taskPayload.getPhysicalInput());
    } else if (event instanceof InputFailedEvent) {
        final InputFailedEvent inputFailedEvent = (InputFailedEvent) event;

        // Keep track of the version of the input that failed for this targetIndex.
        obsoleteInputMap.put(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
    }
}

// If we have seen all the inputs, signal the processor that this logical input is ready.
if (taskPhysicalInputs.size() == getNumPhysicalInputs()) {
    LOG.info("Got all inputs (input count = " + vertexPhysicalInputs.size() + ")");
    getContext().inputIsReady();
}
```

Equivalently, written as the helper methods handleDataMovementEvent and handleInputFailedEvent:

```
private void handleInputFailedEvent(final InputFailedEvent inputFailedEvent) {
    // Remember which version of this targetIndex has failed.
    obsoleteInputMap.put(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
}

private void handleDataMovementEvent(final DataMovementEvent dataMovementEvent) {
    final ScopeDataMovementEventChannelDetailsPayload taskPayload =
        SerializationUtils.bufferToObject(dataMovementEvent.getUserPayload());

    // Ignore a version that has already been reported as failed.
    if (obsoleteInputMap.containsKey(dataMovementEvent.getTargetIndex())
            && obsoleteInputMap.get(dataMovementEvent.getTargetIndex()) == dataMovementEvent.getVersion()) {
        return;
    }

    this.inputIdVersionMap.put(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion());
    this.taskPhysicalInputs.put(dataMovementEvent.getTargetIndex(), taskPayload.getPhysicalInput());
}
```

In certain runs we get the following sequence of events:

2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Data movement event received for targetIndex: 0 version: 0
2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)
2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Input failed event received for targetIndex: 0 version: 0
2018-03-05 08:25:15,591 [INFO] [TezTaskEventRouter{attempt_1520099374056_29809_1_13_000015_5}] |task.ScopeInput|: Got all inputs (input count = 1)

This is a ScopeInput with only one physical input. ScopeInput::handleEvents is called with a DataMovementEvent for version 0. The code has no way of knowing whether more events are pending. We expect only one physical input, so we call inputIsReady. Subsequently we receive an InputFailedEvent for the same version. But inputIsReady has already been signaled, so the processor assembles scopeHostRunSpec with stale information, and the input read fails again.

Reference:
[1] SCOPE: easy and efficient parallel processing of massive data sets. https://dl.acm.org/citation.cfm?id=1454166
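
In case it helps with reproducing this, the sequence from the log can be replayed without a cluster. The following is only a minimal sketch under stated assumptions: `ReplayInputEvents` and `Ev` are hypothetical stand-ins written for this post, not Tez classes, and `readySignals` simply counts how often our code would have called inputIsReady. The decision logic mirrors the handleEvents code described above, with each event delivered in its own handleEvents call, as in the log.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplayInputEvents {
    // Hypothetical stand-in for a Tez event; NOT a Tez class.
    static final class Ev {
        final boolean failed;   // true = InputFailedEvent, false = DataMovementEvent
        final int targetIndex;
        final int version;
        final String path;      // physical input path carried by a data movement event

        Ev(boolean failed, int targetIndex, int version, String path) {
            this.failed = failed;
            this.targetIndex = targetIndex;
            this.version = version;
            this.path = path;
        }
    }

    final Map<Integer, Integer> obsoleteInputMap = new HashMap<>();
    final Map<Integer, String> taskPhysicalInputs = new HashMap<>();
    final int numPhysicalInputs;
    int readySignals = 0;       // how many times inputIsReady() would have been called

    ReplayInputEvents(int numPhysicalInputs) {
        this.numPhysicalInputs = numPhysicalInputs;
    }

    // Same decision logic as the handleEvents in this post, one call per delivered batch.
    void handleEvents(List<Ev> events) {
        for (Ev e : events) {
            if (e.failed) {
                // Record the failed version; note that the stored path is left untouched.
                obsoleteInputMap.put(e.targetIndex, e.version);
            } else {
                Integer obsolete = obsoleteInputMap.get(e.targetIndex);
                if (obsolete != null && obsolete == e.version) {
                    continue;   // this version was already reported as failed
                }
                taskPhysicalInputs.put(e.targetIndex, e.path);
            }
        }
        if (taskPhysicalInputs.size() == numPhysicalInputs) {
            readySignals++;     // stands in for getContext().inputIsReady()
        }
    }

    public static void main(String[] args) {
        ReplayInputEvents input = new ReplayInputEvents(1);
        // Batch 1: data movement event for targetIndex 0, version 0.
        input.handleEvents(Collections.singletonList(new Ev(false, 0, 0, "path-v0")));
        // Batch 2: input failed event for the same targetIndex and version.
        input.handleEvents(Collections.singletonList(new Ev(true, 0, 0, null)));
        System.out.println("ready signals: " + input.readySignals);
        System.out.println("path still held for index 0: " + input.taskPhysicalInputs.get(0));
    }
}
```

Running this prints `ready signals: 2` and `path still held for index 0: path-v0`, which matches the two "Got all inputs" lines in the log: the input is signaled ready once before the failure arrives and once after, while still holding the path of the failed version.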