This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c762c97  [GOBBLIN-1127] Provide an option to make metric reporting instantiation failure fatal
c762c97 is described below

commit c762c97c97336aa5603f571f383258a1baa7e8c3
Author: sv2000 <[email protected]>
AuthorDate: Tue Jun 16 10:04:05 2020 -0700

    [GOBBLIN-1127] Provide an option to make metric reporting instantiation failure fatal
    
    Closes #3035 from sv2000/metricReportInstantiationFailure
---
 .../gobblin/configuration/ConfigurationKeys.java   |  18 +-
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  36 +++-
 .../gobblin/cluster/GobblinTaskRunnerTest.java     |  20 +++
 .../gobblin/compaction/mapreduce/MRCompactor.java  |  10 +-
 .../gobblin/metrics/MetricReporterException.java}  |  34 ++--
 .../gobblin/metrics/MultiReporterException.java}   |  29 ++--
 .../apache/gobblin/metrics/ReporterSinkType.java}  |  31 +---
 .../org/apache/gobblin/metrics/ReporterType.java}  |  29 +---
 .../metrics/reporter/util/MetricReportUtils.java   |  16 ++
 .../org/apache/gobblin/metrics/GobblinMetrics.java | 187 ++++++++++++++-------
 .../apache/gobblin/metrics/GobblinMetricsTest.java |  55 +++++-
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java |  51 +++---
 ...Factory.java => KafkaEventReporterFactory.java} |  38 ++---
 ...actory.java => KafkaMetricReporterFactory.java} |  55 ++----
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   |  19 ++-
 .../runtime/services/MetricsReportingService.java  |  23 ++-
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |   9 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  |  10 +-
 18 files changed, 415 insertions(+), 255 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 5cb2613..424424b 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -684,6 +684,19 @@ public class ConfigurationKeys {
   public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX = 
"metrics.reporting";
   public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
       METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
+
+  //Configuration keys to trigger job/task failures on metric reporter 
instantiation failures. Useful
+  //when monitoring of Gobblin pipelines critically depend on events and 
metrics emitted by the metrics
+  //reporting service running in each container.
+  public static final String GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL = 
"gobblin.task.isMetricReportingFailureFatal";
+  public static final boolean 
DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL = false;
+  public static final String GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL = 
"gobblin.task.isEventReportingFailureFatal";
+  public static final boolean 
DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL = false;
+  public static final String GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL = 
"gobblin.job.isMetricReportingFailureFatal";
+  public static final boolean 
DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL = false;
+  public static final String GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL = 
"gobblin.job.isEventReportingFailureFatal";
+  public static final boolean 
DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL = false;
+
   // File-based reporting
   public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -705,7 +718,10 @@ public class ConfigurationKeys {
       METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.enabled";
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED = 
Boolean.toString(false);
   public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
-      "org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
+      "org.apache.gobblin.metrics.kafka.KafkaMetricReporterFactory";
+  public static final String DEFAULT_EVENTS_REPORTING_KAFKA_REPORTER_CLASS =
+      "org.apache.gobblin.metrics.kafka.KafkaEventReporterFactory";
+
   public static final String METRICS_REPORTING_KAFKA_FORMAT = 
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
   public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
       METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 68f98a2..08d4dd0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -85,12 +85,15 @@ import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
@@ -171,6 +174,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   protected final FileSystem fs;
   protected final String applicationName;
   protected final String applicationId;
+  private final boolean isMetricReportingFailureFatal;
+  private final boolean isEventReportingFailureFatal;
 
   public GobblinTaskRunner(String applicationName,
       String helixInstanceName,
@@ -190,6 +195,15 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
     this.clusterConfig = saveConfigToFile(config);
     this.clusterName = 
this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+    this.isMetricReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+    this.isEventReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
     logger.info("Configured GobblinTaskRunner work dir to: {}", 
this.appWorkPath.toString());
 
     // Set system properties passed in via application config. As an example, 
Helix uses System#getProperty() for ZK configuration
@@ -313,7 +327,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   /**
    * Start this {@link GobblinTaskRunner} instance.
    */
-  public void start() throws ContainerHealthCheckException {
+  public void start()
+      throws ContainerHealthCheckException {
     logger.info(String.format("Starting %s in container %s", 
this.helixInstanceName, this.taskRunnerId));
 
     // Add a shutdown hook so the task scheduler gets properly shutdown
@@ -345,11 +360,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     addInstanceTags();
 
     // Start metric reporting
-    if (this.containerMetrics.isPresent()) {
-      this.containerMetrics.get()
-          
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
-              this.taskRunnerId);
-    }
+    initMetricReporter();
 
     if (this.containerHealthEventBus != null) {
       //Register itself with the container health event bus instance to 
receive container health events
@@ -373,6 +384,19 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     }
   }
 
+  private void initMetricReporter() {
+    if (this.containerMetrics.isPresent()) {
+      try {
+        this.containerMetrics.get()
+            
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
 this.taskRunnerId);
+      } catch (MultiReporterException ex) {
+        if (MetricReportUtils.shouldThrowException(logger, ex, 
this.isMetricReportingFailureFatal, this.isEventReportingFailureFatal)) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
   public synchronized void stop() {
     if (this.isStopped) {
       logger.info("Gobblin Task runner is already stopped.");
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 7a0b879..534eb3b 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -47,6 +47,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
 import org.apache.gobblin.util.eventbus.EventBusFactory;
@@ -76,6 +77,7 @@ public class GobblinTaskRunnerTest {
   private GobblinTaskRunner gobblinTaskRunner;
   private GobblinTaskRunner gobblinTaskRunnerHealthCheck;
   private GobblinTaskRunner corruptGobblinTaskRunner;
+  private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
 
   private GobblinClusterManager gobblinClusterManager;
   private String clusterName;
@@ -117,6 +119,18 @@ public class GobblinTaskRunnerTest {
             
config.withValue(GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
 ConfigValueFactory.fromAnyRef(true))
         , Optional.<Path>absent());
 
+    // Participant that fails to start due to metric reporter failures
+    String instanceName = 
HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0);
+
+    Config metricConfig = 
config.withValue(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, 
ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS, 
ConfigValueFactory.fromAnyRef("metricTopic"))
+        
.withValue(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL, 
ConfigValueFactory.fromAnyRef(true));
+
+    this.gobblinTaskRunnerFailedReporter =
+        new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, instanceName,
+            TestHelper.TEST_APPLICATION_ID, "2", metricConfig, 
Optional.<Path>absent());
+
     // Participant with a partial Instance set up on Helix/ZK
     this.corruptHelixInstance = 
HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
     this.corruptGobblinTaskRunner =
@@ -158,6 +172,11 @@ public class GobblinTaskRunnerTest {
       }, "gobblinTaskRunner stopped");
   }
 
+  @Test (expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = ".*Could not create one or more reporters.*")
+  public void testStartUpFailsDueToMetricReporterFailure() {
+    GobblinTaskRunnerTest.this.gobblinTaskRunnerFailedReporter.start();
+  }
+
   @Test
   public void testBuildFileSystemConfig() {
     FileSystem fileSystem = this.gobblinTaskRunner.getFs();
@@ -270,6 +289,7 @@ public class GobblinTaskRunnerTest {
       this.gobblinClusterManager.disconnectHelixManager();
       this.gobblinTaskRunner.disconnectHelixManager();
       this.corruptGobblinTaskRunner.disconnectHelixManager();
+      this.gobblinTaskRunnerFailedReporter.disconnectHelixManager();
       this.gobblinTaskRunnerHealthCheck.disconnectHelixManager();
       if (this.suite != null) {
         this.suite.shutdownCluster();
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 5a937ed..850a1eb 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -69,6 +69,8 @@ import 
org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ClassAliasResolver;
@@ -364,7 +366,13 @@ public class MRCompactor implements Compactor {
     tags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
     GobblinMetrics gobblinMetrics =
         GobblinMetrics.get(this.state.getProp(ConfigurationKeys.JOB_NAME_KEY), 
null, tags.build());
-    gobblinMetrics.startMetricReporting(this.state.getProperties());
+    try {
+      gobblinMetrics.startMetricReporting(this.state.getProperties());
+    } catch (MultiReporterException ex) {
+      for (MetricReporterException e: ex.getExceptions()) {
+        LOG.error("Failed to start {} {} reporter.", e.getSinkType().name(), 
e.getReporterType().name(), e);
+      }
+    }
     return gobblinMetrics;
   }
 
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
similarity index 53%
copy from 
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
index 4a357c8..3c3142e 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
@@ -14,29 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import java.io.IOException;
 
+import lombok.Getter;
 
-@Test
-public class GobblinMetricsTest {
+public class MetricReporterException extends IOException {
+  @Getter
+  private final ReporterType reporterType;
+  @Getter
+  private final ReporterSinkType sinkType;
 
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link 
GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), 
gobblinMetrics);
+  public MetricReporterException(String message, ReporterType reporterType, 
ReporterSinkType sinkType) {
+    super(message);
+    this.reporterType = reporterType;
+    this.sinkType = sinkType;
+  }
 
-    gobblinMetrics.stopMetricsReporting();
-    
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+  public MetricReporterException(String message, Throwable t, ReporterType 
reporterType, ReporterSinkType sinkType) {
+    super(message, t);
+    this.reporterType = reporterType;
+    this.sinkType = sinkType;
   }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
similarity index 53%
copy from 
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
index 4a357c8..1d0dafd 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
@@ -14,29 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
 
+public class MultiReporterException extends IOException {
+  private List<MetricReporterException> exceptions;
 
-@Test
-public class GobblinMetricsTest {
-
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link 
GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), 
gobblinMetrics);
+  public MultiReporterException(String message, List<MetricReporterException> 
exceptions) {
+    super(message);
+    this.exceptions = exceptions;
+  }
 
-    gobblinMetrics.stopMetricsReporting();
-    
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+  public List<MetricReporterException> getExceptions() {
+    return this.exceptions;
   }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
similarity index 53%
copy from 
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
index 4a357c8..53ccdf8 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
@@ -14,29 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-@Test
-public class GobblinMetricsTest {
-
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link 
GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), 
gobblinMetrics);
-
-    gobblinMetrics.stopMetricsReporting();
-    
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
-  }
+public enum ReporterSinkType {
+  JMX,
+  GRAPHITE,
+  FILE,
+  FILE_FAILURE,
+  KAFKA,
+  INFLUXDB,
+  CUSTOM
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
similarity index 53%
copy from 
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
index 4a357c8..c36584a 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
@@ -14,29 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
+public enum ReporterType {
+  METRIC, EVENT, METRIC_EVENT, CUSTOM;
 
-@Test
-public class GobblinMetricsTest {
-
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link 
GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), 
gobblinMetrics);
+  public static boolean isReporterTypeMetric(ReporterType t) {
+    return t.equals(ReporterType.METRIC) || 
t.equals(ReporterType.METRIC_EVENT);
+  }
 
-    gobblinMetrics.stopMetricsReporting();
-    
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+  public static boolean isReporterTypeEvent(ReporterType t) {
+    return t.equals(ReporterType.EVENT) || t.equals(ReporterType.METRIC_EVENT);
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index b9eefec..0980cea 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -25,6 +25,7 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
 
 import com.google.common.base.Optional;
 import com.google.common.io.Closer;
@@ -32,6 +33,9 @@ import com.google.common.io.Closer;
 import javax.annotation.Nullable;
 
 import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.ReporterType;
 
 
 /**
@@ -148,4 +152,16 @@ public class MetricReportUtils {
               SCHEMA_VERSION));
     }
   }
+
+  public static boolean shouldThrowException(Logger log, 
MultiReporterException ex, boolean isMetricReportingFailureFatal, boolean 
isEventReportingFailureFatal) {
+    boolean shouldThrow = false;
+    for (MetricReporterException e: ex.getExceptions()) {
+      if ((isMetricReportingFailureFatal && 
ReporterType.isReporterTypeMetric(e.getReporterType())) || (
+          isEventReportingFailureFatal && 
ReporterType.isReporterTypeEvent(e.getReporterType()))) {
+        shouldThrow = true;
+      }
+      log.error("Failed to start {} {} reporter", e.getSinkType().name(), 
e.getReporterType().name(), e);
+    }
+    return shouldThrow;
+  }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e969083..4e4fa6c 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.graphite.GraphiteConnectionType;
@@ -56,14 +57,13 @@ import org.apache.gobblin.metrics.graphite.GraphiteReporter;
 import org.apache.gobblin.metrics.influxdb.InfluxDBConnectionType;
 import org.apache.gobblin.metrics.influxdb.InfluxDBEventReporter;
 import org.apache.gobblin.metrics.influxdb.InfluxDBReporter;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
 import org.apache.gobblin.metrics.reporter.OutputStreamEventReporter;
 import org.apache.gobblin.metrics.reporter.OutputStreamReporter;
 import org.apache.gobblin.metrics.reporter.ScheduledReporter;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.PropertiesUtils;
 
-import lombok.Getter;
-
 
 /**
  * A class that represents a set of metrics associated with a given name.
@@ -354,7 +354,8 @@ public class GobblinMetrics {
    * Starts metric reporting and appends the given metrics file suffix to the 
current value of
    * {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
    */
-  public void startMetricReportingWithFileSuffix(State state, String 
metricsFileSuffix) {
+  public void startMetricReportingWithFileSuffix(State state, String 
metricsFileSuffix)
+      throws MultiReporterException {
     Properties metricsReportingProps = new Properties();
     metricsReportingProps.putAll(state.getProperties());
 
@@ -374,7 +375,8 @@ public class GobblinMetrics {
    *
    * @param configuration configuration properties
    */
-  public void startMetricReporting(Configuration configuration) {
+  public void startMetricReporting(Configuration configuration)
+      throws MultiReporterException {
     Properties props = new Properties();
     for (Map.Entry<String, String> entry : configuration) {
       props.put(entry.getKey(), entry.getValue());
@@ -387,7 +389,7 @@ public class GobblinMetrics {
    *
    * @param properties configuration properties
    */
-  public void startMetricReporting(Properties properties) {
+  public void startMetricReporting(Properties properties) throws 
MultiReporterException {
     if (this.metricsReportingStarted) {
       LOGGER.warn("Metric reporting has already started");
       return;
@@ -400,22 +402,21 @@ public class GobblinMetrics {
 
     long startTime = System.currentTimeMillis();
 
+    List<MetricReporterException> reporterExceptions = Lists.newArrayList();
+
     try {
-      // Build and start the JMX reporter
-      buildJmxMetricReporter(properties);
-      if (this.jmxReporter.isPresent()) {
-        LOGGER.info("Will start reporting metrics to JMX");
-        this.jmxReporter.get().start();
+      for (ReporterSinkType sinkType: ReporterSinkType.values()) {
+        if (sinkType.equals(ReporterSinkType.CUSTOM)) {
+          buildCustomMetricReporters(properties, reporterExceptions);
+        } else {
+          try {
+            buildReporter(properties, sinkType);
+          } catch (MultiReporterException e) {
+            reporterExceptions.addAll(e.getExceptions());
+          }
+        }
       }
 
-      // Build all other reporters
-      buildFileMetricReporter(properties);
-      buildKafkaMetricReporter(properties);
-      buildGraphiteMetricReporter(properties);
-      buildInfluxDBMetricReporter(properties);
-      buildCustomMetricReporters(properties);
-      buildFileFailureEventReporter(properties);
-
       // Start reporters that implement 
org.apache.gobblin.metrics.report.ScheduledReporter
       RootMetricContext.get().startReporting();
 
@@ -431,6 +432,36 @@ public class GobblinMetrics {
     this.metricsReportingStarted = true;
     LOGGER.info("Metrics reporting has been started in {} ms: GobblinMetrics 
{}",
         System.currentTimeMillis() - startTime, this.toString());
+
+    if (!reporterExceptions.isEmpty()) {
+      throw new MultiReporterException("Could not create one or more 
reporters", reporterExceptions);
+    }
+  }
+
+  private void buildReporter(Properties properties, ReporterSinkType sinkType) 
throws MultiReporterException {
+    switch (sinkType) {
+      case JMX:
+        buildJmxMetricReporter(properties);
+        break;
+      case FILE:
+        buildFileMetricReporter(properties);
+        break;
+      case KAFKA:
+        buildKafkaMetricReporter(properties);
+        break;
+      case GRAPHITE:
+        buildGraphiteMetricReporter(properties);
+        break;
+      case INFLUXDB:
+        buildInfluxDBMetricReporter(properties);
+        break;
+      case FILE_FAILURE:
+        buildFileFailureEventReporter(properties);
+        break;
+      default:
+        LOGGER.error("Unknown reporter sink type: {}", sinkType.name());
+        break;
+    }
   }
 
   /**
@@ -473,7 +504,8 @@ public class GobblinMetrics {
     LOGGER.info("Metrics reporting stopped successfully");
   }
 
-  private void buildFileMetricReporter(Properties properties) {
+  private void buildFileMetricReporter(Properties properties)
+      throws MultiReporterException {
     if 
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_FILE_ENABLED))) {
       return;
@@ -481,9 +513,9 @@ public class GobblinMetrics {
     LOGGER.info("Reporting metrics to log files");
 
     if (!properties.containsKey(ConfigurationKeys.METRICS_LOG_DIR_KEY)) {
-      LOGGER.error(
-          "Not reporting metrics to log files because " + 
ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined");
-      return;
+      MetricReporterException e = new MetricReporterException(
+          "Not reporting metrics to log files because " + 
ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined", ReporterType.METRIC, 
ReporterSinkType.FILE);
+      throw new MultiReporterException("Failed to create file metric 
reporter", Lists.newArrayList(e));
     }
 
     try {
@@ -493,8 +525,7 @@ public class GobblinMetrics {
       // Each job gets its own metric log subdirectory
       Path metricsLogDir = new 
Path(properties.getProperty(ConfigurationKeys.METRICS_LOG_DIR_KEY), 
this.getName());
       if (!fs.exists(metricsLogDir) && !fs.mkdirs(metricsLogDir)) {
-        LOGGER.error("Failed to create metric log directory for metrics " + 
this.getName());
-        return;
+        throw new MetricReporterException("Failed to create metric log 
directory for metrics " + this.getName(), ReporterType.METRIC, 
ReporterSinkType.FILE);
       }
 
       // Add a suffix to file name if specified in properties.
@@ -523,22 +554,19 @@ public class GobblinMetrics {
 
       LOGGER.info("Will start reporting metrics to directory " + 
metricsLogDir);
     } catch (IOException ioe) {
-      LOGGER.error("Failed to build file metric reporter for job " + this.id, 
ioe);
+      MetricReporterException e = new MetricReporterException("Failed to build 
file metric reporter for job " + this.id, ioe, ReporterType.METRIC, 
ReporterSinkType.FILE);
+      throw new MultiReporterException("Failed to create file metric 
reporter", Lists.newArrayList(e));
     }
   }
 
-  private void buildFileFailureEventReporter(Properties properties) {
-    if 
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
-        ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED))) {
+  private void buildFileFailureEventReporter(Properties properties)
+      throws MultiReporterException {
+    if 
((!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
+        ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED)) || 
!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY))) {
       return;
     }
-    LOGGER.info("Reporting failure to log files");
 
-    if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
-      LOGGER.error(
-          "Not reporting failure to log files because " + 
ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
-      return;
-    }
+    LOGGER.info("Reporting failure to log files");
 
     try {
       String fsUri = properties.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI);
@@ -547,8 +575,7 @@ public class GobblinMetrics {
       // Each job gets its own log subdirectory
       Path failureLogDir = new 
Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), 
this.getName());
       if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
-        LOGGER.error("Failed to create failure log directory for metrics " + 
this.getName());
-        return;
+        throw new MetricReporterException("Failed to create failure log 
directory for metrics " + this.getName(), ReporterType.EVENT, 
ReporterSinkType.FILE);
       }
 
       // Add a suffix to file name if specified in properties.
@@ -566,7 +593,8 @@ public class GobblinMetrics {
 
       LOGGER.info("Will start reporting failure to directory " + 
failureLogDir);
     } catch (IOException ioe) {
-      LOGGER.error("Failed to build file failure event reporter for job " + 
this.id, ioe);
+      MetricReporterException e = new MetricReporterException("Failed to build 
file failure event reporter for job " + this.id, ioe, ReporterType.EVENT, 
ReporterSinkType.FILE);
+      throw new MultiReporterException("Failed to create failure file event 
reporter", Lists.newArrayList(e));
     }
   }
 
@@ -581,15 +609,33 @@ public class GobblinMetrics {
         
convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build()));
   }
 
-  private void buildKafkaMetricReporter(Properties properties) {
+  private void buildKafkaMetricReporter(Properties properties)
+      throws MultiReporterException {
     if 
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return;
     }
-    buildScheduledReporter(properties, 
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS, 
Optional.of("Kafka"));
+    // Build the metric and event reporters independently; collect failures and rethrow them together.
+    List<MetricReporterException> reporterExceptions = Lists.newArrayList();
+    try {
+      buildScheduledReporter(properties, 
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS);
+    } catch (MetricReporterException e) {
+      reporterExceptions.add(e);
+    }
+    try {
+      buildScheduledReporter(properties, 
ConfigurationKeys.DEFAULT_EVENTS_REPORTING_KAFKA_REPORTER_CLASS);
+    } catch (MetricReporterException e) {
+      reporterExceptions.add(e);
+    }
+    if (!reporterExceptions.isEmpty()) {
+      throw new MultiReporterException("Failed to start one or more Kafka 
reporters", reporterExceptions);
+    }
   }
 
-  private void buildGraphiteMetricReporter(Properties properties) {
+  private void buildGraphiteMetricReporter(Properties properties)
+      throws MultiReporterException {
+    List<MetricReporterException> reporterExceptionList = Lists.newArrayList();
+
     boolean metricsEnabled = PropertiesUtils
         .getPropAsBoolean(properties, 
ConfigurationKeys.METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY,
             
ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_METRICS_ENABLED);
@@ -612,8 +658,8 @@ public class GobblinMetrics {
       
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME),
           "Graphite hostname is missing.");
     } catch (IllegalArgumentException exception) {
-      LOGGER.error("Not reporting to Graphite due to missing Graphite 
configuration(s).", exception);
-      return;
+      reporterExceptionList.add(new MetricReporterException("Missing Graphite 
configuration(s).", exception, ReporterType.METRIC_EVENT, 
ReporterSinkType.GRAPHITE));
+      throw new MultiReporterException("Failed to start one or more Graphite 
reporters", reporterExceptionList);
     }
 
     String hostname = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME);
@@ -641,7 +687,7 @@ public class GobblinMetrics {
             .withMetricsPrefix(prefix)
             .build(properties);
       } catch (IOException e) {
-        LOGGER.error("Failed to create Graphite metrics reporter. Will not 
report metrics to Graphite.", e);
+        reporterExceptionList.add(new MetricReporterException("Failed to 
create Graphite metrics reporter.", e, ReporterType.METRIC, 
ReporterSinkType.GRAPHITE));
       }
     }
 
@@ -663,12 +709,19 @@ public class GobblinMetrics {
         
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
       }
       catch (IOException e) {
-        LOGGER.error("Failed to create Graphite event reporter. Will not 
report events to Graphite.", e);
+        reporterExceptionList.add(new MetricReporterException("Failed to 
create Graphite event reporter.", e, ReporterType.EVENT, 
ReporterSinkType.GRAPHITE));
       }
     }
+
+    if (!reporterExceptionList.isEmpty()) {
+      throw new MultiReporterException("Failed to create one or more Graphite 
Reporters", reporterExceptionList);
+    }
   }
 
-  private void buildInfluxDBMetricReporter(Properties properties) {
+  private void buildInfluxDBMetricReporter(Properties properties)
+      throws MultiReporterException {
+    List<MetricReporterException> reporterExceptionList = Lists.newArrayList();
+
     boolean metricsEnabled = PropertiesUtils
         .getPropAsBoolean(properties, 
ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
             
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
@@ -691,8 +744,8 @@ public class GobblinMetrics {
       
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
           "InfluxDB database name is missing.");
     } catch (IllegalArgumentException exception) {
-      LOGGER.error("Not reporting to InfluxDB due to missing InfluxDB 
configuration(s).", exception);
-      return;
+      reporterExceptionList.add(new MetricReporterException("Missing InfluxDB 
configuration(s)", exception, ReporterType.METRIC_EVENT, 
ReporterSinkType.INFLUXDB));
+      throw new MultiReporterException("Failed to start one or more InfluxDB 
reporters", reporterExceptionList);
     }
 
     String url = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
@@ -719,7 +772,7 @@ public class GobblinMetrics {
             this.metricContext.getName()) // contains the current job id
             .build(properties);
       } catch (IOException e) {
-        LOGGER.error("Failed to create InfluxDB metrics reporter. Will not 
report metrics to InfluxDB.", e);
+        reporterExceptionList.add(new MetricReporterException("Failed to 
create InfluxDB metrics reporter.", e, ReporterType.METRIC, 
ReporterSinkType.INFLUXDB));
       }
     }
 
@@ -735,9 +788,13 @@ public class GobblinMetrics {
         
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
       }
       catch (IOException e) {
-        LOGGER.error("Failed to create InfluxDB event reporter. Will not 
report events to InfluxDB.", e);
+        reporterExceptionList.add(new MetricReporterException("Failed to 
create InfluxDB event reporter.", e, ReporterType.EVENT, 
ReporterSinkType.INFLUXDB));
       }
     }
+
+    if (!reporterExceptionList.isEmpty()) {
+      throw new MultiReporterException("Failed to create one or more InfluxDB 
reporters", reporterExceptionList);
+    }
   }
 
   /**
@@ -745,7 +802,7 @@ public class GobblinMetrics {
    * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#METRICS_CUSTOM_BUILDERS}. 
This allows users to specify custom
    * reporters for Gobblin runtime without having to modify the code.
    */
-  private void buildCustomMetricReporters(Properties properties) {
+  private void buildCustomMetricReporters(Properties properties, 
List<MetricReporterException> reporterExceptions) {
     String reporterClasses = 
properties.getProperty(ConfigurationKeys.METRICS_CUSTOM_BUILDERS);
 
     if (Strings.isNullOrEmpty(reporterClasses)) {
@@ -753,11 +810,16 @@ public class GobblinMetrics {
     }
 
     for (String reporterClass : Splitter.on(",").split(reporterClasses)) {
-      buildScheduledReporter(properties, reporterClass, 
Optional.<String>absent());
+      try {
+        buildScheduledReporter(properties, reporterClass);
+      } catch (MetricReporterException e) {
+        reporterExceptions.add(e);
+      }
     }
   }
 
-  private void buildScheduledReporter(Properties properties, String 
reporterClass, Optional<String> reporterSink) {
+  private void buildScheduledReporter(Properties properties, String 
reporterClass)
+      throws MetricReporterException {
     try {
       Class<?> clazz = Class.forName(reporterClass);
 
@@ -771,27 +833,28 @@ public class GobblinMetrics {
           return;
         }
         this.codahaleReportersCloser.register(scheduledReporter);
-        String reporterSinkMsg = reporterSink.isPresent() ? "to " + 
reporterSink.get() : "";
-        LOGGER.info("Will start reporting metrics " + reporterSinkMsg + " 
using " + reporterClass);
+        LOGGER.info("Will start reporting metrics using " + reporterClass);
         this.codahaleScheduledReporters.add(scheduledReporter);
       } else if (CustomReporterFactory.class.isAssignableFrom(clazz)) {
         CustomReporterFactory customReporterFactory = ((CustomReporterFactory) 
clazz.getConstructor().newInstance());
         customReporterFactory.newScheduledReporter(properties);
         LOGGER.info("Will start reporting metrics using " + reporterClass);
       } else {
-        throw new IllegalArgumentException("Class " + reporterClass +
+        throw new MetricReporterException("Class " + reporterClass +
             " specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + 
" must implement: "
-            + CustomCodahaleReporterFactory.class + " or " + 
CustomReporterFactory.class);
+            + CustomCodahaleReporterFactory.class + " or " + 
CustomReporterFactory.class, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
       }
     } catch (ClassNotFoundException exception) {
-      LOGGER.warn(String
+      throw new MetricReporterException(String
           .format("Failed to create metric reporter: requested 
CustomReporterFactory %s not found.", reporterClass),
-          exception);
+          exception, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
     } catch (NoSuchMethodException exception) {
-      LOGGER.warn(String.format("Failed to create metric reporter: requested 
CustomReporterFactory %s "
-          + "does not have parameterless constructor.", reporterClass), 
exception);
+      throw new MetricReporterException(String.format("Failed to create metric 
reporter: requested CustomReporterFactory %s "
+          + "does not have parameterless constructor.", reporterClass), 
exception, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
+    } catch (MetricReporterException exception) {
+      throw exception;
     } catch (Exception exception) {
-      LOGGER.warn("Could not create metric reporter from builder " + 
reporterClass + ".", exception);
+      throw new MetricReporterException("Could not create metric reporter from 
builder " + reporterClass + ".", exception, ReporterType.CUSTOM, 
ReporterSinkType.CUSTOM);
     }
   }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
index 4a357c8..d2789db 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
@@ -22,21 +22,72 @@ import java.util.Properties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 @Test
 public class GobblinMetricsTest {
-
   /**
    * Test the {@link GobblinMetrics} instance is removed from {@link 
GobblinMetricsRegistry} when
    * it stops metrics reporting
    */
-  public void testStopReportingMetrics() {
+  public void testStopReportingMetrics()
+      throws MultiReporterException {
     String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
     GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
+
+    Properties properties = new Properties();
+    properties.put(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY, 
"false");
+
+    gobblinMetrics.startMetricReporting(properties);
     Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), 
gobblinMetrics);
 
     gobblinMetrics.stopMetricsReporting();
     
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
   }
+
+  public void testMetricFileReporterThrowsException() {
+    String id = getClass().getSimpleName() + "-testMetricFileReporterThrowsException-" + System.currentTimeMillis();
+    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+    //Enable file reporter without specifying metrics.log.dir.
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, 
ConfigValueFactory.fromAnyRef(true));
+
+    Properties properties = ConfigUtils.configToProperties(config);
+    //Ensure MultiReporterException is thrown
+    try {
+      gobblinMetrics.startMetricReporting(properties);
+      Assert.fail("Metric reporting unexpectedly succeeded.");
+    } catch (MultiReporterException e) {
+      //Do nothing. Expected to be here.
+    }
+  }
+
+  public void testMetricFileReporterSuccessful() {
+    String id = getClass().getSimpleName() + "-testMetricFileReporterSuccessful-" + System.currentTimeMillis();
+    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+    //Enable file reporter and specify both the metrics and the failure log directories.
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, 
ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_LOG_DIR_KEY, 
ConfigValueFactory.fromAnyRef(System.getProperty("java.io.tmpdir")))
+        .withValue(ConfigurationKeys.FAILURE_LOG_DIR_KEY, 
ConfigValueFactory.fromAnyRef(System.getProperty("java.io.tmpdir")));
+
+    Properties properties = ConfigUtils.configToProperties(config);
+    //Ensure MultiReporterException is NOT thrown
+    try {
+      gobblinMetrics.startMetricReporting(properties);
+    } catch (MultiReporterException e) {
+      Assert.fail("Unexpected exception " + e.getMessage(), e);
+    }
+    //Stop reporting so the started reporters and the registry entry do not leak into other tests.
+    gobblinMetrics.stopMetricsReporting();
+  }
+
 }
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index e576468..33d3591 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -31,6 +31,26 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+import azkaban.jobExecutor.AbstractJob;
+import javax.annotation.Nullable;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.configuration.State;
@@ -48,30 +68,13 @@ import 
org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.listeners.CompositeJobListener;
 import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.services.MetricsReportingService;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.TimeRangeChecker;
 import org.apache.gobblin.util.hadoop.TokenUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-import azkaban.jobExecutor.AbstractJob;
-import javax.annotation.Nullable;
 
 import static 
org.apache.gobblin.runtime.AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary;
 import static 
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -222,6 +225,16 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
 
     // Since Java classes cannot extend multiple classes and Azkaban jobs must 
extend AbstractJob, we must use composition
     // verses extending ServiceBasedAppLauncher
+    boolean isMetricReportingFailureFatal = PropertiesUtils
+        .getPropAsBoolean(jobProps, 
ConfigurationKeys.GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL,
+            
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL));
+    boolean isEventReportingFailureFatal = PropertiesUtils
+        .getPropAsBoolean(jobProps, 
ConfigurationKeys.GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL,
+            
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL));
+
+    
jobProps.setProperty(MetricsReportingService.METRICS_REPORTING_FAILURE_FATAL_KEY,
 Boolean.toString(isMetricReportingFailureFatal));
+    
jobProps.setProperty(MetricsReportingService.EVENT_REPORTING_FAILURE_FATAL_KEY, 
Boolean.toString(isEventReportingFailureFatal));
+
     this.applicationLauncher =
         this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" 
+ UUID.randomUUID()));
   }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
similarity index 74%
copy from 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
copy to 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
index 5c12360..5fb360d 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics.kafka;
 
 import java.io.IOException;
@@ -30,25 +29,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
 import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
+import org.apache.gobblin.metrics.ReporterType;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 
-
 @Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+public class KafkaEventReporterFactory implements 
CustomCodahaleReporterFactory {
   @Override
-  public ScheduledReporter newScheduledReporter(MetricRegistry registry, 
Properties properties) {
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, 
Properties properties)
+      throws IOException {
     if 
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return null;
     }
-    log.info("Reporting metrics & events to Kafka");
-
-    boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
-
-    if (KafkaReporterUtils.isMetricsEnabled(properties)) {
-      log.info("Metrics enabled ---  Reporting metrics to Kafka");
-    }
 
     boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
     if (KafkaReporterUtils.isEventsEnabled(properties)) {
@@ -61,10 +56,10 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
     try {
       
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
           "Kafka metrics brokers missing.");
-      
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(),
 "Kafka topic missing.");
+      // Only require an events/default topic when event reporting is actually enabled.
+      Preconditions.checkArgument(!eventsEnabled || eventsTopic.or(defaultTopic).isPresent(), 
"Kafka topic missing.");
     } catch (IllegalArgumentException exception) {
-      log.error("Not reporting metrics to Kafka due to missing Kafka 
configuration(s).", exception);
-      return null;
+      throw new MetricReporterException("Missing Kafka configuration(s).", 
exception, ReporterType.EVENT, ReporterSinkType.KAFKA);
     }
 
     String brokers = 
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -82,15 +76,6 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
       formatEnum = KafkaReportingFormats.JSON;
     }
 
-    Optional<String> metricsTopic = 
KafkaReporterUtils.getMetricsTopic(properties);
-    if (metricsEnabled) {
-      try {
-        formatEnum.buildMetricsReporter(brokers, 
metricsTopic.or(defaultTopic).get(), properties);
-      } catch (IOException exception) {
-        log.error("Failed to create Kafka metrics reporter. Will not report 
metrics to Kafka.", exception);
-      }
-    }
-
     KafkaReportingFormats eventFormatEnum;
     if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT))
 {
       String eventsReportingFormat = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT,
@@ -110,12 +95,9 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
     if (eventsEnabled) {
       try {
         String eventTopic = eventsTopic.or(defaultTopic).get();
-        ScheduledReporter reporter =
-            eventFormatEnum.buildEventsReporter(brokers, eventTopic, 
RootMetricContext.get(), properties);
-
-        return reporter;
+        return eventFormatEnum.buildEventsReporter(brokers, eventTopic, 
RootMetricContext.get(), properties);
       } catch (IOException exception) {
-        log.error("Failed to create Kafka events reporter. Will not report 
events to Kafka.", exception);
+        throw new MetricReporterException("Failed to create Kafka events 
reporter.", exception, ReporterType.EVENT, ReporterSinkType.KAFKA);
       }
     }
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
similarity index 60%
rename from 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
rename to 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
index 5c12360..fd11dbf 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
@@ -30,41 +30,37 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
 import org.apache.gobblin.metrics.KafkaReportingFormats;
-import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
+import org.apache.gobblin.metrics.ReporterType;
 import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 
 
 @Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+public class KafkaMetricReporterFactory implements 
CustomCodahaleReporterFactory {
   @Override
-  public ScheduledReporter newScheduledReporter(MetricRegistry registry, 
Properties properties) {
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, 
Properties properties)
+      throws IOException {
     if 
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return null;
     }
-    log.info("Reporting metrics & events to Kafka");
-
     boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
 
     if (KafkaReporterUtils.isMetricsEnabled(properties)) {
       log.info("Metrics enabled ---  Reporting metrics to Kafka");
     }
 
-    boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
-    if (KafkaReporterUtils.isEventsEnabled(properties)) {
-      log.info("Events enabled --- Reporting events to Kafka");
-    }
-
-    Optional<String> eventsTopic = 
KafkaReporterUtils.getEventsTopic(properties);
+    Optional<String> metricsTopic = 
KafkaReporterUtils.getMetricsTopic(properties);
     Optional<String> defaultTopic = 
KafkaReporterUtils.getDefaultTopic(properties);
 
     try {
       
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
           "Kafka metrics brokers missing.");
-      
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(),
 "Kafka topic missing.");
+      // Only require a metrics/default topic when metric reporting is actually enabled.
+      Preconditions.checkArgument(!metricsEnabled || metricsTopic.or(defaultTopic).isPresent(), 
"Kafka topic missing.");
     } catch (IllegalArgumentException exception) {
-      log.error("Not reporting metrics to Kafka due to missing Kafka 
configuration(s).", exception);
-      return null;
+      throw new MetricReporterException("Missing Kafka configuration(s).", 
exception, ReporterType.METRIC, ReporterSinkType.KAFKA);
     }
 
     String brokers = 
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -82,43 +77,13 @@ public class KafkaReporterFactory implements 
CustomCodahaleReporterFactory {
       formatEnum = KafkaReportingFormats.JSON;
     }
 
-    Optional<String> metricsTopic = 
KafkaReporterUtils.getMetricsTopic(properties);
     if (metricsEnabled) {
       try {
         formatEnum.buildMetricsReporter(brokers, 
metricsTopic.or(defaultTopic).get(), properties);
       } catch (IOException exception) {
-        log.error("Failed to create Kafka metrics reporter. Will not report 
metrics to Kafka.", exception);
+        throw new MetricReporterException("Failed to create Kafka metrics 
reporter.", exception, ReporterType.METRIC, ReporterSinkType.KAFKA);
       }
     }
-
-    KafkaReportingFormats eventFormatEnum;
-    if 
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT))
 {
-      String eventsReportingFormat = 
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT,
-          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
-      try {
-        eventFormatEnum = 
KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase());
-      } catch (IllegalArgumentException exception) {
-        log.warn(
-            "Kafka events reporting format " + eventsReportingFormat + " not 
recognized. Will report in json format.",
-            exception);
-        eventFormatEnum = KafkaReportingFormats.JSON;
-      }
-    } else {
-      eventFormatEnum = formatEnum;
-    }
-
-    if (eventsEnabled) {
-      try {
-        String eventTopic = eventsTopic.or(defaultTopic).get();
-        ScheduledReporter reporter =
-            eventFormatEnum.buildEventsReporter(brokers, eventTopic, 
RootMetricContext.get(), properties);
-
-        return reporter;
-      } catch (IOException exception) {
-        log.error("Failed to create Kafka events reporter. Will not report 
events to Kafka.", exception);
-      }
-    }
-
     return null;
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index c649595..ed02d38 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -74,11 +74,13 @@ import org.apache.gobblin.fsm.FiniteStateMachine;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.CountEventBuilder;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.JobStateEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
@@ -783,8 +785,21 @@ public class MRJobLauncher extends AbstractJobLauncher {
       if (Boolean.valueOf(
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
-        this.jobMetrics.get()
-            .startMetricReportingWithFileSuffix(gobblinJobState, 
context.getTaskAttemptID().toString());
+        try {
+          this.jobMetrics.get()
+              .startMetricReportingWithFileSuffix(gobblinJobState, 
context.getTaskAttemptID().toString());
+        } catch (MultiReporterException ex) {
+          //Fail the task if metric/event reporting failure is configured to be fatal.
+          boolean isMetricReportingFailureFatal = Boolean.valueOf(configuration
+              
.get(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+                  
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL)));
+          boolean isEventReportingFailureFatal = Boolean.valueOf(configuration
+              
.get(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+                  
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL)));
+          if (MetricReportUtils.shouldThrowException(LOG, ex, 
isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
+            throw new RuntimeException(ex);
+          }
+        }
       }
     }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
index 43b53cd..4e53d24 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
@@ -21,25 +21,46 @@ import java.util.Properties;
 
 import com.google.common.util.concurrent.AbstractIdleService;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
  * A {@link com.google.common.util.concurrent.Service} for handling life cycle 
events around {@link GobblinMetrics}.
  */
+@Slf4j
 public class MetricsReportingService extends AbstractIdleService {
+  public static final String METRICS_REPORTING_FAILURE_FATAL_KEY = 
"metrics.reporting.failure.fatal";
+  public static final String EVENT_REPORTING_FAILURE_FATAL_KEY = 
"event.reporting.failure.fatal";
+
+  public static final String DEFAULT_METRICS_REPORTING_FAILURE_FATAL = "false";
+  public static final String DEFAULT_EVENT_REPORTING_FAILURE_FATAL = "false";
 
   private final Properties properties;
   private final String appId;
+  private final boolean isMetricReportingFailureFatal;
+  private final boolean isEventReportingFailureFatal;
 
   public MetricsReportingService(Properties properties, String appId) {
     this.properties = properties;
     this.appId = appId;
+    this.isMetricReportingFailureFatal = 
PropertiesUtils.getPropAsBoolean(properties, 
METRICS_REPORTING_FAILURE_FATAL_KEY, DEFAULT_METRICS_REPORTING_FAILURE_FATAL);
+    this.isEventReportingFailureFatal = 
PropertiesUtils.getPropAsBoolean(properties, EVENT_REPORTING_FAILURE_FATAL_KEY, 
DEFAULT_EVENT_REPORTING_FAILURE_FATAL);
   }
 
   @Override
   protected void startUp() throws Exception {
-    GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+    try {
+      GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+    } catch (MultiReporterException ex) {
+      if (MetricReportUtils.shouldThrowException(log, ex, 
this.isMetricReportingFailureFatal, this.isEventReportingFailureFatal)) {
+        throw ex;
+      }
+    }
   }
 
   @Override
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 1abf8d2..d3d91df 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -45,7 +45,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.cluster.ContainerHealthCheckException;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.GobblinTaskRunner;
@@ -174,7 +173,7 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
     return containerId.toString();
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) {
     Options options = buildOptions();
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
@@ -210,11 +209,11 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
     } catch (ParseException pe) {
       printUsage(options);
       System.exit(1);
-    } catch (ContainerHealthCheckException e) {
-      // Ideally, we should not be catching this exception, as this is 
indicative of a non-recoverable exception. However,
+    } catch (Throwable t) {
+      // Ideally, we should not be catching non-recoverable exceptions and errors. However,
       // simply propagating the exception may prevent the container exit due to the presence of non-daemon threads present
       // in the application. Hence, we catch this exception to invoke System.exit() which in turn ensures that all non-daemon threads are killed.
-      LOGGER.error("Exception encountered: {}", e);
+      LOGGER.error("Exception encountered: {}", t);
       System.exit(1);
     }
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f9c6cc6..177cc69 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -98,6 +98,8 @@ import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ConfigUtils;
@@ -399,7 +401,13 @@ public class YarnService extends AbstractIdleService {
 
     // Intialize Gobblin metrics and start reporters
     GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, 
null, tags.build());
-    
gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+    try {
+      
gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+    } catch (MultiReporterException ex) {
+      for (MetricReporterException e: ex.getExceptions()) {
+        LOGGER.error("Failed to start {} {} reporter.", 
e.getSinkType().name(), e.getReporterType().name(), e);
+      }
+    }
 
     return gobblinMetrics;
   }

Reply via email to