Repository: incubator-gobblin Updated Branches: refs/heads/master c385f1ddd -> 6198120e3
[GOBBLIN-273] Add job failure monitoring Closes #2125 from zxcware/msg Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6198120e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6198120e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6198120e Branch: refs/heads/master Commit: 6198120e3f9f969064ee1bc2686f38f41ec32172 Parents: c385f1d Author: zhchen <[email protected]> Authored: Tue Nov 7 15:25:46 2017 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Tue Nov 7 15:25:46 2017 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../apache/gobblin/metrics/MetricContext.java | 25 ++-- .../metrics/event/FailureEventBuilder.java | 116 +++++++++++++++++++ .../reporter/FileFailureEventReporter.java | 94 +++++++++++++++ .../reporter/OutputStreamEventReporter.java | 3 +- .../reporter/FileFailureEventReporterTest.java | 69 +++++++++++ .../apache/gobblin/metrics/GobblinMetrics.java | 41 +++++++ .../gobblin/r2/R2RestResponseHandler.java | 25 ++-- .../apache/gobblin/writer/AsyncHttpWriter.java | 29 ++++- .../org/apache/gobblin/runtime/JobState.java | 26 ++++- .../gobblin/runtime/SafeDatasetCommit.java | 16 +++ .../java/org/apache/gobblin/runtime/Task.java | 11 ++ .../plugins/email/EmailNotificationPlugin.java | 4 +- .../gobblin/runtime/TaskContinuousTest.java | 2 + .../org/apache/gobblin/runtime/TaskTest.java | 2 + 15 files changed, 437 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 0ef8416..c8de615 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -618,6 +618,7 @@ public class ConfigurationKeys { public static final String METRICS_LOG_DIR_KEY = METRICS_CONFIGURATIONS_PREFIX + "log.dir"; public static final String METRICS_FILE_SUFFIX = METRICS_CONFIGURATIONS_PREFIX + "reporting.file.suffix"; public static final String DEFAULT_METRICS_FILE_SUFFIX = ""; + public static final String FAILURE_LOG_DIR_KEY = "failure.log.dir"; // JMX-based reporting public static final String METRICS_REPORTING_JMX_ENABLED_KEY = http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java index e3dd939..dcc1029 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java @@ -184,6 +184,20 @@ public class MetricContext extends MetricRegistry implements ReportableContext, } /** + * Inject the tags of this {@link MetricContext} to the given {@link GobblinTrackingEvent} + */ + private void injectTagsToEvent(GobblinTrackingEvent event) { + Map<String, String> originalMetadata = event.getMetadata(); + Map<String, Object> tags = getTagMap(); + Map<String, String> newMetadata = Maps.newHashMap(); + for(Map.Entry<String, Object> entry : tags.entrySet()) { + newMetadata.put(entry.getKey(), 
entry.getValue().toString()); + } + newMetadata.putAll(originalMetadata); + event.setMetadata(newMetadata); + } + + /** * Submit {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to all notification listeners attached to this or any * ancestor {@link org.apache.gobblin.metrics.MetricContext}s. The argument for this method is mutated by the method, so it * should not be reused by the caller. @@ -193,16 +207,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext, */ public void submitEvent(GobblinTrackingEvent nonReusableEvent) { nonReusableEvent.setTimestamp(System.currentTimeMillis()); - - // Inject metric context tags into event metadata. - Map<String, String> originalMetadata = nonReusableEvent.getMetadata(); - Map<String, Object> tags = getTagMap(); - Map<String, String> newMetadata = Maps.newHashMap(); - for(Map.Entry<String, Object> entry : tags.entrySet()) { - newMetadata.put(entry.getKey(), entry.getValue().toString()); - } - newMetadata.putAll(originalMetadata); - nonReusableEvent.setMetadata(newMetadata); + injectTagsToEvent(nonReusableEvent); EventNotification notification = new EventNotification(nonReusableEvent); sendNotification(notification); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java new file mode 100644 index 0000000..89f83f5 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.event; + +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; + +import com.google.common.collect.Maps; + +import lombok.Getter; + + +/** + * A failure event builds a specific {@link GobblinTrackingEvent} whose metadata has + * {@value EventSubmitter#EVENT_TYPE} to be {@value #EVENT_TYPE} + * + * <p> + * Note: A {@link FailureEventBuilder} instance is not reusable + */ +public class FailureEventBuilder { + private static final String EVENT_TYPE = "FailureEvent"; + private static final String EVENT_NAMESPACE = "gobblin.event"; + private static final String ROOT_CAUSE = "rootException"; + + @Getter + private final String name; + @Getter + private final String namespace; + private final Map<String, String> metadata; + + private Throwable rootCause; + + public FailureEventBuilder(String name) { + this(name, EVENT_NAMESPACE); + } + + public FailureEventBuilder(String name, String namespace) { + this.name = name; + this.namespace = namespace; + metadata = Maps.newHashMap(); + metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE); + } + + /** + * 
Given a throwable, get its root cause and set it as metadata + */ + public void setRootCause(Throwable t) { + rootCause = getRootCause(t); + } + + /** + * Add a metadata pair + */ + public void addMetadata(String key, String value) { + metadata.put(key, value); + } + + /** + * Add additional metadata + */ + public void addAdditionalMetadata(Map<String, String> additionalMetadata) { + metadata.putAll(additionalMetadata); + } + + /** + * Build as {@link GobblinTrackingEvent} + */ + public GobblinTrackingEvent build() { + if (rootCause != null) { + metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause)); + } + return new GobblinTrackingEvent(0L, EVENT_NAMESPACE, name, metadata); + } + + /** + * Submit the event + */ + public void submit(MetricContext context) { + context.submitEvent(build()); + } + + /** + * Check if the given {@link GobblinTrackingEvent} is a failure event + */ + public static boolean isFailureEvent(GobblinTrackingEvent event) { + String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE); + return StringUtils.isNotEmpty(eventType) && eventType.equals(EVENT_TYPE); + } + + private static Throwable getRootCause(Throwable t) { + Throwable rootCause = ExceptionUtils.getRootCause(t); + if (rootCause == null) { + rootCause = t; + } + return rootCause; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java new file mode 100644 index 0000000..42e858e --- /dev/null +++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Queue; + +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.FailureEventBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Charsets; + +import lombok.extern.slf4j.Slf4j; + + +/** + * An {@link OutputStreamEventReporter} that reports only failure events built by {@link FailureEventBuilder}. 
It won't create + * the failure log file until a failure event is processed + */ +@Slf4j +public class FileFailureEventReporter extends OutputStreamEventReporter { + private final FileSystem fs; + private final Path failureLogFile; + private volatile boolean hasSetupOutputStream; + + public FileFailureEventReporter(MetricContext context, FileSystem fs, Path failureLogFile) + throws IOException { + super(OutputStreamEventReporter.forContext(context)); + this.fs = fs; + this.failureLogFile = failureLogFile; + hasSetupOutputStream = false; + } + + @Override + public void addEventToReportingQueue(GobblinTrackingEvent event) { + if (FailureEventBuilder.isFailureEvent(event)) { + super.addEventToReportingQueue(event); + } + } + + @Override + public void reportEventQueue(Queue<GobblinTrackingEvent> queue) { + if (queue.size() > 0) { + setupOutputStream(); + super.reportEventQueue(queue); + } + } + + /** + * Set up the {@link OutputStream} to the {@link #failureLogFile} only once + */ + private void setupOutputStream() { + synchronized (failureLogFile) { + // Setup is done by some thread + if (hasSetupOutputStream) { + return; + } + + try { + boolean append = false; + if (fs.exists(failureLogFile)) { + log.info("Failure log file %s already exists, appending to it", failureLogFile); + append = true; + } + OutputStream outputStream = append ? 
fs.append(failureLogFile) : fs.create(failureLogFile); + output = this.closer.register(new PrintStream(outputStream, false, Charsets.UTF_8.toString())); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + hasSetupOutputStream = true; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java index e0415fe..48c4f57 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java @@ -48,7 +48,8 @@ public class OutputStreamEventReporter extends EventReporter { private static final Logger LOGGER = LoggerFactory.getLogger(OutputStreamEventReporter.class); private static final int CONSOLE_WIDTH = 80; - private final PrintStream output; + protected PrintStream output; + protected final AvroSerializer<GobblinTrackingEvent> serializer; private final ByteArrayOutputStream outputBuffer; private final PrintStream outputBufferPrintStream; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java new file mode 100644 index 0000000..389e6ab --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.metrics.reporter; + +import java.io.IOException; + +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.FailureEventBuilder; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.Test; + +import avro.shaded.com.google.common.collect.Maps; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + + +public class FileFailureEventReporterTest { + @Test + public void testReport() + throws IOException { + MetricContext testContext = MetricContext.builder(getClass().getCanonicalName()).build(); + FileSystem fs = mock(FileSystem.class); + Path failureLogPath = mock(Path.class); + FSDataOutputStream outputStream = mock(FSDataOutputStream.class); + + FileFailureEventReporter reporter = new FileFailureEventReporter(testContext, fs, failureLogPath); + when(fs.exists(any())).thenReturn(true); + when(fs.append(any())).thenReturn(outputStream); + + final String eventName = "testEvent"; + final String eventNamespace = "testNamespace"; + GobblinTrackingEvent event = + new GobblinTrackingEvent(0L, eventNamespace, eventName, Maps.newHashMap()); + + // Noop on normal event + testContext.submitEvent(event); + verify(fs, never()).append(failureLogPath); + verify(outputStream, never()).write(anyByte()); + + // Process failure event + FailureEventBuilder failureEvent = new FailureEventBuilder(eventName, eventNamespace); + failureEvent.submit(testContext); + reporter.report(); + // Failure log output is setup + verify(fs, times(1)).append(failureLogPath); + // Report successfully + doAnswer( invocation -> null ).when(outputStream).write(any(byte[].class), anyInt(), anyInt()); + verify(outputStream, times(1)).write(any(byte[].class), anyInt(), anyInt()); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java index e123158..ae20031 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.metrics.reporter.FileFailureEventReporter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -405,6 +406,7 @@ public class GobblinMetrics { buildGraphiteMetricReporter(properties); buildInfluxDBMetricReporter(properties); buildCustomMetricReporters(properties); + buildFileFailureEventReporter(properties); // Start reporters that implement org.apache.gobblin.metrics.report.ScheduledReporter RootMetricContext.get().startReporting(); @@ -491,7 +493,9 @@ public class GobblinMetrics { } OutputStream output = append ? fs.append(metricLogFile) : fs.create(metricLogFile, true); + // Add metrics reporter OutputStreamReporter.Factory.newBuilder().outputTo(output).build(properties); + // Set up events reporter at the same time!! 
this.codahaleScheduledReporters.add(this.codahaleReportersCloser .register(OutputStreamEventReporter.forContext(RootMetricContext.get()).outputTo(output).build())); @@ -501,6 +505,43 @@ public class GobblinMetrics { } } + private void buildFileFailureEventReporter(Properties properties) { + if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) { + LOGGER.error( + "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined"); + return; + } + + try { + String fsUri = properties.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI); + FileSystem fs = FileSystem.get(URI.create(fsUri), new Configuration()); + + // Each job gets its own log subdirectory + Path failureLogDir = new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName()); + if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) { + LOGGER.error("Failed to create failure log directory for metrics " + this.getName()); + return; + } + + // Add a suffix to file name if specified in properties. + String metricsFileSuffix = + properties.getProperty(ConfigurationKeys.METRICS_FILE_SUFFIX, ConfigurationKeys.DEFAULT_METRICS_FILE_SUFFIX); + if (!Strings.isNullOrEmpty(metricsFileSuffix) && !metricsFileSuffix.startsWith(".")) { + metricsFileSuffix = "." 
+ metricsFileSuffix; + } + + // Each job run gets its own failure log file + Path failureLogFile = + new Path(failureLogDir, this.id + metricsFileSuffix + ".failure.log"); + this.codahaleScheduledReporters.add(this.codahaleReportersCloser + .register(new FileFailureEventReporter(RootMetricContext.get(), fs, failureLogFile))); + + LOGGER.info("Will start reporting failure to directory " + failureLogDir); + } catch (IOException ioe) { + LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe); + } + } + private void buildJmxMetricReporter(Properties properties) { if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_JMX_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_REPORTING_JMX_ENABLED))) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java index c947cd4..0ae794b 100644 --- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java +++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java @@ -19,12 +19,14 @@ package org.apache.gobblin.r2; import com.google.common.collect.Maps; import com.linkedin.r2.message.rest.RestRequest; import com.linkedin.r2.message.rest.RestResponse; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.http.ResponseHandler; import org.apache.gobblin.http.StatusType; import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.event.EventSubmitter; +import 
org.apache.gobblin.metrics.event.FailureEventBuilder; import org.apache.gobblin.net.Request; import org.apache.gobblin.utils.HttpConstants; import org.apache.gobblin.utils.HttpUtils; @@ -48,10 +50,9 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR public static final String CONTENT_TYPE_HEADER = "Content-Type"; private final String R2_RESPONSE_EVENT_NAMESPACE = "r2.response"; - private final String R2_FAILED_REQUEST = "R2FailedRequest"; + private final String R2_FAILED_REQUEST_EVENT = "r2FailedRequest"; private final Set<String> errorCodeWhitelist; - MetricContext metricsContext; - EventSubmitter eventSubmitter; + private MetricContext metricsContext; public R2RestResponseHandler() { this(new HashSet<>(), Instrumented.getMetricContext(new State(), R2RestResponseHandler.class)); @@ -60,7 +61,6 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR public R2RestResponseHandler(Set<String> errorCodeWhitelist, MetricContext metricContext) { this.errorCodeWhitelist = errorCodeWhitelist; this.metricsContext = metricContext; - eventSubmitter = new EventSubmitter.Builder(metricsContext, R2_RESPONSE_EVENT_NAMESPACE).build(); } @Override @@ -74,11 +74,20 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR status.setContent(response.getEntity()); status.setContentType(response.getHeader(CONTENT_TYPE_HEADER)); } else { + log.info("Receive an unsuccessful response with status code: " + statusCode); + Map<String, String> metadata = Maps.newHashMap(); - metadata.put(HttpConstants.REQUEST, request.toString()); metadata.put(HttpConstants.STATUS_CODE, String.valueOf(statusCode)); - eventSubmitter.submit(R2_FAILED_REQUEST, metadata); - log.info("Receive an unsuccessful response with status code: " + statusCode); + metadata.put(HttpConstants.REQUEST, request.toString()); + if (status.getType() != StatusType.CONTINUE) { + FailureEventBuilder failureEvent = new 
FailureEventBuilder(R2_FAILED_REQUEST_EVENT); + failureEvent.addAdditionalMetadata(metadata); + failureEvent.submit(metricsContext); + } else { + GobblinTrackingEvent event = + new GobblinTrackingEvent(0L, R2_RESPONSE_EVENT_NAMESPACE, R2_FAILED_REQUEST_EVENT, metadata); + metricsContext.submitEvent(event); + } } return status; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java index 3396de9..6c1b105 100644 --- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java +++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java @@ -20,6 +20,9 @@ package org.apache.gobblin.writer; import java.io.IOException; import java.util.Queue; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +48,8 @@ import org.apache.gobblin.http.ResponseStatus; @Slf4j public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> { private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpWriter.class); + private static final String ASYNC_REQUEST = "asyncRequest"; + private static final String FATAL_ASYNC_HTTP_WRITE_EVENT = "fatalAsyncHttpWrite"; public static final int DEFAULT_MAX_ATTEMPTS = 3; @@ -53,12 +58,15 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> { private final AsyncRequestBuilder<D, RQ> requestBuilder; private final int maxAttempts; + private final MetricContext context; + public 
AsyncHttpWriter(AsyncHttpWriterBuilder builder) { super(builder.getQueueCapacity()); this.httpClient = builder.getClient(); this.requestBuilder = builder.getAsyncRequestBuilder(); this.responseHandler = builder.getResponseHandler(); this.maxAttempts = builder.getMaxAttempts(); + this.context = Instrumented.getMetricContext(builder.getState(), AsyncHttpWriter.class); } @Override @@ -82,8 +90,9 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> { LOG.error("Fail to send request"); LOG.info(asyncRequest.toString()); - onFailure(asyncRequest, e); - throw new DispatchException("Write failed on IOException", e); + DispatchException de = new DispatchException("Write failed on IOException", e); + onFailure(asyncRequest, de); + throw de; } else { continue; } @@ -152,13 +161,29 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> { /** * Callback on failing to send the asyncRequest + * + * @deprecated Use {@link #onFailure(AsyncRequest, DispatchException)} */ + @Deprecated protected void onFailure(AsyncRequest<D, RQ> asyncRequest, Throwable throwable) { for (AsyncRequest.Thunk thunk: asyncRequest.getThunks()) { thunk.callback.onFailure(throwable); } } + protected void onFailure(AsyncRequest<D, RQ> asyncRequest, DispatchException exception) { + if (exception.isFatal()) { + // Report failure event + FailureEventBuilder failureEvent = new FailureEventBuilder(FATAL_ASYNC_HTTP_WRITE_EVENT); + failureEvent.setRootCause(exception); + failureEvent.addMetadata(ASYNC_REQUEST, asyncRequest.toString()); + failureEvent.submit(context); + } + for (AsyncRequest.Thunk thunk : asyncRequest.getThunks()) { + thunk.callback.onFailure(exception); + } + } + @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java ---------------------------------------------------------------------- diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java index 3bd3006..6c16de1 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java @@ -554,11 +554,7 @@ public class JobState extends SourceState { public void toJson(JsonWriter jsonWriter, boolean keepConfig) throws IOException { jsonWriter.beginObject(); - - jsonWriter.name("job name").value(this.getJobName()).name("job id").value(this.getJobId()).name("job state") - .value(this.getState().name()).name("start time").value(this.getStartTime()).name("end time") - .value(this.getEndTime()).name("duration").value(this.getDuration()).name("tasks").value(this.getTaskCount()) - .name("completed tasks").value(this.getCompletedTasks()); + writeStateSummary(jsonWriter); jsonWriter.name("task states"); jsonWriter.beginArray(); @@ -578,6 +574,19 @@ public class JobState extends SourceState { jsonWriter.endObject(); } + /** + * Write a summary to the json document + * + * @param jsonWriter a {@link com.google.gson.stream.JsonWriter} + * used to write the json document + */ + protected void writeStateSummary(JsonWriter jsonWriter) throws IOException { + jsonWriter.name("job name").value(this.getJobName()).name("job id").value(this.getJobId()).name("job state") + .value(this.getState().name()).name("start time").value(this.getStartTime()).name("end time") + .value(this.getEndTime()).name("duration").value(this.getDuration()).name("tasks").value(this.getTaskCount()) + .name("completed tasks").value(this.getCompletedTasks()); + } + protected void propsToJson(JsonWriter jsonWriter) throws IOException { jsonWriter.beginObject(); @@ -805,5 +814,12 @@ public class JobState extends SourceState { public void overrideWith(Properties properties) { throw new UnsupportedOperationException(); } + + @Override + protected void 
writeStateSummary(JsonWriter jsonWriter) + throws IOException { + super.writeStateSummary(jsonWriter); + jsonWriter.name("datasetUrn").value(getDatasetUrn()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 07d167b..d6a1b58 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -39,6 +39,7 @@ import org.apache.gobblin.lineage.LineageException; import org.apache.gobblin.lineage.LineageInfo; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.apache.gobblin.publisher.CommitSequencePublisher; import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.publisher.UnpublishedHandling; @@ -63,6 +64,9 @@ final class SafeDatasetCommit implements Callable<Void> { private static final Object GLOBAL_LOCK = new Object(); + private static final String DATASET_STATE = "datasetState"; + private static final String FAILED_DATASET_EVENT = "failedDataset"; + private final boolean shouldCommitDataInJob; private final boolean isJobCancelled; private final DeliverySemantics deliverySemantics; @@ -100,6 +104,8 @@ final class SafeDatasetCommit implements Callable<Void> { log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn, this.jobContext.getJobId(), roe); throw new RuntimeException(roe); + } finally { + maySubmitFailureEvent(datasetState); } if (this.isJobCancelled) { @@ -163,7 +169,9 @@ final class SafeDatasetCommit implements 
Callable<Void> { } finally { try { finalizeDatasetState(datasetState, datasetUrn); + maySubmitFailureEvent(datasetState); submitLineageEvent(datasetState.getTaskStates()); + if (commitSequenceBuilder.isPresent()) { buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn); datasetState.setState(JobState.RunningState.COMMITTED); @@ -181,6 +189,14 @@ final class SafeDatasetCommit implements Callable<Void> { return null; } + private void maySubmitFailureEvent(JobState.DatasetState datasetState) { + if (datasetState.getState() == JobState.RunningState.FAILED) { + FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_DATASET_EVENT); + failureEvent.addMetadata(DATASET_STATE, datasetState.toString()); + failureEvent.submit(metricContext); + } + } + private void submitLineageEvent(Collection<TaskState> states) { if (states.size() == 0) { return; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 65cf611..3265ab8 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.BooleanUtils; +import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -114,6 +115,9 @@ public class Task implements TaskIFace { private static final Logger LOG = LoggerFactory.getLogger(Task.class); + private static final String TASK_STATE = "taskState"; + private static final String FAILED_TASK_EVENT = "failedTask"; + private final String 
jobId; private final String taskId; private final String taskKey; @@ -448,6 +452,7 @@ public class Task implements TaskIFace { } } catch (Exception e) { if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) { + LOG.error("Processing record incurs an unexpected exception: ", e); throw new RuntimeException(e.getCause()); } errRecords++; @@ -511,6 +516,12 @@ public class Task implements TaskIFace { LOG.error(String.format("Task %s failed", this.taskId), t); this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED); this.taskState.setProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY, Throwables.getStackTraceAsString(t)); + + // Send task failure event + FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_TASK_EVENT); + failureEvent.setRootCause(t); + failureEvent.addMetadata(TASK_STATE, this.taskState.toString()); + failureEvent.submit(taskContext.getTaskMetrics().getMetricContext()); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java index 3cfc67b..5122391 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java @@ -117,7 +117,9 @@ public class EmailNotificationPlugin extends BaseIdlePluginImpl { } private static String getEmailBody(JobExecutionState state, RunningState previousStatus, RunningState newStatus) { - return new StringBuilder().append("RunningState: ").append(newStatus.toString()).append("\n") + return new 
StringBuilder().append("JobId: ") + .append(state.getJobSpec().getConfig().getString(ConfigurationKeys.JOB_ID_KEY)) + .append("RunningState: ").append(newStatus.toString()).append("\n") .append("JobExecutionState: ").append(state.getJobSpec().toLongString()).append("\n") .append("ExecutionMetadata: ").append(state.getExecutionMetadata()).toString(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java index f4f3dfb..ef57684 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.gobblin.runtime.util.TaskMetrics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.testng.Assert; @@ -348,6 +349,7 @@ public class TaskContinuousTest { // Create a mock TaskContext TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState)); when(mockTaskContext.getExtractor()).thenReturn(mockExtractor); when(mockTaskContext.getRawSourceExtractor()).thenReturn(mockExtractor); when(mockTaskContext.getWatermarkStorage()).thenReturn(mockWatermarkStorage); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java index 184943d..84c4f0a 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java @@ -35,6 +35,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.gobblin.runtime.util.TaskMetrics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.testng.Assert; @@ -109,6 +110,7 @@ public class TaskTest { taskState.addAll(overrides); // Create a mock TaskContext TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState)); when(mockTaskContext.getExtractor()).thenReturn(new FailOnceExtractor()); when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator()); when(mockTaskContext.getTaskState()).thenReturn(taskState);
