zhijiangW commented on a change in pull request #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes URL: https://github.com/apache/flink/pull/9854#discussion_r338499482
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -244,46 +239,32 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } /** - * Ends an input (specified by {@code inputId}) of the {@link StreamTask}. The {@code inputId} + * Ends the input of an operator, which specified by {@code inputId}). The {@code inputId} * is numbered starting from 1, and `1` indicates the first input. * + * @param operatorIndex The index of the operator in the operator list of the chain. * @param inputId The ID of the input. - * @throws Exception if some exception happens in the endInput function of an operator. + * @throws Exception if some exception happens in the endInput method of the operator. */ - public void endInput(int inputId) throws Exception { - if (finishedInputs.areAllInputsSelected()) { - return; - } - - if (headOperator instanceof TwoInputStreamOperator) { - if (finishedInputs.isInputSelected(inputId)) { - return; - } - - if (headOperator instanceof BoundedMultiInput) { - ((BoundedMultiInput) headOperator).endInput(inputId); - } + public void endOperatorInput(int operatorIndex, int inputId) throws Exception { + endOperatorInput(allOperators[operatorIndex], inputId); + } - finishedInputs = InputSelection.Builder - .from(finishedInputs) - .select(finishedInputs.getInputMask() == 0 ? inputId : -1) - .build(); + /** + * Ends the input of an operator, which specified by {@code inputId}). The {@code inputId} + * is numbered starting from 1, and `1` indicates the first input. + * + * @param streamOperator + * @param inputId The ID of the input. + * @throws Exception if some exception happens in the endInput method of the operator. Review comment: nit: `@param streamOperator non-head operator for ending the only input.` `@param inputId the input ID starts from 1 which indicates the first input.` Remove `@throws Exception if some exception happens in the endInput method of the operator.`, because it gives no useful information. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services