mynameborat commented on a change in pull request #1483:
URL: https://github.com/apache/samza/pull/1483#discussion_r602522926
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
+ String.format("InputOperator not found in OperatorGraph for %s", systemStream.toString())));
Review comment:
IIRC, this isn't a typical user-related issue. The bug manifested due to
non-determinism during generation of the DAG and was exposed by a
bug/change in Beam transform names. Beam appended a hashcode to the
transform names and, as a result, the SSP associated with the input source
as known at the framework end (message chooser and system consumers) differed
from the SSP associated with the same input within `StreamOperatorTask`.
Here is an example where it happened:
`2021-02-02 23:07:44.187 [main] LiSamzaConfigUtil [INFO] task.inputs system names parsed: [37-Combine_globally_Count_2_CreateVoid_Read_CreateSource__ParDo_BoundedSourceAsSDFWrapper__ParMultiDo_BoundedSourceAsSDFWrapper__output__PCollection_185544081_,`
while the ApplicationDescriptor had a different view on the input name:
`2021-02-02 23:07:46.111 [main] ApplicationDescriptorImpl [INFO] Using NoOpSerde as the key serde for stream 37-Combine_globally_Count_2_CreateVoid_Read_CreateSource__ParDo_BoundedSourceAsSDFWrapper__ParMultiDo_BoundedSourceAsSDFWrapper__output__PCollection_1582906120_`
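
To make the mismatch concrete, here is a rough, self-contained sketch of the failure mode. It is not the Samza or Beam code: `InputLookupSketch`, `streamName`, and `dispatch` are hypothetical names, the operator graph is reduced to a plain `Map`, and the stream name is abbreviated to its last segment. Only the two numeric suffixes and the error message text are taken from the logs and the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Samza or Beam.
final class InputLookupSketch {

  // Builds a stream name with a hash-like suffix, mimicking the shape of the
  // names in the logs above (base name abbreviated for readability).
  static String streamName(String base, int suffix) {
    return base + "_" + suffix + "_";
  }

  // Looks up the operator registered for a stream and fails fast on a miss,
  // instead of silently doing nothing and leaving the caller to time out.
  static String dispatch(Map<String, String> operators, String stream) {
    String operator = operators.get(stream);
    if (operator == null) {
      throw new IllegalStateException(
          String.format("InputOperator not found in OperatorGraph for %s", stream));
    }
    return operator;
  }

  public static void main(String[] args) {
    String base = "output__PCollection"; // abbreviated, assumed for the sketch

    // View held by the operator graph (suffix from the ApplicationDescriptorImpl log).
    Map<String, String> operators = new HashMap<>();
    operators.put(streamName(base, 1582906120), "inputOperator");

    // View held by the framework side (suffix from the task.inputs log).
    String seenByFramework = streamName(base, 185544081);

    try {
      dispatch(operators, seenByFramework);
    } catch (IllegalStateException e) {
      // The two views disagree, so the lookup misses and the error surfaces immediately.
      System.out.println(e.getMessage());
    }
  }
}
```

The point of the sketch is only that two names derived from the same transform stop matching once a differing suffix is appended, so the lookup keyed on one view can never find what was registered under the other.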