rmetzger commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r475527372
##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,41 @@ public static void executeProgram(
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
+
+ /**
+ * This method blocks until the job status is not INITIALIZING anymore.
+ * If the job is FAILED, it throws an CompletionException with the
failure cause.
+ * @param jobStatusSupplier supplier returning the job status.
+ */
+ public static void waitUntilJobInitializationFinished(
+ SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+ SupplierWithException<JobResult, Exception> jobResultSupplier
+ ) throws CompletionException {
+ LOG.debug("Wait until job initialization is finished");
+ WaitStrategy waitStrategy = new ExponentialWaitStrategy(50,
2000);
+ try {
+ JobStatus status = jobStatusSupplier.get();
+ long attempt = 0;
+ while (status == JobStatus.INITIALIZING) {
+ Thread.sleep(waitStrategy.sleepTime(attempt++));
+ status = jobStatusSupplier.get();
+ }
+ if (status == JobStatus.FAILED) {
+ // note: we can not distinguish between
initialization failures and failures once
+ // execution has started. Execution errors are
potentially reported here.
+ JobResult result = jobResultSupplier.get();
+ Optional<SerializedThrowable> throwable =
result.getSerializedThrowable();
+ if (throwable.isPresent()) {
+ throw new
CompletionException(throwable.get().deserializeError(Thread.currentThread().getContextClassLoader()));
Review comment:
My assumption is yes, but I agree this could be fragile.
If we are passing the wrong classloader here, we'll throw the
`SerializedThrowable`, which contains contains the exception message and
stacktrace as strings as well (users will see the cause, but they can't check
for instance types in the causes).
Getting the usercode classloader into this method should be doable if you
think it is the right approach. I just didn't want to be too invasive with this
change :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]