tillrohrmann commented on a change in pull request #14259:
URL: https://github.com/apache/flink/pull/14259#discussion_r532644008
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -149,6 +280,49 @@ public void testContinuousTextFileSource() throws
Exception {
verifyResult(result1);
}
+ //
------------------------------------------------------------------------
+ // test utilities
+ //
------------------------------------------------------------------------
+
+ private enum FailoverType {
+ NONE,
+ TM,
+ JM
+ }
+
+ private static void triggerFailover(
+ FailoverType type,
+ JobID jobId,
+ Runnable afterFailAction) throws Exception {
+ switch (type) {
+ case NONE:
+ afterFailAction.run();
+ break;
+ case TM:
+ restartTaskManager(afterFailAction);
+ break;
+ case JM:
+ triggerJobManagerFailover(jobId,
afterFailAction);
+ break;
+ }
+ }
+
+ private static void triggerJobManagerFailover(JobID jobId, Runnable
afterFailAction) throws Exception {
+ highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+ Thread.sleep(50);
+ afterFailAction.run();
+ Thread.sleep(50);
+ highAvailabilityServices.grantJobMasterLeadership(jobId).get();
+ }
+
+ private static void restartTaskManager(Runnable afterFailAction) throws
Exception {
+ miniCluster.terminateTaskExecutor(0).get();
+ Thread.sleep(50);
+ afterFailAction.run();
+ Thread.sleep(50);
Review comment:
Same question here: why are the sleeps needed?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -298,11 +298,11 @@ void unregisterSourceReader(int subtaskId) {
/**
* Get the split to put back. This only happens when a source reader
subtask has failed.
*
- * @param failedSubtaskId the failed subtask id.
+ * @param subtaskId the failed subtask id.
Review comment:
missing `@param restoredCheckpointId`
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -149,6 +280,49 @@ public void testContinuousTextFileSource() throws
Exception {
verifyResult(result1);
}
+ //
------------------------------------------------------------------------
+ // test utilities
+ //
------------------------------------------------------------------------
+
+ private enum FailoverType {
+ NONE,
+ TM,
+ JM
+ }
+
+ private static void triggerFailover(
+ FailoverType type,
+ JobID jobId,
+ Runnable afterFailAction) throws Exception {
+ switch (type) {
+ case NONE:
+ afterFailAction.run();
+ break;
+ case TM:
+ restartTaskManager(afterFailAction);
+ break;
+ case JM:
+ triggerJobManagerFailover(jobId,
afterFailAction);
+ break;
+ }
+ }
+
+ private static void triggerJobManagerFailover(JobID jobId, Runnable
afterFailAction) throws Exception {
+ highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+ Thread.sleep(50);
Review comment:
Why are the sleeps needed?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org