Repository: incubator-gobblin Updated Branches: refs/heads/master c5caa9822 -> 784d7106f
[GOBBLIN-212] Fix for exception handling of TaskStateCollectorServiceHanlder Closes #2064 from autumnust/inline-hive-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/784d7106 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/784d7106 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/784d7106 Branch: refs/heads/master Commit: 784d7106f64ef4594e2005bcfb25e2ed5574a6a7 Parents: c5caa98 Author: Lei Sun <[email protected]> Authored: Mon Aug 21 13:59:24 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Aug 21 13:59:24 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 5 +++ ...RegTaskStateCollectorServiceHandlerImpl.java | 14 +++---- .../gobblin/runtime/SafeDatasetCommit.java | 8 ++-- .../runtime/TaskStateCollectorService.java | 42 ++++++++++++++++---- .../TaskStateCollectorServiceHandler.java | 5 ++- .../runtime/TaskStateCollectorServiceTest.java | 2 +- 6 files changed, 54 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 3336426..c986fd6 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -579,6 +579,11 @@ public class ConfigurationKeys { public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class"; /** + * Set to true so that job still proceed if TaskStateCollectorService failed. + */ + public static final String JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE = "job.proceed.onTaskStateCollectorServiceFailure"; + + /** * Configuration properties for email settings. */ public static final String ALERT_EMAIL_ENABLED_KEY = "email.alert.enabled"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java index ecbd1a5..32ad1b9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java @@ -20,9 +20,11 @@ package org.apache.gobblin.runtime; import java.io.IOException; import java.util.Collection; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.publisher.HiveRegistrationPublisher; +import lombok.extern.slf4j.Slf4j; /** * A {@link TaskStateCollectorServiceHandler} implementation that execute hive registration on driver level. @@ -31,22 +33,18 @@ import org.apache.gobblin.publisher.HiveRegistrationPublisher; * if a single batch of hive registration finishes within a minute, the latency can be hidden by the gap between two run * of {@link TaskStateCollectorService}. */ - +@Slf4j public class HiveRegTaskStateCollectorServiceHandlerImpl implements TaskStateCollectorServiceHandler { private HiveRegistrationPublisher hiveRegHandler; - public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState){ + public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState) { hiveRegHandler = new HiveRegistrationPublisher(jobState); } @Override - public void handle(Collection<? extends WorkUnitState> taskStates) { - try { - this.hiveRegHandler.publishData(taskStates); - }catch (IOException ioe){ - throw new RuntimeException("Hive-registration pushling of data in TaskStateCollector run into IOException:", ioe); - } + public void handle(Collection<? extends WorkUnitState> taskStates) throws IOException { + this.hiveRegHandler.publishData(taskStates); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 9521575..c7bd90f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -370,10 +370,12 @@ final class SafeDatasetCommit implements Callable<Void> { /** * Sets the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} for each given {@link TaskState} to the given * {@link Throwable}. + * + * Make this method public as this exception catching routine can be reusable in other occasions as well. */ - private static void setTaskFailureException(Collection<TaskState> taskStates, Throwable t) { - for (TaskState taskState : taskStates) { - taskState.setTaskFailureException(t); + public static void setTaskFailureException(Collection<? extends WorkUnitState> taskStates, Throwable t) { + for (WorkUnitState taskState : taskStates) { + ((TaskState) taskState).setTaskFailureException(t); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index 6f19243..0318f67 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -24,6 +24,9 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +54,7 @@ import org.apache.gobblin.metastore.StateStore; * * @author Yinan Li */ +@Slf4j public class TaskStateCollectorService extends AbstractScheduledService { private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class); @@ -74,9 +78,18 @@ public class TaskStateCollectorService extends AbstractScheduledService { * A typical example to plug here is hive registration: * We do hive registration everytime there are available taskStates deserialized from storage, on the driver level. */ - public final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler; + @Getter + private final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler; private final Closer handlerCloser = Closer.create(); + private boolean isJobProceedOnCollectorServiceFailure; + + /** + * By default, whether {@link TaskStateCollectorService} finishes successfully or not won't influence + * job's proceed. + */ + private static final boolean defaultPolicyOnCollectorServiceFailure = true; + public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBus eventBus, StateStore<TaskState> taskStateStore, Path outputTaskStateDir) { this.jobState = jobState; @@ -91,21 +104,24 @@ public class TaskStateCollectorService extends AbstractScheduledService { Integer.parseInt(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_INTERVAL_SECONDS, Integer.toString(ConfigurationKeys.DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS))); - if (jobProps.containsKey(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS)){ + if (!StringUtils.isBlank(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS))) { String handlerTypeName = jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS); - try{ + try { ClassAliasResolver<TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory> aliasResolver = new ClassAliasResolver<>(TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory.class); TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory handlerFactory = aliasResolver.resolveClass(handlerTypeName).newInstance(); optionalTaskCollectorHandler = Optional.of(handlerCloser.register(handlerFactory.createHandler(this.jobState))); - } catch (ReflectiveOperationException rfe){ + } catch (ReflectiveOperationException rfe) { throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe); } - } - else{ + } else { optionalTaskCollectorHandler = Optional.absent(); } + + isJobProceedOnCollectorServiceFailure = + jobState.getPropAsBoolean(ConfigurationKeys.JOB_PROCEED_ON_TASK_STATE_COLLECOTR_SERVICE_FAILURE, + defaultPolicyOnCollectorServiceFailure); } @Override @@ -190,9 +206,19 @@ public class TaskStateCollectorService extends AbstractScheduledService { // Finish any addtional steps defined in handler on driver level. // Currently implemented handler for Hive registration only. - if (optionalTaskCollectorHandler.isPresent()){ + if (optionalTaskCollectorHandler.isPresent()) { LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); - optionalTaskCollectorHandler.get().handle(taskStateQueue); + + try { + optionalTaskCollectorHandler.get().handle(taskStateQueue); + } catch (Throwable t) { + if (isJobProceedOnCollectorServiceFailure) { + log.error("Failed to commit dataset while job proceeds", t); + SafeDatasetCommit.setTaskFailureException(taskStateQueue, t); + } else { + throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t); + } + } } // Notify the listeners for the completion of the tasks http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java index a35964c..7a45bc2 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime; +import java.io.IOException; import org.apache.gobblin.configuration.WorkUnitState; import java.io.Closeable; import java.util.Collection; @@ -32,12 +33,12 @@ public interface TaskStateCollectorServiceHandler extends Closeable { /** * Interface of handler factory. */ - interface TaskStateCollectorServiceHandlerFactory{ + interface TaskStateCollectorServiceHandlerFactory { TaskStateCollectorServiceHandler createHandler(JobState jobState); } /** * Execute the actions of handler. */ - public void handle(Collection<? extends WorkUnitState> states) ; + public void handle(Collection<? extends WorkUnitState> states) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/784d7106/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java index 287d7f4..74f8f8c 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java @@ -109,7 +109,7 @@ public class TaskStateCollectorServiceTest { props.setProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS, "hivereg"); TaskStateCollectorService taskStateCollectorServiceHive = new TaskStateCollectorService(props, this.jobState, this.eventBus, this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID + "_prime")); - Assert.assertEquals(taskStateCollectorServiceHive.optionalTaskCollectorHandler.get().getClass().getName(), + Assert.assertEquals(taskStateCollectorServiceHive.getOptionalTaskCollectorHandler().get().getClass().getName(), "org.apache.gobblin.runtime.HiveRegTaskStateCollectorServiceHandlerImpl"); taskStateCollectorServiceHive.shutDown(); return;
