Updated Branches: refs/heads/flume-1.5 8ea478932 -> b1f122f23
FLUME-2213. MorphlineInterceptor should share metric registry across threads for better (aggregate) reporting (Wolfgang Hoschek via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b1f122f2 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b1f122f2 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b1f122f2 Branch: refs/heads/flume-1.5 Commit: b1f122f23d05aa55492941b11a4a1f7b5a7b5dfe Parents: 8ea4789 Author: Hari Shreedharan <[email protected]> Authored: Wed Oct 16 14:24:13 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Oct 16 14:25:04 2013 -0700 ---------------------------------------------------------------------- .../solr/morphline/MorphlineHandlerImpl.java | 67 ++++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b1f122f2/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java index ea76322..cb88dc2 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java @@ -31,8 +31,12 @@ import com.cloudera.cdk.morphline.api.Record; import com.cloudera.cdk.morphline.base.Compiler; import com.cloudera.cdk.morphline.base.FaultTolerance; import com.cloudera.cdk.morphline.base.Fields; +import com.cloudera.cdk.morphline.base.Metrics; import com.cloudera.cdk.morphline.base.Notifications; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SharedMetricRegistries; +import com.codahale.metrics.Timer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -46,6 +50,11 @@ public class MorphlineHandlerImpl implements MorphlineHandler { private Command finalChild; private String morphlineFileAndId; + private Timer mappingTimer; + private Meter numRecords; + private Meter numFailedRecords; + private Meter numExceptionRecords; + public static final String MORPHLINE_FILE_PARAM = "morphlineFile"; public static final String MORPHLINE_ID_PARAM = "morphlineId"; @@ -69,6 +78,13 @@ public class MorphlineHandlerImpl implements MorphlineHandler { @Override public void configure(Context context) { + String morphlineFile = context.getString(MORPHLINE_FILE_PARAM); + String morphlineId = context.getString(MORPHLINE_ID_PARAM); + if (morphlineFile == null || morphlineFile.trim().length() == 0) { + throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null); + } + morphlineFileAndId = morphlineFile + "@" + morphlineId; + if (morphlineContext == null) { FaultTolerance faultTolerance = new FaultTolerance( context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false), @@ -77,37 +93,48 @@ public class MorphlineHandlerImpl implements MorphlineHandler { morphlineContext = new MorphlineContext.Builder() .setExceptionHandler(faultTolerance) - .setMetricRegistry(new MetricRegistry()) + .setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId)) .build(); } - String morphlineFile = context.getString(MORPHLINE_FILE_PARAM); - String morphlineId = context.getString(MORPHLINE_ID_PARAM); - if (morphlineFile == null || morphlineFile.trim().length() == 0) { - throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null); - } Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + ".")); morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override); - morphlineFileAndId = morphlineFile + "@" + morphlineId; + + this.mappingTimer = morphlineContext.getMetricRegistry().timer( + MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME)); + this.numRecords = morphlineContext.getMetricRegistry().meter( + MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS)); + this.numFailedRecords = morphlineContext.getMetricRegistry().meter( + MetricRegistry.name("morphline.app", "numFailedRecords")); + this.numExceptionRecords = morphlineContext.getMetricRegistry().meter( + MetricRegistry.name("morphline.app", "numExceptionRecords")); } @Override public void process(Event event) { - Record record = new Record(); - for (Entry<String, String> entry : event.getHeaders().entrySet()) { - record.put(entry.getKey(), entry.getValue()); - } - byte[] bytes = event.getBody(); - if (bytes != null && bytes.length > 0) { - record.put(Fields.ATTACHMENT_BODY, bytes); - } + numRecords.mark(); + Timer.Context timerContext = mappingTimer.time(); try { - Notifications.notifyStartSession(morphline); - if (!morphline.process(record)) { - LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record); + Record record = new Record(); + for (Entry<String, String> entry : event.getHeaders().entrySet()) { + record.put(entry.getKey(), entry.getValue()); + } + byte[] bytes = event.getBody(); + if (bytes != null && bytes.length > 0) { + record.put(Fields.ATTACHMENT_BODY, bytes); + } + try { + Notifications.notifyStartSession(morphline); + if (!morphline.process(record)) { + numFailedRecords.mark(); + LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record); + } + } catch (RuntimeException t) { + numExceptionRecords.mark(); + morphlineContext.getExceptionHandler().handleException(t, record); } - } catch (RuntimeException t) { - morphlineContext.getExceptionHandler().handleException(t, record); + } finally { + timerContext.stop(); } }
