danny0405 commented on code in PR #5876:
URL: https://github.com/apache/hudi/pull/5876#discussion_r898641719
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java:
##########
@@ -85,25 +87,19 @@ public void execute(
final ExceptionHook hook,
final String actionName,
final Object... actionParams) {
+ executor.execute(wrapAction(action, hook, actionName, actionParams));
+ }
- executor.execute(
- () -> {
- final String actionString = String.format(actionName, actionParams);
- try {
- action.run();
- logger.info("Executor executes action [{}] success!", actionString);
- } catch (Throwable t) {
- // if we have a JVM critical error, promote it immediately, there is a good
- // chance the
- // logging or job failing will not succeed any more
- ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- final String errMsg = String.format("Executor executes action [%s] error", actionString);
- logger.error(errMsg, t);
- if (hook != null) {
- hook.apply(errMsg, t);
- }
- }
- });
+ /**
+ * Run the action in a loop and wait for completion.
+ */
+ public void executeSync(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
+ try {
+ executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get();
+ } catch (Throwable t) {
+ Throwable ex = t instanceof ExecutionException ? t.getCause() : t;
Review Comment:
`wrapAction` already catches throwables internally. What is the intention of
catching the `Throwable` again here?
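
To make the question concrete, here is a minimal, self-contained sketch of what `Future.get()` can still surface when the submitted task already swallows its own failures. It assumes `wrapAction` behaves like the inline block removed above (catch everything, rethrow only fatal errors, forward the rest to the hook). `WrapActionSketch`, `ThrowingAction`, `wrap`, and `runSync` are hypothetical stand-ins, not Hudi code, and the fatal-error check is simplified to `VirtualMachineError`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class WrapActionSketch {

  /** Hypothetical stand-in for the PR's ThrowingRunnable<Throwable>. */
  @FunctionalInterface
  interface ThrowingAction {
    void run() throws Throwable;
  }

  /**
   * Hypothetical stand-in for wrapAction: ordinary failures go to the hook
   * (modelled here as an AtomicReference) and are NOT rethrown.
   */
  static Runnable wrap(ThrowingAction action, AtomicReference<Throwable> hookSink) {
    return () -> {
      try {
        action.run();
      } catch (Throwable t) {
        if (t instanceof VirtualMachineError) {
          // Simplified assumption for "rethrow if fatal error or OOM".
          throw (VirtualMachineError) t;
        }
        hookSink.set(t);
      }
    };
  }

  /** Returns a short label describing what Future.get() surfaced to the caller. */
  static String runSync(ThrowingAction action, AtomicReference<Throwable> hookSink) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.submit(wrap(action, hookSink)).get();
      return "completed";
    } catch (ExecutionException e) {
      // Only reachable when the wrapped task itself rethrows.
      return "execution:" + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    AtomicReference<Throwable> sink = new AtomicReference<>();
    // Ordinary failure: swallowed by the wrapper, so get() returns normally.
    System.out.println(runSync(() -> { throw new IllegalStateException("boom"); }, sink));
    // Simulated fatal error: rethrown by the wrapper, so get() wraps it.
    System.out.println(runSync(() -> { throw new OutOfMemoryError("simulated"); }, sink));
  }
}
```

Under these assumptions, the outer catch would only ever see an interruption of the waiting thread or a fatal error that the wrapper deliberately rethrew. If that is the intent, catching those cases explicitly (rather than `Throwable`) would make it clearer.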