This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c762c97 [GOBBLIN-1127] Provide an option to make metric reporting
instantiation failure fatal
c762c97 is described below
commit c762c97c97336aa5603f571f383258a1baa7e8c3
Author: sv2000 <[email protected]>
AuthorDate: Tue Jun 16 10:04:05 2020 -0700
[GOBBLIN-1127] Provide an option to make metric reporting instantiation
failure fatal
Closes #3035 from
sv2000/metricReportInstantiationFailure
---
.../gobblin/configuration/ConfigurationKeys.java | 18 +-
.../apache/gobblin/cluster/GobblinTaskRunner.java | 36 +++-
.../gobblin/cluster/GobblinTaskRunnerTest.java | 20 +++
.../gobblin/compaction/mapreduce/MRCompactor.java | 10 +-
.../gobblin/metrics/MetricReporterException.java} | 34 ++--
.../gobblin/metrics/MultiReporterException.java} | 29 ++--
.../apache/gobblin/metrics/ReporterSinkType.java} | 31 +---
.../org/apache/gobblin/metrics/ReporterType.java} | 29 +---
.../metrics/reporter/util/MetricReportUtils.java | 16 ++
.../org/apache/gobblin/metrics/GobblinMetrics.java | 187 ++++++++++++++-------
.../apache/gobblin/metrics/GobblinMetricsTest.java | 55 +++++-
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 51 +++---
...Factory.java => KafkaEventReporterFactory.java} | 38 ++---
...actory.java => KafkaMetricReporterFactory.java} | 55 ++----
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 19 ++-
.../runtime/services/MetricsReportingService.java | 23 ++-
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 9 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 10 +-
18 files changed, 415 insertions(+), 255 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 5cb2613..424424b 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -684,6 +684,19 @@ public class ConfigurationKeys {
public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX =
"metrics.reporting";
public static final String METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX =
METRICS_REPORTING_CONFIGURATIONS_PREFIX + ".events";
+
+ //Configuration keys to trigger job/task failures on metric reporter
instantiation failures. Useful
+ //when monitoring of Gobblin pipelines critically depends on events and
metrics emitted by the metrics
+ //reporting service running in each container.
+ public static final String GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL =
"gobblin.task.isMetricReportingFailureFatal";
+ public static final boolean
DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL = false;
+ public static final String GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL =
"gobblin.task.isEventReportingFailureFatal";
+ public static final boolean
DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL = false;
+ public static final String GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL =
"gobblin.job.isMetricReportingFailureFatal";
+ public static final boolean
DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL = false;
+ public static final String GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL =
"gobblin.job.isEventReportingFailureFatal";
+ public static final boolean
DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL = false;
+
// File-based reporting
public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
METRICS_CONFIGURATIONS_PREFIX + "reporting.file.enabled";
@@ -705,7 +718,10 @@ public class ConfigurationKeys {
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.enabled";
public static final String DEFAULT_METRICS_REPORTING_KAFKA_ENABLED =
Boolean.toString(false);
public static final String DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS =
- "org.apache.gobblin.metrics.kafka.KafkaReporterFactory";
+ "org.apache.gobblin.metrics.kafka.KafkaMetricReporterFactory";
+ public static final String DEFAULT_EVENTS_REPORTING_KAFKA_REPORTER_CLASS =
+ "org.apache.gobblin.metrics.kafka.KafkaEventReporterFactory";
+
public static final String METRICS_REPORTING_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.kafka.format";
public static final String METRICS_REPORTING_EVENTS_KAFKA_FORMAT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.events.kafka.format";
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 68f98a2..08d4dd0 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -85,12 +85,15 @@ import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
@@ -171,6 +174,8 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
protected final FileSystem fs;
protected final String applicationName;
protected final String applicationId;
+ private final boolean isMetricReportingFailureFatal;
+ private final boolean isEventReportingFailureFatal;
public GobblinTaskRunner(String applicationName,
String helixInstanceName,
@@ -190,6 +195,15 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
this.clusterConfig = saveConfigToFile(config);
this.clusterName =
this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+ this.isMetricReportingFailureFatal =
ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+ this.isEventReportingFailureFatal =
ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
logger.info("Configured GobblinTaskRunner work dir to: {}",
this.appWorkPath.toString());
// Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
@@ -313,7 +327,8 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
/**
* Start this {@link GobblinTaskRunner} instance.
*/
- public void start() throws ContainerHealthCheckException {
+ public void start()
+ throws ContainerHealthCheckException {
logger.info(String.format("Starting %s in container %s",
this.helixInstanceName, this.taskRunnerId));
// Add a shutdown hook so the task scheduler gets properly shutdown
@@ -345,11 +360,7 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
addInstanceTags();
// Start metric reporting
- if (this.containerMetrics.isPresent()) {
- this.containerMetrics.get()
-
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
- this.taskRunnerId);
- }
+ initMetricReporter();
if (this.containerHealthEventBus != null) {
//Register itself with the container health event bus instance to
receive container health events
@@ -373,6 +384,19 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
}
}
+ private void initMetricReporter() {
+ if (this.containerMetrics.isPresent()) {
+ try {
+ this.containerMetrics.get()
+
.startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
this.taskRunnerId);
+ } catch (MultiReporterException ex) {
+ if (MetricReportUtils.shouldThrowException(logger, ex,
this.isMetricReportingFailureFatal, this.isEventReportingFailureFatal)) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
public synchronized void stop() {
if (this.isStopped) {
logger.info("Gobblin Task runner is already stopped.");
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 7a0b879..534eb3b 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -47,6 +47,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
import org.apache.gobblin.util.eventbus.EventBusFactory;
@@ -76,6 +77,7 @@ public class GobblinTaskRunnerTest {
private GobblinTaskRunner gobblinTaskRunner;
private GobblinTaskRunner gobblinTaskRunnerHealthCheck;
private GobblinTaskRunner corruptGobblinTaskRunner;
+ private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
private GobblinClusterManager gobblinClusterManager;
private String clusterName;
@@ -117,6 +119,18 @@ public class GobblinTaskRunnerTest {
config.withValue(GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
ConfigValueFactory.fromAnyRef(true))
, Optional.<Path>absent());
+ // Participant that fails to start due to metric reporter failures
+ String instanceName =
HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0);
+
+ Config metricConfig =
config.withValue(ConfigurationKeys.METRICS_ENABLED_KEY,
ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS,
ConfigValueFactory.fromAnyRef("metricTopic"))
+
.withValue(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
ConfigValueFactory.fromAnyRef(true));
+
+ this.gobblinTaskRunnerFailedReporter =
+ new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, instanceName,
+ TestHelper.TEST_APPLICATION_ID, "2", metricConfig,
Optional.<Path>absent());
+
// Participant with a partial Instance set up on Helix/ZK
this.corruptHelixInstance =
HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
this.corruptGobblinTaskRunner =
@@ -158,6 +172,11 @@ public class GobblinTaskRunnerTest {
}, "gobblinTaskRunner stopped");
}
+ @Test (expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = ".*Could not create one or more reporters.*")
+ public void testStartUpFailsDueToMetricReporterFailure() {
+ GobblinTaskRunnerTest.this.gobblinTaskRunnerFailedReporter.start();
+ }
+
@Test
public void testBuildFileSystemConfig() {
FileSystem fileSystem = this.gobblinTaskRunner.getFs();
@@ -270,6 +289,7 @@ public class GobblinTaskRunnerTest {
this.gobblinClusterManager.disconnectHelixManager();
this.gobblinTaskRunner.disconnectHelixManager();
this.corruptGobblinTaskRunner.disconnectHelixManager();
+ this.gobblinTaskRunnerFailedReporter.disconnectHelixManager();
this.gobblinTaskRunnerHealthCheck.disconnectHelixManager();
if (this.suite != null) {
this.suite.shutdownCluster();
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 5a937ed..850a1eb 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -69,6 +69,8 @@ import
org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ClassAliasResolver;
@@ -364,7 +366,13 @@ public class MRCompactor implements Compactor {
tags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics gobblinMetrics =
GobblinMetrics.get(this.state.getProp(ConfigurationKeys.JOB_NAME_KEY),
null, tags.build());
- gobblinMetrics.startMetricReporting(this.state.getProperties());
+ try {
+ gobblinMetrics.startMetricReporting(this.state.getProperties());
+ } catch (MultiReporterException ex) {
+ for (MetricReporterException e: ex.getExceptions()) {
+ LOG.error("Failed to start {} {} reporter.", e.getSinkType().name(),
e.getReporterType().name(), e);
+ }
+ }
return gobblinMetrics;
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
similarity index 53%
copy from
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
index 4a357c8..3c3142e 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
@@ -14,29 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import java.io.IOException;
+import lombok.Getter;
-@Test
-public class GobblinMetricsTest {
+public class MetricReporterException extends IOException {
+ @Getter
+ private final ReporterType reporterType;
+ @Getter
+ private final ReporterSinkType sinkType;
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link
GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(),
gobblinMetrics);
+ public MetricReporterException(String message, ReporterType reporterType,
ReporterSinkType sinkType) {
+ super(message);
+ this.reporterType = reporterType;
+ this.sinkType = sinkType;
+ }
- gobblinMetrics.stopMetricsReporting();
-
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+ public MetricReporterException(String message, Throwable t, ReporterType
reporterType, ReporterSinkType sinkType) {
+ super(message, t);
+ this.reporterType = reporterType;
+ this.sinkType = sinkType;
}
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
similarity index 53%
copy from
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
index 4a357c8..1d0dafd 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MultiReporterException.java
@@ -14,29 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.List;
+public class MultiReporterException extends IOException {
+ private List<MetricReporterException> exceptions;
-@Test
-public class GobblinMetricsTest {
-
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link
GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(),
gobblinMetrics);
+ public MultiReporterException(String message, List<MetricReporterException>
exceptions) {
+ super(message);
+ this.exceptions = exceptions;
+ }
- gobblinMetrics.stopMetricsReporting();
-
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+ public List<MetricReporterException> getExceptions() {
+ return this.exceptions;
}
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
similarity index 53%
copy from
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
index 4a357c8..53ccdf8 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
@@ -14,29 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-@Test
-public class GobblinMetricsTest {
-
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link
GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(),
gobblinMetrics);
-
- gobblinMetrics.stopMetricsReporting();
-
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
- }
+public enum ReporterSinkType {
+ JMX,
+ GRAPHITE,
+ FILE,
+ FILE_FAILURE,
+ KAFKA,
+ INFLUXDB,
+ CUSTOM
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
similarity index 53%
copy from
gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
index 4a357c8..c36584a 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ReporterType.java
@@ -14,29 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
+public enum ReporterType {
+ METRIC, EVENT, METRIC_EVENT, CUSTOM;
-@Test
-public class GobblinMetricsTest {
-
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link
GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(),
gobblinMetrics);
+ public static boolean isReporterTypeMetric(ReporterType t) {
+ return t.equals(ReporterType.METRIC) ||
t.equals(ReporterType.METRIC_EVENT);
+ }
- gobblinMetrics.stopMetricsReporting();
-
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+ public static boolean isReporterTypeEvent(ReporterType t) {
+ return t.equals(ReporterType.EVENT) || t.equals(ReporterType.METRIC_EVENT);
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
index b9eefec..0980cea 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/MetricReportUtils.java
@@ -25,6 +25,7 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.io.Closer;
@@ -32,6 +33,9 @@ import com.google.common.io.Closer;
import javax.annotation.Nullable;
import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.ReporterType;
/**
@@ -148,4 +152,16 @@ public class MetricReportUtils {
SCHEMA_VERSION));
}
}
+
+ public static boolean shouldThrowException(Logger log,
MultiReporterException ex, boolean isMetricReportingFailureFatal, boolean
isEventReportingFailureFatal) {
+ boolean shouldThrow = false;
+ for (MetricReporterException e: ex.getExceptions()) {
+ if ((isMetricReportingFailureFatal &&
ReporterType.isReporterTypeMetric(e.getReporterType())) || (
+ isEventReportingFailureFatal &&
ReporterType.isReporterTypeEvent(e.getReporterType()))) {
+ shouldThrow = true;
+ }
+ log.error("Failed to start {} {} reporter", e.getSinkType().name(),
e.getReporterType().name(), e);
+ }
+ return shouldThrow;
+ }
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e969083..4e4fa6c 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.graphite.GraphiteConnectionType;
@@ -56,14 +57,13 @@ import org.apache.gobblin.metrics.graphite.GraphiteReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBConnectionType;
import org.apache.gobblin.metrics.influxdb.InfluxDBEventReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBReporter;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamReporter;
import org.apache.gobblin.metrics.reporter.ScheduledReporter;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.PropertiesUtils;
-import lombok.Getter;
-
/**
* A class that represents a set of metrics associated with a given name.
@@ -354,7 +354,8 @@ public class GobblinMetrics {
* Starts metric reporting and appends the given metrics file suffix to the
current value of
* {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
*/
- public void startMetricReportingWithFileSuffix(State state, String
metricsFileSuffix) {
+ public void startMetricReportingWithFileSuffix(State state, String
metricsFileSuffix)
+ throws MultiReporterException {
Properties metricsReportingProps = new Properties();
metricsReportingProps.putAll(state.getProperties());
@@ -374,7 +375,8 @@ public class GobblinMetrics {
*
* @param configuration configuration properties
*/
- public void startMetricReporting(Configuration configuration) {
+ public void startMetricReporting(Configuration configuration)
+ throws MultiReporterException {
Properties props = new Properties();
for (Map.Entry<String, String> entry : configuration) {
props.put(entry.getKey(), entry.getValue());
@@ -387,7 +389,7 @@ public class GobblinMetrics {
*
* @param properties configuration properties
*/
- public void startMetricReporting(Properties properties) {
+ public void startMetricReporting(Properties properties) throws
MultiReporterException {
if (this.metricsReportingStarted) {
LOGGER.warn("Metric reporting has already started");
return;
@@ -400,22 +402,21 @@ public class GobblinMetrics {
long startTime = System.currentTimeMillis();
+ List<MetricReporterException> reporterExceptions = Lists.newArrayList();
+
try {
- // Build and start the JMX reporter
- buildJmxMetricReporter(properties);
- if (this.jmxReporter.isPresent()) {
- LOGGER.info("Will start reporting metrics to JMX");
- this.jmxReporter.get().start();
+ for (ReporterSinkType sinkType: ReporterSinkType.values()) {
+ if (sinkType.equals(ReporterSinkType.CUSTOM)) {
+ buildCustomMetricReporters(properties, reporterExceptions);
+ } else {
+ try {
+ buildReporter(properties, sinkType);
+ } catch (MultiReporterException e) {
+ reporterExceptions.addAll(e.getExceptions());
+ }
+ }
}
- // Build all other reporters
- buildFileMetricReporter(properties);
- buildKafkaMetricReporter(properties);
- buildGraphiteMetricReporter(properties);
- buildInfluxDBMetricReporter(properties);
- buildCustomMetricReporters(properties);
- buildFileFailureEventReporter(properties);
-
// Start reporters that implement
org.apache.gobblin.metrics.report.ScheduledReporter
RootMetricContext.get().startReporting();
@@ -431,6 +432,36 @@ public class GobblinMetrics {
this.metricsReportingStarted = true;
LOGGER.info("Metrics reporting has been started in {} ms: GobblinMetrics
{}",
System.currentTimeMillis() - startTime, this.toString());
+
+ if (!reporterExceptions.isEmpty()) {
+ throw new MultiReporterException("Could not create one or more
reporters", reporterExceptions);
+ }
+ }
+
+ private void buildReporter(Properties properties, ReporterSinkType sinkType)
throws MultiReporterException {
+ switch (sinkType) {
+ case JMX:
+ buildJmxMetricReporter(properties);
+ break;
+ case FILE:
+ buildFileMetricReporter(properties);
+ break;
+ case KAFKA:
+ buildKafkaMetricReporter(properties);
+ break;
+ case GRAPHITE:
+ buildGraphiteMetricReporter(properties);
+ break;
+ case INFLUXDB:
+ buildInfluxDBMetricReporter(properties);
+ break;
+ case FILE_FAILURE:
+ buildFileFailureEventReporter(properties);
+ break;
+ default:
+ LOGGER.error("Unknown reporter sink type: {}", sinkType.name());
+ break;
+ }
}
/**
@@ -473,7 +504,8 @@ public class GobblinMetrics {
LOGGER.info("Metrics reporting stopped successfully");
}
- private void buildFileMetricReporter(Properties properties) {
+ private void buildFileMetricReporter(Properties properties)
+ throws MultiReporterException {
if
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_FILE_ENABLED))) {
return;
@@ -481,9 +513,9 @@ public class GobblinMetrics {
LOGGER.info("Reporting metrics to log files");
if (!properties.containsKey(ConfigurationKeys.METRICS_LOG_DIR_KEY)) {
- LOGGER.error(
- "Not reporting metrics to log files because " +
ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined");
- return;
+ MetricReporterException e = new MetricReporterException(
+ "Not reporting metrics to log files because " +
ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined", ReporterType.METRIC,
ReporterSinkType.FILE);
+ throw new MultiReporterException("Failed to create file metric
reporter", Lists.newArrayList(e));
}
try {
@@ -493,8 +525,7 @@ public class GobblinMetrics {
// Each job gets its own metric log subdirectory
Path metricsLogDir = new
Path(properties.getProperty(ConfigurationKeys.METRICS_LOG_DIR_KEY),
this.getName());
if (!fs.exists(metricsLogDir) && !fs.mkdirs(metricsLogDir)) {
- LOGGER.error("Failed to create metric log directory for metrics " +
this.getName());
- return;
+ throw new MetricReporterException("Failed to create metric log
directory for metrics " + this.getName(), ReporterType.METRIC,
ReporterSinkType.FILE);
}
// Add a suffix to file name if specified in properties.
@@ -523,22 +554,19 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting metrics to directory " +
metricsLogDir);
} catch (IOException ioe) {
- LOGGER.error("Failed to build file metric reporter for job " + this.id,
ioe);
+ MetricReporterException e = new MetricReporterException("Failed to build
file metric reporter for job " + this.id, ioe, ReporterType.METRIC,
ReporterSinkType.FILE);
+ throw new MultiReporterException("Failed to create file metric
reporter", Lists.newArrayList(e));
}
}
- private void buildFileFailureEventReporter(Properties properties) {
- if
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
- ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED))) {
+ private void buildFileFailureEventReporter(Properties properties)
+ throws MultiReporterException {
+ if
((!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
+ ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED)) ||
!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY))) {
return;
}
- LOGGER.info("Reporting failure to log files");
- if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
- LOGGER.error(
- "Not reporting failure to log files because " +
ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
- return;
- }
+ LOGGER.info("Reporting failure to log files");
try {
String fsUri = properties.getProperty(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI);
@@ -547,8 +575,7 @@ public class GobblinMetrics {
// Each job gets its own log subdirectory
Path failureLogDir = new
Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY),
this.getName());
if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
- LOGGER.error("Failed to create failure log directory for metrics " +
this.getName());
- return;
+ throw new MetricReporterException("Failed to create failure log
directory for metrics " + this.getName(), ReporterType.EVENT,
ReporterSinkType.FILE);
}
// Add a suffix to file name if specified in properties.
@@ -566,7 +593,8 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting failure to directory " +
failureLogDir);
} catch (IOException ioe) {
- LOGGER.error("Failed to build file failure event reporter for job " +
this.id, ioe);
+ MetricReporterException e = new MetricReporterException("Failed to build
file failure event reporter for job " + this.id, ioe, ReporterType.EVENT,
ReporterSinkType.FILE);
+ throw new MultiReporterException("Failed to create failure file event
reporter", Lists.newArrayList(e));
}
}
@@ -581,15 +609,33 @@ public class GobblinMetrics {
convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build()));
}
- private void buildKafkaMetricReporter(Properties properties) {
+ private void buildKafkaMetricReporter(Properties properties)
+ throws MultiReporterException {
+ List<MetricReporterException> reporterExceptions = Lists.newArrayList();
if
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return;
}
- buildScheduledReporter(properties,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS,
Optional.of("Kafka"));
+
+ try {
+ buildScheduledReporter(properties,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS);
+ } catch (MetricReporterException e) {
+ reporterExceptions.add(e);
+ }
+ try {
+ buildScheduledReporter(properties,
ConfigurationKeys.DEFAULT_EVENTS_REPORTING_KAFKA_REPORTER_CLASS);
+ } catch (MetricReporterException e) {
+ reporterExceptions.add(e);
+ }
+ if (!reporterExceptions.isEmpty()) {
+ throw new MultiReporterException("Failed to start one or more Kafka
reporters", reporterExceptions);
+ }
}
- private void buildGraphiteMetricReporter(Properties properties) {
+ private void buildGraphiteMetricReporter(Properties properties)
+ throws MultiReporterException {
+ List<MetricReporterException> reporterExceptionList = Lists.newArrayList();
+
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties,
ConfigurationKeys.METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_METRICS_ENABLED);
@@ -612,8 +658,8 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME),
"Graphite hostname is missing.");
} catch (IllegalArgumentException exception) {
- LOGGER.error("Not reporting to Graphite due to missing Graphite
configuration(s).", exception);
- return;
+ reporterExceptionList.add(new MetricReporterException("Missing Graphite
configuration(s).", exception, ReporterType.METRIC_EVENT,
ReporterSinkType.GRAPHITE));
+ throw new MultiReporterException("Failed to start one or more Graphite
reporters", reporterExceptionList);
}
String hostname =
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME);
@@ -641,7 +687,7 @@ public class GobblinMetrics {
.withMetricsPrefix(prefix)
.build(properties);
} catch (IOException e) {
- LOGGER.error("Failed to create Graphite metrics reporter. Will not
report metrics to Graphite.", e);
+ reporterExceptionList.add(new MetricReporterException("Failed to
create Graphite metrics reporter.", e, ReporterType.METRIC,
ReporterSinkType.GRAPHITE));
}
}
@@ -663,12 +709,19 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- LOGGER.error("Failed to create Graphite event reporter. Will not
report events to Graphite.", e);
+ reporterExceptionList.add(new MetricReporterException("Failed to
create Graphite event reporter.", e, ReporterType.EVENT,
ReporterSinkType.GRAPHITE));
}
}
+
+ if (!reporterExceptionList.isEmpty()) {
+ throw new MultiReporterException("Failed to create one or more Graphite
Reporters", reporterExceptionList);
+ }
}
- private void buildInfluxDBMetricReporter(Properties properties) {
+ private void buildInfluxDBMetricReporter(Properties properties)
+ throws MultiReporterException {
+ List<MetricReporterException> reporterExceptionList = Lists.newArrayList();
+
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties,
ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
@@ -691,8 +744,8 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
"InfluxDB database name is missing.");
} catch (IllegalArgumentException exception) {
- LOGGER.error("Not reporting to InfluxDB due to missing InfluxDB
configuration(s).", exception);
- return;
+ reporterExceptionList.add(new MetricReporterException("Missing InfluxDB
configuration(s)", exception, ReporterType.METRIC_EVENT,
ReporterSinkType.INFLUXDB));
+ throw new MultiReporterException("Failed to start one or more InfluxDB
reporters", reporterExceptionList);
}
String url =
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
@@ -719,7 +772,7 @@ public class GobblinMetrics {
this.metricContext.getName()) // contains the current job id
.build(properties);
} catch (IOException e) {
- LOGGER.error("Failed to create InfluxDB metrics reporter. Will not
report metrics to InfluxDB.", e);
+ reporterExceptionList.add(new MetricReporterException("Failed to
create InfluxDB metrics reporter.", e, ReporterType.METRIC,
ReporterSinkType.INFLUXDB));
}
}
@@ -735,9 +788,13 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- LOGGER.error("Failed to create InfluxDB event reporter. Will not
report events to InfluxDB.", e);
+ reporterExceptionList.add(new MetricReporterException("Failed to
create InfluxDB event reporter.", e, ReporterType.EVENT,
ReporterSinkType.INFLUXDB));
}
}
+
+ if (!reporterExceptionList.isEmpty()) {
+ throw new MultiReporterException("Failed to create one or more InfluxDB
reporters", reporterExceptionList);
+ }
}
/**
@@ -745,7 +802,7 @@ public class GobblinMetrics {
* {@link
org.apache.gobblin.configuration.ConfigurationKeys#METRICS_CUSTOM_BUILDERS}.
This allows users to specify custom
* reporters for Gobblin runtime without having to modify the code.
*/
- private void buildCustomMetricReporters(Properties properties) {
+ private void buildCustomMetricReporters(Properties properties,
List<MetricReporterException> reporterExceptions) {
String reporterClasses =
properties.getProperty(ConfigurationKeys.METRICS_CUSTOM_BUILDERS);
if (Strings.isNullOrEmpty(reporterClasses)) {
@@ -753,11 +810,16 @@ public class GobblinMetrics {
}
for (String reporterClass : Splitter.on(",").split(reporterClasses)) {
- buildScheduledReporter(properties, reporterClass,
Optional.<String>absent());
+ try {
+ buildScheduledReporter(properties, reporterClass);
+ } catch (MetricReporterException e) {
+ reporterExceptions.add(e);
+ }
}
}
- private void buildScheduledReporter(Properties properties, String
reporterClass, Optional<String> reporterSink) {
+ private void buildScheduledReporter(Properties properties, String
reporterClass)
+ throws MetricReporterException {
try {
Class<?> clazz = Class.forName(reporterClass);
@@ -771,27 +833,28 @@ public class GobblinMetrics {
return;
}
this.codahaleReportersCloser.register(scheduledReporter);
- String reporterSinkMsg = reporterSink.isPresent() ? "to " +
reporterSink.get() : "";
- LOGGER.info("Will start reporting metrics " + reporterSinkMsg + "
using " + reporterClass);
+ LOGGER.info("Will start reporting metrics using " + reporterClass);
this.codahaleScheduledReporters.add(scheduledReporter);
} else if (CustomReporterFactory.class.isAssignableFrom(clazz)) {
CustomReporterFactory customReporterFactory = ((CustomReporterFactory)
clazz.getConstructor().newInstance());
customReporterFactory.newScheduledReporter(properties);
LOGGER.info("Will start reporting metrics using " + reporterClass);
} else {
- throw new IllegalArgumentException("Class " + reporterClass +
+ throw new MetricReporterException("Class " + reporterClass +
" specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS +
" must implement: "
- + CustomCodahaleReporterFactory.class + " or " +
CustomReporterFactory.class);
+ + CustomCodahaleReporterFactory.class + " or " +
CustomReporterFactory.class, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
}
} catch (ClassNotFoundException exception) {
- LOGGER.warn(String
+ throw new MetricReporterException(String
.format("Failed to create metric reporter: requested
CustomReporterFactory %s not found.", reporterClass),
- exception);
+ exception, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
} catch (NoSuchMethodException exception) {
- LOGGER.warn(String.format("Failed to create metric reporter: requested
CustomReporterFactory %s "
- + "does not have parameterless constructor.", reporterClass),
exception);
+ throw new MetricReporterException(String.format("Failed to create metric
reporter: requested CustomReporterFactory %s "
+ + "does not have parameterless constructor.", reporterClass),
exception, ReporterType.CUSTOM, ReporterSinkType.CUSTOM);
+ } catch (MetricReporterException exception) {
+ throw exception;
} catch (Exception exception) {
- LOGGER.warn("Could not create metric reporter from builder " +
reporterClass + ".", exception);
+ throw new MetricReporterException("Could not create metric reporter from
builder " + reporterClass + ".", exception, ReporterType.CUSTOM,
ReporterSinkType.CUSTOM);
}
}
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
index 4a357c8..d2789db 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
@@ -22,21 +22,70 @@ import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
@Test
public class GobblinMetricsTest {
-
/**
* Test the {@link GobblinMetrics} instance is removed from {@link
GobblinMetricsRegistry} when
* it stops metrics reporting
*/
- public void testStopReportingMetrics() {
+ public void testStopReportingMetrics()
+ throws MultiReporterException {
String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
+
+ Properties properties = new Properties();
+ properties.put(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
"false");
+
+ gobblinMetrics.startMetricReporting(properties);
Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(),
gobblinMetrics);
gobblinMetrics.stopMetricsReporting();
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
}
+
+ public void testMetricFileReporterThrowsException() {
+ String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+ GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+ //Enable file reporter without specifying metrics.log.dir.
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
ConfigValueFactory.fromAnyRef(true));
+
+ Properties properties = ConfigUtils.configToProperties(config);
+ //Ensure MultiReporterException is thrown
+ try {
+ gobblinMetrics.startMetricReporting(properties);
+ Assert.fail("Metric reporting unexpectedly succeeded.");
+ } catch (MultiReporterException e) {
+ //Do nothing. Expected to be here.
+ }
+ }
+
+ public void testMetricFileReporterSuccessful() {
+ String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+ GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+ //Enable file reporter and specify both the metrics and failure log directories.
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_LOG_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(ConfigurationKeys.FAILURE_LOG_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp"));
+
+ Properties properties = ConfigUtils.configToProperties(config);
+ //Ensure MultiReporterException is not thrown
+ try {
+ gobblinMetrics.startMetricReporting(properties);
+ } catch (MultiReporterException e) {
+ Assert.fail("Unexpected exception " + e.getMessage());
+ }
+ }
+
}
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index e576468..33d3591 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -31,6 +31,26 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+import azkaban.jobExecutor.AbstractJob;
+import javax.annotation.Nullable;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.State;
@@ -48,30 +68,13 @@ import
org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.listeners.CompositeJobListener;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.services.MetricsReportingService;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.TimeRangeChecker;
import org.apache.gobblin.util.hadoop.TokenUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-import azkaban.jobExecutor.AbstractJob;
-import javax.annotation.Nullable;
import static
org.apache.gobblin.runtime.AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary;
import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -222,6 +225,16 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
// Since Java classes cannot extend multiple classes and Azkaban jobs must
extend AbstractJob, we must use composition
// versus extending ServiceBasedAppLauncher
+ boolean isMetricReportingFailureFatal = PropertiesUtils
+ .getPropAsBoolean(jobProps,
ConfigurationKeys.GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL,
+
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_METRIC_REPORTING_FAILURE_FATAL));
+ boolean isEventReportingFailureFatal = PropertiesUtils
+ .getPropAsBoolean(jobProps,
ConfigurationKeys.GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL,
+
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_JOB_EVENT_REPORTING_FAILURE_FATAL));
+
+
jobProps.setProperty(MetricsReportingService.METRICS_REPORTING_FAILURE_FATAL_KEY,
Boolean.toString(isMetricReportingFailureFatal));
+
jobProps.setProperty(MetricsReportingService.EVENT_REPORTING_FAILURE_FATAL_KEY,
Boolean.toString(isEventReportingFailureFatal));
+
this.applicationLauncher =
this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-"
+ UUID.randomUUID()));
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
similarity index 74%
copy from
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
copy to
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
index 5c12360..5fb360d 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporterFactory.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics.kafka;
import java.io.IOException;
@@ -30,25 +29,21 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
+import org.apache.gobblin.metrics.ReporterType;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
-
@Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+public class KafkaEventReporterFactory implements
CustomCodahaleReporterFactory {
@Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry,
Properties properties) {
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry,
Properties properties)
+ throws IOException {
if
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
}
- log.info("Reporting metrics & events to Kafka");
-
- boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
-
- if (KafkaReporterUtils.isMetricsEnabled(properties)) {
- log.info("Metrics enabled --- Reporting metrics to Kafka");
- }
boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
if (KafkaReporterUtils.isEventsEnabled(properties)) {
@@ -61,10 +56,9 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
try {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
"Kafka metrics brokers missing.");
-
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(),
"Kafka topic missing.");
+ Preconditions.checkArgument(eventsTopic.or(defaultTopic).isPresent(),
"Kafka topic missing.");
} catch (IllegalArgumentException exception) {
- log.error("Not reporting metrics to Kafka due to missing Kafka
configuration(s).", exception);
- return null;
+ throw new MetricReporterException("Missing Kafka configuration(s).",
exception, ReporterType.EVENT, ReporterSinkType.KAFKA);
}
String brokers =
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -82,15 +76,6 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
formatEnum = KafkaReportingFormats.JSON;
}
- Optional<String> metricsTopic =
KafkaReporterUtils.getMetricsTopic(properties);
- if (metricsEnabled) {
- try {
- formatEnum.buildMetricsReporter(brokers,
metricsTopic.or(defaultTopic).get(), properties);
- } catch (IOException exception) {
- log.error("Failed to create Kafka metrics reporter. Will not report
metrics to Kafka.", exception);
- }
- }
-
KafkaReportingFormats eventFormatEnum;
if
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT))
{
String eventsReportingFormat =
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT,
@@ -110,12 +95,9 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
if (eventsEnabled) {
try {
String eventTopic = eventsTopic.or(defaultTopic).get();
- ScheduledReporter reporter =
- eventFormatEnum.buildEventsReporter(brokers, eventTopic,
RootMetricContext.get(), properties);
-
- return reporter;
+ return eventFormatEnum.buildEventsReporter(brokers, eventTopic,
RootMetricContext.get(), properties);
} catch (IOException exception) {
- log.error("Failed to create Kafka events reporter. Will not report
events to Kafka.", exception);
+ throw new MetricReporterException("Failed to create Kafka events
reporter.", exception, ReporterType.EVENT, ReporterSinkType.KAFKA);
}
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
similarity index 60%
rename from
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
rename to
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
index 5c12360..fd11dbf 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaMetricReporterFactory.java
@@ -30,41 +30,36 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
import org.apache.gobblin.metrics.KafkaReportingFormats;
-import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
+import org.apache.gobblin.metrics.ReporterType;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@Slf4j
-public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+public class KafkaMetricReporterFactory implements
CustomCodahaleReporterFactory {
@Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry,
Properties properties) {
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry,
Properties properties)
+ throws IOException {
if
(!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
}
- log.info("Reporting metrics & events to Kafka");
-
boolean metricsEnabled = KafkaReporterUtils.isMetricsEnabled(properties);
if (KafkaReporterUtils.isMetricsEnabled(properties)) {
log.info("Metrics enabled --- Reporting metrics to Kafka");
}
- boolean eventsEnabled = KafkaReporterUtils.isEventsEnabled(properties);
- if (KafkaReporterUtils.isEventsEnabled(properties)) {
- log.info("Events enabled --- Reporting events to Kafka");
- }
-
- Optional<String> eventsTopic =
KafkaReporterUtils.getEventsTopic(properties);
+ Optional<String> metricsTopic =
KafkaReporterUtils.getMetricsTopic(properties);
Optional<String> defaultTopic =
KafkaReporterUtils.getDefaultTopic(properties);
try {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
"Kafka metrics brokers missing.");
-
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(),
"Kafka topic missing.");
+ Preconditions.checkArgument(metricsTopic.or(defaultTopic).isPresent(),
"Kafka topic missing.");
} catch (IllegalArgumentException exception) {
- log.error("Not reporting metrics to Kafka due to missing Kafka
configuration(s).", exception);
- return null;
+ throw new MetricReporterException("Missing Kafka configuration(s).",
exception, ReporterType.METRIC, ReporterSinkType.KAFKA);
}
String brokers =
properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -82,43 +77,13 @@ public class KafkaReporterFactory implements
CustomCodahaleReporterFactory {
formatEnum = KafkaReportingFormats.JSON;
}
- Optional<String> metricsTopic =
KafkaReporterUtils.getMetricsTopic(properties);
if (metricsEnabled) {
try {
formatEnum.buildMetricsReporter(brokers,
metricsTopic.or(defaultTopic).get(), properties);
} catch (IOException exception) {
- log.error("Failed to create Kafka metrics reporter. Will not report
metrics to Kafka.", exception);
+ throw new MetricReporterException("Failed to create Kafka metrics
reporter.", exception, ReporterType.METRIC, ReporterSinkType.KAFKA);
}
}
-
- KafkaReportingFormats eventFormatEnum;
- if
(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT))
{
- String eventsReportingFormat =
properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_FORMAT,
- ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
- try {
- eventFormatEnum =
KafkaReportingFormats.valueOf(eventsReportingFormat.toUpperCase());
- } catch (IllegalArgumentException exception) {
- log.warn(
- "Kafka events reporting format " + eventsReportingFormat + " not
recognized. Will report in json format.",
- exception);
- eventFormatEnum = KafkaReportingFormats.JSON;
- }
- } else {
- eventFormatEnum = formatEnum;
- }
-
- if (eventsEnabled) {
- try {
- String eventTopic = eventsTopic.or(defaultTopic).get();
- ScheduledReporter reporter =
- eventFormatEnum.buildEventsReporter(brokers, eventTopic,
RootMetricContext.get(), properties);
-
- return reporter;
- } catch (IOException exception) {
- log.error("Failed to create Kafka events reporter. Will not report
events to Kafka.", exception);
- }
- }
-
return null;
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index c649595..ed02d38 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -74,11 +74,13 @@ import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.JobStateEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
@@ -783,8 +785,21 @@ public class MRJobLauncher extends AbstractJobLauncher {
if (Boolean.valueOf(
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
- this.jobMetrics.get()
- .startMetricReportingWithFileSuffix(gobblinJobState,
context.getTaskAttemptID().toString());
+ try {
+ this.jobMetrics.get()
+ .startMetricReportingWithFileSuffix(gobblinJobState,
context.getTaskAttemptID().toString());
+ } catch (MultiReporterException ex) {
+ //Fail the task if metric/event reporting failure is configured to
be fatal.
+ boolean isMetricReportingFailureFatal = Boolean.valueOf(configuration
+
.get(ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL)));
+ boolean isEventReportingFailureFatal = Boolean.valueOf(configuration
+
.get(ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+
Boolean.toString(ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL)));
+ if (MetricReportUtils.shouldThrowException(LOG, ex,
isMetricReportingFailureFatal, isEventReportingFailureFatal)) {
+ throw new RuntimeException(ex);
+ }
+ }
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
index 43b53cd..4e53d24 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
@@ -21,25 +21,46 @@ import java.util.Properties;
import com.google.common.util.concurrent.AbstractIdleService;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
* A {@link com.google.common.util.concurrent.Service} for handling life cycle
events around {@link GobblinMetrics}.
*/
+@Slf4j
public class MetricsReportingService extends AbstractIdleService {
+ public static final String METRICS_REPORTING_FAILURE_FATAL_KEY =
"metrics.reporting.failure.fatal";
+ public static final String EVENT_REPORTING_FAILURE_FATAL_KEY =
"event.reporting.failure.fatal";
+
+ public static final String DEFAULT_METRICS_REPORTING_FAILURE_FATAL = "false";
+ public static final String DEFAULT_EVENT_REPORTING_FAILURE_FATAL = "false";
private final Properties properties;
private final String appId;
+ private final boolean isMetricReportingFailureFatal;
+ private final boolean isEventReportingFailureFatal;
public MetricsReportingService(Properties properties, String appId) {
this.properties = properties;
this.appId = appId;
+ this.isMetricReportingFailureFatal =
PropertiesUtils.getPropAsBoolean(properties,
METRICS_REPORTING_FAILURE_FATAL_KEY, DEFAULT_METRICS_REPORTING_FAILURE_FATAL);
+ this.isEventReportingFailureFatal =
PropertiesUtils.getPropAsBoolean(properties, EVENT_REPORTING_FAILURE_FATAL_KEY,
DEFAULT_EVENT_REPORTING_FAILURE_FATAL);
}
@Override
protected void startUp() throws Exception {
- GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+ try {
+ GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+ } catch (MultiReporterException ex) {
+ if (MetricReportUtils.shouldThrowException(log, ex,
this.isMetricReportingFailureFatal, this.isEventReportingFailureFatal)) {
+ throw ex;
+ }
+ }
}
@Override
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 1abf8d2..d3d91df 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -45,7 +45,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.cluster.ContainerHealthCheckException;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinTaskRunner;
@@ -174,7 +173,7 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
return containerId.toString();
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
Options options = buildOptions();
try {
CommandLine cmd = new DefaultParser().parse(options, args);
@@ -210,11 +209,11 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
} catch (ParseException pe) {
printUsage(options);
System.exit(1);
- } catch (ContainerHealthCheckException e) {
- // Ideally, we should not be catching this exception, as this is
indicative of a non-recoverable exception. However,
+ } catch (Throwable t) {
+ // Ideally, we should not be catching non-recoverable exceptions and
errors. However,
// simply propagating the exception may prevent the container exit due
to the presence of non-daemon threads present
// in the application. Hence, we catch this exception to invoke
System.exit() which in turn ensures that all non-daemon threads are killed.
- LOGGER.error("Exception encountered: {}", e);
+ LOGGER.error("Exception encountered: {}", t);
System.exit(1);
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index f9c6cc6..177cc69 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -98,6 +98,8 @@ import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
@@ -399,7 +401,13 @@ public class YarnService extends AbstractIdleService {
// Intialize Gobblin metrics and start reporters
GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId,
null, tags.build());
-
gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+ try {
+
gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+ } catch (MultiReporterException ex) {
+ for (MetricReporterException e: ex.getExceptions()) {
+ LOGGER.error("Failed to start {} {} reporter.",
e.getSinkType().name(), e.getReporterType().name(), e);
+ }
+ }
return gobblinMetrics;
}