lindong28 commented on a change in pull request #15781:
URL: https://github.com/apache/flink/pull/15781#discussion_r621002788



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
##########
@@ -119,11 +116,21 @@ public void run() {
             } catch (InterruptedException e) {
                 // ignore
             }
-            LOG.info(
-                    "============================> Failing mapper  {}: 
count={}, totalCount={}",
-                    getRuntimeContext().getIndexOfThisSubtask(),
-                    numElementsThisTime,
-                    numElementsTotal);
+            if (failer) {
+                LOG.info(
+                        "==============> Failing mapper(failer) {}: count={}, 
totalCount={}, failedBefore={}, failAt={}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        numElementsThisTime,
+                        numElementsTotal,
+                        failedBefore,

Review comment:
       Hmm.. I am probably missing the idea of how the extra information from 
this log can be useful.
   
   For `failedBefore`, we can derive this information by seeing whether there 
is `Artificial Test Failure` before this log is printed. For `failCount`, since 
it is just an integer set by the test which never changes during the job 
execution, we can derive this number by reading the test code.
   
   Is there any case where we can not easily derive the value of `failedBefore` 
and `failCount` as suggested above? In that case, I feel it is a little bit 
simpler to have just one log statement in this method (regardless of the value 
of `failer`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to