Repository: incubator-gobblin Updated Branches: refs/heads/master 7d8d40dd4 -> db8ee526f
[GOBBLIN-374] GobblinMetrics failed to close event reporters Closes #2249 from zxcware/metrics Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/db8ee526 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/db8ee526 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/db8ee526 Branch: refs/heads/master Commit: db8ee526ffa82af75f5db6e64dccf4479163c279 Parents: 7d8d40d Author: zhchen <[email protected]> Authored: Wed Jan 17 16:26:43 2018 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Wed Jan 17 16:26:43 2018 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTaskDriver.java | 4 +- .../apache/gobblin/metrics/GobblinMetrics.java | 8 ++-- .../gobblin/metrics/GobblinMetricsRegistry.java | 2 +- .../gobblin/metrics/GobblinMetricsTest.java | 42 ++++++++++++++++++++ 4 files changed, 50 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/db8ee526/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java index 6c29775..cedb111 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java @@ -304,14 +304,14 @@ public class GobblinHelixTaskDriver { if (currentData != null) { // Only update target state for non-completed workflows String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); - if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { + if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) { currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), state.name()); } else { LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime); } } else { - LOG.error("TargetState DataUpdater: Fails to update target state " + currentData); + LOG.error("TargetState DataUpdater: Fails to update target state "); } return currentData; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/db8ee526/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java index 703a603..76eb6d3 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java @@ -423,20 +423,20 @@ public class GobblinMetrics { } this.metricsReportingStarted = true; - LOGGER.info("Metrics reporting has been started: GobblinMetrics {}", this.hashCode()); + LOGGER.info("Metrics reporting has been started: GobblinMetrics {}", this.toString()); } /** * Stop metric reporting. */ public void stopMetricsReporting() { + LOGGER.info("Metrics reporting will be stopped: GobblinMetrics {}", this.toString()); + if (!this.metricsReportingStarted) { LOGGER.warn("Metric reporting has not started yet"); return; } - LOGGER.info("Metrics reporting will be stopped: GobblinMetrics {}", this.hashCode()); - // Stop the JMX reporter if (this.jmxReporter.isPresent()) { this.jmxReporter.get().stop(); @@ -461,6 +461,8 @@ public class GobblinMetrics { } this.metricsReportingStarted = false; + // Remove from the cache registry + GobblinMetrics.remove(id); LOGGER.info("Metrics reporting stopped successfully"); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/db8ee526/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java index d50a0bc..c37d56b 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java @@ -46,7 +46,7 @@ public class GobblinMetricsRegistry { private static final GobblinMetricsRegistry GLOBAL_INSTANCE = new GobblinMetricsRegistry(); - private final Cache<String, GobblinMetrics> metricsCache = CacheBuilder.newBuilder().softValues().build(); + private final Cache<String, GobblinMetrics> metricsCache = CacheBuilder.newBuilder().build(); private GobblinMetricsRegistry() { // Do nothing http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/db8ee526/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java new file mode 100644 index 0000000..4a357c8 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics; + +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +@Test +public class GobblinMetricsTest { + + /** + * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when + * it stops metrics reporting + */ + public void testStopReportingMetrics() { + String id = getClass().getSimpleName() + "-" + System.currentTimeMillis(); + GobblinMetrics gobblinMetrics = GobblinMetrics.get(id); + gobblinMetrics.startMetricReporting(new Properties()); + Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics); + + gobblinMetrics.stopMetricsReporting(); + Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent()); + } +}
