http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java deleted file mode 100644 index 4e0c15c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * Atomically tracks the dirty-state of a metric. - * - * <p>Reporting an update is split into two parts such that only changes made before the call to - * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for - * a two-step commit process of gathering all the dirty updates (calling {#link beforeCommit()}) - * followed by committing and calling {#link afterCommit()}. - * - * <p>The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()} - * will return true (indicating a dirty metric) even if there have been no changes since the last - * commit. - * - * <p>There is also a possible race when the underlying metric is modified but the call to - * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this - * case the next round of metric updating will see the changes. If this was for the final commit, - * then the metric updates shouldn't be extracted until all possible user modifications have - * completed. - */ -@Experimental(Kind.METRICS) -class DirtyState implements Serializable { - private enum State { - /** Indicates that there have been changes to the MetricCell since last commit. */ - DIRTY, - /** Indicates that there have been no changes to the MetricCell since last commit. */ - CLEAN, - /** Indicates that a commit of the current value is in progress. */ - COMMITTING - } - - private final AtomicReference<State> dirty = new AtomicReference<>(State.DIRTY); - - /** - * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}. - * - * <p>Should be called <b>after</b> modification of the value. - */ - public void afterModification() { - dirty.set(State.DIRTY); - } - - /** - * Check the dirty state and mark the metric as committing. - * - * <p>If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY} - * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}. - * - * @return {@code false} if the state is clean and {@code true} otherwise. - */ - public boolean beforeCommit() { - // After this loop, we want the state to be either CLEAN or COMMITTING. - // If the state was CLEAN, we don't need to do anything (and exit the loop early) - // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only - // fail if another thread is getting updates which generally shouldn't be the case. - // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will - // fail if another thread commits updates (which shouldn't be the case) or if the user code - // updates the metric, in which case it will transition to DIRTY and the next iteration will - // successfully update it. - State state; - do { - state = dirty.get(); - } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING)); - - return state != State.CLEAN; - } - - /** - * Mark any changes up to the most recently call to {@link #beforeCommit()}} as committed. - * The next call to {@link #beforeCommit()} will return {@code false} unless there have - * been changes made since the previous call to {@link #beforeCommit()}. - */ - public void afterCommit() { - dirty.compareAndSet(State.COMMITTING, State.CLEAN); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java deleted file mode 100644 index 93a3649..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * Tracks the current value (and delta) for a Distribution metric. - * - * <p>This class generally shouldn't be used directly. The only exception is within a runner where - * a distribution is being reported for a specific step (rather than the distribution in the current - * context). In that case retrieving the underlying cell and reporting directly to it avoids a step - * of indirection. - */ -@Experimental(Kind.METRICS) -public class DistributionCell implements MetricCell<Distribution, DistributionData> { - - private final DirtyState dirty = new DirtyState(); - private final AtomicReference<DistributionData> value = - new AtomicReference<>(DistributionData.EMPTY); - - /** - * Package-visibility because all {@link DistributionCell DistributionCells} should be created by - * {@link MetricsContainer#getDistribution(MetricName)}. - */ - DistributionCell() {} - - /** Increment the distribution by the given amount. */ - public void update(long n) { - update(DistributionData.singleton(n)); - } - - @Override - public void update(DistributionData data) { - DistributionData original; - do { - original = value.get(); - } while (!value.compareAndSet(original, original.combine(data))); - dirty.afterModification(); - } - - @Override - public void update(MetricCell<Distribution, DistributionData> other) { - update(other.getCumulative()); - } - - @Override - public DirtyState getDirty() { - return dirty; - } - - @Override - public DistributionData getCumulative() { - return value.get(); - } -} - http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java deleted file mode 100644 index 8068e1b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; - -/** - * Data describing the the distribution. This should retain enough detail that it can be combined - * with other {@link DistributionData}. - * - * <p>This is kept distinct from {@link DistributionResult} since this may be extended to include - * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include - * the approximate value of those quantiles. - */ -@AutoValue -public abstract class DistributionData implements Serializable { - - public abstract long sum(); - public abstract long count(); - public abstract long min(); - public abstract long max(); - - public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); - - public static DistributionData create(long sum, long count, long min, long max) { - return new AutoValue_DistributionData(sum, count, min, max); - } - - public static DistributionData singleton(long value) { - return create(value, 1, value, value); - } - - public DistributionData combine(DistributionData value) { - return create( - sum() + value.sum(), - count() + value.count(), - Math.min(value.min(), min()), - Math.max(value.max(), max())); - } - - public DistributionResult extractResult() { - return DistributionResult.create(sum(), count(), min(), max()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java index 27c242c..b01ae46 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java @@ -18,10 +18,13 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; /** * The result of a {@link Distribution} metric. */ +@Experimental(Kind.METRICS) @AutoValue public abstract class DistributionResult { http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java deleted file mode 100644 index 0cdd568..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.annotations.Experimental; - -/** - * Tracks the current value (and delta) for a {@link Gauge} metric. - * - * <p>This class generally shouldn't be used directly. The only exception is within a runner where - * a gauge is being reported for a specific step (rather than the gauge in the current - * context). In that case retrieving the underlying cell and reporting directly to it avoids a step - * of indirection. - */ -@Experimental(Experimental.Kind.METRICS) -public class GaugeCell implements MetricCell<Gauge, GaugeData> { - - private final DirtyState dirty = new DirtyState(); - private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty()); - - /** Set the gauge to the given value. */ - public void set(long value) { - update(GaugeData.create(value)); - } - - @Override - public void update(GaugeData data) { - GaugeData original; - do { - original = gaugeValue.get(); - } while (!gaugeValue.compareAndSet(original, original.combine(data))); - dirty.afterModification(); - } - - @Override - public void update(MetricCell<Gauge, GaugeData> other) { - GaugeData original; - do { - original = gaugeValue.get(); - } while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative()))); - dirty.afterModification(); - update(other.getCumulative()); - } - - @Override - public DirtyState getDirty() { - return dirty; - } - - @Override - public GaugeData getCumulative() { - return gaugeValue.get(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java deleted file mode 100644 index bf3401d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import org.joda.time.Instant; - -/** - * Data describing the gauge. This should retain enough detail that it can be combined with - * other {@link GaugeData}. - */ -@AutoValue -public abstract class GaugeData implements Serializable { - - public abstract long value(); - - public abstract Instant timestamp(); - - public static GaugeData create(long value) { - return new AutoValue_GaugeData(value, Instant.now()); - } - - public static GaugeData empty() { - return EmptyGaugeData.INSTANCE; - } - - public GaugeData combine(GaugeData other) { - if (this.timestamp().isAfter(other.timestamp())) { - return this; - } else { - return other; - } - } - - public GaugeResult extractResult() { - return GaugeResult.create(value(), timestamp()); - } - - /** - * Empty {@link GaugeData}, representing no values reported. - */ - public static class EmptyGaugeData extends GaugeData { - - private static final EmptyGaugeData INSTANCE = new EmptyGaugeData(); - private static final Instant EPOCH = new Instant(0); - - private EmptyGaugeData() { - } - - @Override - public long value() { - return -1L; - } - - @Override - public Instant timestamp() { - return EPOCH; - } - - @Override - public GaugeResult extractResult() { - return GaugeResult.empty(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java index 878776a..f24ded2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java @@ -18,11 +18,14 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.joda.time.Instant; /** * The result of a {@link Gauge} metric. */ +@Experimental(Kind.METRICS) @AutoValue public abstract class GaugeResult { public abstract long value(); http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java index dcd8a04..fdcc93c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java @@ -18,10 +18,15 @@ package org.apache.beam.sdk.metrics; +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + /** * Marker interface for all user-facing metrics. */ -public interface Metric { +@Experimental(Kind.METRICS) +public interface Metric extends Serializable { /** * The {@link MetricName} given to this metric. http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java deleted file mode 100644 index 403cac2..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a - * specific metric name in a single context. - * - * @param <UserT> The type of the user interface for reporting changes to this cell. - * @param <DataT> The type of metric data stored (and extracted) from this cell. - */ -@Experimental(Kind.METRICS) -public interface MetricCell<UserT extends Metric, DataT> extends Serializable { - - /** - * Update value of this cell. - */ - void update(DataT data); - - /** - * Update value of this cell by merging the value of another cell. - */ - void update(MetricCell<UserT, DataT> other); - - /** - * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes. - */ - DirtyState getDirty(); - - /** - * Return the cumulative value of this metric. - */ - DataT getCumulative(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java deleted file mode 100644 index a3e43e1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import com.google.common.base.Objects; -import java.util.Set; - -/** - * Implements matching for metrics filters. Specifically, matching for metric name, - * namespace, and step name. - */ -public class MetricFiltering { - - private MetricFiltering() { } - - /** Matching logic is implemented here rather than in MetricsFilter because we would like - * MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with - * a Proto/JSON/etc. schema object. - * @param filter {@link MetricsFilter} with the matching information of an actual metric - * @param key {@link MetricKey} with the information of a metric - * @return whether the filter matches the key or not - */ - public static boolean matches(MetricsFilter filter, MetricKey key) { - return filter == null - || (matchesName(key.metricName(), filter.names()) - && matchesScope(key.stepName(), filter.steps())); - } - - /** - * {@code subPathMatches(haystack, needle)} returns true if {@code needle} - * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b", - * but not "a/fool/bar/b" or "a/foo/bart/b". - */ - public static boolean subPathMatches(String haystack, String needle) { - int location = haystack.indexOf(needle); - int end = location + needle.length(); - if (location == -1) { - return false; // needle not found - } else if (location != 0 && haystack.charAt(location - 1) != '/') { - return false; // the first entry in needle wasn't exactly matched - } else if (end != haystack.length() && haystack.charAt(end) != '/') { - return false; // the last entry in needle wasn't exactly matched - } else { - return true; - } - } - - /** - * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched - * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A - * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or - * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */ - public static boolean matchesScope(String actualScope, Set<String> scopes) { - if (scopes.isEmpty() || scopes.contains(actualScope)) { - return true; - } - - // If there is no perfect match, a stage name-level match is tried. - // This is done by a substring search over the levels of the scope. - // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C". - for (String scope : scopes) { - if (subPathMatches(actualScope, scope)) { - return true; - } - } - - return false; - } - - private static boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) { - if (nameFilters.isEmpty()) { - return true; - } - for (MetricNameFilter nameFilter : nameFilters) { - if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) - && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { - return true; - } - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java deleted file mode 100644 index 8706853..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * Metrics are keyed by the step name they are associated with and the name of the metric. - */ -@Experimental(Kind.METRICS) -@AutoValue -public abstract class MetricKey implements Serializable { - - /** The step name that is associated with this metric. */ - public abstract String stepName(); - - /** The name of the metric. */ - public abstract MetricName metricName(); - - public static MetricKey create(String stepName, MetricName metricName) { - return new AutoValue_MetricKey(stepName, metricName); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java deleted file mode 100644 index 9cf6a5c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.Iterables; -import java.io.Serializable; -import java.util.Collections; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * Representation of multiple metric updates. - */ -@Experimental(Kind.METRICS) -@AutoValue -public abstract class MetricUpdates { - - public static final MetricUpdates EMPTY = MetricUpdates.create( - Collections.<MetricUpdate<Long>>emptyList(), - Collections.<MetricUpdate<DistributionData>>emptyList(), - Collections.<MetricUpdate<GaugeData>>emptyList()); - - /** - * Representation of a single metric update. - * @param <T> The type of value representing the update. - */ - @AutoValue - public abstract static class MetricUpdate<T> implements Serializable { - - /** The key being updated. */ - public abstract MetricKey getKey(); - /** The value of the update. */ - public abstract T getUpdate(); - - public static <T> MetricUpdate<T> create(MetricKey key, T update) { - return new AutoValue_MetricUpdates_MetricUpdate(key, update); - } - } - - /** Returns true if there are no updates in this MetricUpdates object. */ - public boolean isEmpty() { - return Iterables.isEmpty(counterUpdates()) - && Iterables.isEmpty(distributionUpdates()); - } - - /** All of the counter updates. */ - public abstract Iterable<MetricUpdate<Long>> counterUpdates(); - - /** All of the distribution updates. */ - public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates(); - - /** All of the gauges updates. */ - public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates(); - - /** Create a new {@link MetricUpdates} bundle. */ - public static MetricUpdates create( - Iterable<MetricUpdate<Long>> counterUpdates, - Iterable<MetricUpdate<DistributionData>> distributionUpdates, - Iterable<MetricUpdate<GaugeData>> gaugeUpdates) { - return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 096d147..bad1f10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -102,7 +102,7 @@ public class Metrics { } /** Implementation of {@link Counter} that delegates to the instance for the current context. */ - private static class DelegatingCounter implements Counter, Serializable { + private static class DelegatingCounter implements Metric, Counter, Serializable { private final MetricName name; private DelegatingCounter(MetricName name) { @@ -118,7 +118,7 @@ public class Metrics { @Override public void inc(long n) { MetricsContainer container = MetricsEnvironment.getCurrentContainer(); if (container != null) { - container.getCounter(name).update(n); + container.getCounter(name).inc(n); } } @@ -140,7 +140,7 @@ public class Metrics { /** * Implementation of {@link Distribution} that delegates to the instance for the current context. */ - private static class DelegatingDistribution implements Distribution, Serializable { + private static class DelegatingDistribution implements Metric, Distribution, Serializable { private final MetricName name; private DelegatingDistribution(MetricName name) { @@ -163,7 +163,7 @@ public class Metrics { /** * Implementation of {@link Gauge} that delegates to the instance for the current context. */ - private static class DelegatingGauge implements Gauge, Serializable { + private static class DelegatingGauge implements Metric, Gauge, Serializable { private final MetricName name; private DelegatingGauge(MetricName name) { http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index 48fa359..62b0806 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -15,164 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.metrics; -import static com.google.common.base.Preconditions.checkNotNull; +package org.apache.beam.sdk.metrics; -import com.google.common.collect.ImmutableList; import java.io.Serializable; -import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; /** * Holds the metrics for a single step and unit-of-commit (bundle). - * - * <p>This class is thread-safe. It is intended to be used with 1 (or more) threads are updating - * metrics and at-most 1 thread is extracting updates by calling {@link #getUpdates} and - * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction - * may cause updates that don't actually have any changes, it will never lose an update. - * - * <p>For consistency, all threads that update metrics should finish before getting the final - * cumulative values/updates. */ @Experimental(Kind.METRICS) -public class MetricsContainer implements Serializable { - - private final String stepName; - - private MetricsMap<MetricName, CounterCell> counters = - new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() { - @Override - public CounterCell createInstance(MetricName unusedKey) { - return new CounterCell(); - } - }); - - private MetricsMap<MetricName, DistributionCell> distributions = - new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() { - @Override - public DistributionCell createInstance(MetricName unusedKey) { - return new DistributionCell(); - } - }); - - private MetricsMap<MetricName, GaugeCell> gauges = - new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() { - @Override - public GaugeCell createInstance(MetricName unusedKey) { - return new GaugeCell(); - } - }); - - /** - * Create a new {@link MetricsContainer} associated with the given {@code stepName}. - */ - public MetricsContainer(String stepName) { - this.stepName = stepName; - } +public interface MetricsContainer extends Serializable { /** - * Return the {@link CounterCell} that should be used for implementing the given + * Return the {@link Counter} that should be used for implementing the given * {@code metricName} in this container. */ - public CounterCell getCounter(MetricName metricName) { - return counters.get(metricName); - } + Counter getCounter(MetricName metricName); /** - * Return the {@link DistributionCell} that should be used for implementing the given + * Return the {@link Distribution} that should be used for implementing the given * {@code metricName} in this container. */ - public DistributionCell getDistribution(MetricName metricName) { - return distributions.get(metricName); - } + Distribution getDistribution(MetricName metricName); /** - * Return the {@link GaugeCell} that should be used for implementing the given + * Return the {@link Gauge} that should be used for implementing the given * {@code metricName} in this container. */ - public GaugeCell getGauge(MetricName metricName) { - return gauges.get(metricName); - } - - private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>> - ImmutableList<MetricUpdate<UpdateT>> extractUpdates( - MetricsMap<MetricName, CellT> cells) { - ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); - for (Map.Entry<MetricName, CellT> cell : cells.entries()) { - if (cell.getValue().getDirty().beforeCommit()) { - updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), - cell.getValue().getCumulative())); - } - } - return updates.build(); - } - - /** - * Return the cumulative values for any metrics that have changed since the last time updates were - * committed. - */ - public MetricUpdates getUpdates() { - return MetricUpdates.create( - extractUpdates(counters), - extractUpdates(distributions), - extractUpdates(gauges)); - } - - private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) { - for (MetricCell<?, ?> cell : cells.values()) { - cell.getDirty().afterCommit(); - } - } - - /** - * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as - * committed. - */ - public void commitUpdates() { - commitUpdates(counters); - commitUpdates(distributions); - commitUpdates(gauges); - } - - private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>> - ImmutableList<MetricUpdate<UpdateT>> extractCumulatives( - MetricsMap<MetricName, CellT> cells) { - ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); - for (Map.Entry<MetricName, CellT> cell : cells.entries()) { - UpdateT update = checkNotNull(cell.getValue().getCumulative()); - updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update)); - } - return updates.build(); - } - - /** - * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this - * container. - */ - public MetricUpdates getCumulative() { - return MetricUpdates.create( - extractCumulatives(counters), - extractCumulatives(distributions), - extractCumulatives(gauges)); - } - - /** - * Update values of this {@link MetricsContainer} by merging the value of another cell. - */ - public void update(MetricsContainer other) { - updateCells(counters, other.counters); - updateCells(distributions, other.distributions); - updateCells(gauges, other.gauges); - } - - private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, DataT>> void updateCells( - MetricsMap<MetricName, CellT> current, - MetricsMap<MetricName, CellT> updates) { - for (Map.Entry<MetricName, CellT> counter : updates.entries()) { - current.get(counter.getKey()).update(counter.getValue()); - } - } + Gauge getGauge(MetricName metricName); } http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java deleted file mode 100644 index d01e970..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; - -/** - * Metrics containers by step. - * - * <p>This class is not thread-safe.</p> - */ -public class MetricsContainerStepMap implements Serializable { - private Map<String, MetricsContainer> metricsContainers; - - public MetricsContainerStepMap() { - this.metricsContainers = new ConcurrentHashMap<>(); - } - - /** - * Returns the container for the given step name. - */ - public MetricsContainer getContainer(String stepName) { - if (!metricsContainers.containsKey(stepName)) { - metricsContainers.put(stepName, new MetricsContainer(stepName)); - } - return metricsContainers.get(stepName); - } - - /** - * Update this {@link MetricsContainerStepMap} with all values from given - * {@link MetricsContainerStepMap}. - */ - public void updateAll(MetricsContainerStepMap other) { - for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) { - getContainer(container.getKey()).update(container.getValue()); - } - } - - /** - * Update {@link MetricsContainer} for given step in this map with all values from given - * {@link MetricsContainer}. - */ - public void update(String step, MetricsContainer container) { - getContainer(step).update(container); - } - - /** - * Returns {@link MetricResults} based on given - * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and committed metrics. - * - * <p>This constructor is intended for runners which support both attempted and committed - * metrics. - */ - public static MetricResults asMetricResults( - MetricsContainerStepMap attemptedMetricsContainers, - MetricsContainerStepMap committedMetricsContainers) { - return new MetricsContainerStepMapMetricResults( - attemptedMetricsContainers, - committedMetricsContainers); - } - - /** - * Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted - * metrics. - * - * <p>This constructor is intended for runners which only support `attempted` metrics. - * Accessing {@link MetricResult#committed()} in the resulting {@link MetricResults} will result - * in an {@link UnsupportedOperationException}.</p> - */ - public static MetricResults asAttemptedOnlyMetricResults( - MetricsContainerStepMap attemptedMetricsContainers) { - return new MetricsContainerStepMapMetricResults(attemptedMetricsContainers); - } - - private Map<String, MetricsContainer> getMetricsContainers() { - return metricsContainers; - } - - private static class MetricsContainerStepMapMetricResults extends MetricResults { - private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new HashMap<>(); - private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions = - new HashMap<>(); - private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = new HashMap<>(); - private final boolean isCommittedSupported; - - private MetricsContainerStepMapMetricResults( - MetricsContainerStepMap attemptedMetricsContainers) { - this(attemptedMetricsContainers, new MetricsContainerStepMap(), false); - } - - private MetricsContainerStepMapMetricResults( - MetricsContainerStepMap attemptedMetricsContainers, - MetricsContainerStepMap committedMetricsContainers) { - this(attemptedMetricsContainers, committedMetricsContainers, true); - } - - private MetricsContainerStepMapMetricResults( - MetricsContainerStepMap attemptedMetricsContainers, - MetricsContainerStepMap committedMetricsContainers, - boolean isCommittedSupported) { - for (MetricsContainer container - : attemptedMetricsContainers.getMetricsContainers().values()) { - MetricUpdates cumulative = container.getCumulative(); - mergeCounters(counters, cumulative.counterUpdates(), attemptedCounterUpdateFn()); - mergeDistributions(distributions, cumulative.distributionUpdates(), - attemptedDistributionUpdateFn()); - mergeGauges(gauges, cumulative.gaugeUpdates(), attemptedGaugeUpdateFn()); - } - for (MetricsContainer container - : committedMetricsContainers.getMetricsContainers().values()) { - MetricUpdates cumulative = container.getCumulative(); - mergeCounters(counters, cumulative.counterUpdates(), committedCounterUpdateFn()); - mergeDistributions(distributions, cumulative.distributionUpdates(), - committedDistributionUpdateFn()); - mergeGauges(gauges, cumulative.gaugeUpdates(), committedGaugeUpdateFn()); - } - this.isCommittedSupported = isCommittedSupported; - } - - private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> - attemptedDistributionUpdateFn() { - return new Function<MetricUpdate<DistributionData>, - AttemptedAndCommitted<DistributionData>>() { - @Override - public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - input, - MetricUpdate.create(key, DistributionData.EMPTY)); - } - }; - } - - private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> - committedDistributionUpdateFn() { - return new Function<MetricUpdate<DistributionData>, - AttemptedAndCommitted<DistributionData>>() { - @Override - public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - MetricUpdate.create(key, DistributionData.EMPTY), - input); - } - }; - } - - private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> - attemptedGaugeUpdateFn() { - return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() { - @Override - public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - input, - MetricUpdate.create(key, GaugeData.empty())); - } - }; - } - - private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> - committedGaugeUpdateFn() { - return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() { - @Override - public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - MetricUpdate.create(key, GaugeData.empty()), - input); - } - }; - } - - private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> attemptedCounterUpdateFn() { - return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() { - @Override - public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - input, - MetricUpdate.create(key, 0L)); - } - }; - } - - private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> committedCounterUpdateFn() { - return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() { - @Override - public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) { - MetricKey key = input.getKey(); - return new AttemptedAndCommitted<>( - key, - MetricUpdate.create(key, 0L), - input); - } - }; - } - - @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { - return new QueryResults(filter); - } - - private class QueryResults implements MetricQueryResults { - private final MetricsFilter filter; - - private QueryResults(MetricsFilter filter) { - this.filter = filter; - } - - @Override - public Iterable<MetricResult<Long>> counters() { - return - FluentIterable - .from(counters.values()) - .filter(matchesFilter(filter)) - .transform(counterUpdateToResult()) - .toList(); - } - - @Override - public Iterable<MetricResult<DistributionResult>> distributions() { - return - FluentIterable - .from(distributions.values()) - .filter(matchesFilter(filter)) - .transform(distributionUpdateToResult()) - .toList(); - } - - @Override - public Iterable<MetricResult<GaugeResult>> gauges() { - return - FluentIterable - .from(gauges.values()) - .filter(matchesFilter(filter)) - .transform(gaugeUpdateToResult()) - .toList(); - } - - private Predicate<AttemptedAndCommitted<?>> matchesFilter(final MetricsFilter filter) { - return new Predicate<AttemptedAndCommitted<?>>() { - @Override - public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) { - return MetricFiltering.matches(filter, attemptedAndCommitted.getKey()); - } - }; - } - } - - private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> counterUpdateToResult() { - return new - Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() { - @Override - public MetricResult<Long> - apply(AttemptedAndCommitted<Long> metricResult) { - MetricKey key = metricResult.getKey(); - return new AccumulatedMetricResult<>( - key.metricName(), - key.stepName(), - metricResult.getAttempted().getUpdate(), - isCommittedSupported - ? metricResult.getCommitted().getUpdate() - : null, - isCommittedSupported); - } - }; - } - - private Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>> - distributionUpdateToResult() { - return new - Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>() { - @Override - public MetricResult<DistributionResult> - apply(AttemptedAndCommitted<DistributionData> metricResult) { - MetricKey key = metricResult.getKey(); - return new AccumulatedMetricResult<>( - key.metricName(), - key.stepName(), - metricResult.getAttempted().getUpdate().extractResult(), - isCommittedSupported - ? metricResult.getCommitted().getUpdate().extractResult() - : null, - isCommittedSupported); - } - }; - } - - private Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>> - gaugeUpdateToResult() { - return new - Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>() { - @Override - public MetricResult<GaugeResult> - apply(AttemptedAndCommitted<GaugeData> metricResult) { - MetricKey key = metricResult.getKey(); - return new AccumulatedMetricResult<>( - key.metricName(), - key.stepName(), - metricResult.getAttempted().getUpdate().extractResult(), - isCommittedSupported - ? metricResult.getCommitted().getUpdate().extractResult() - : null, - isCommittedSupported); - } - }; - } - - @SuppressWarnings("ConstantConditions") - private void mergeCounters( - Map<MetricKey, AttemptedAndCommitted<Long>> counters, - Iterable<MetricUpdate<Long>> updates, - Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> updateToAttemptedAndCommittedFn) { - for (MetricUpdate<Long> metricUpdate : updates) { - MetricKey key = metricUpdate.getKey(); - AttemptedAndCommitted<Long> update = - updateToAttemptedAndCommittedFn.apply(metricUpdate); - if (counters.containsKey(key)) { - AttemptedAndCommitted<Long> current = counters.get(key); - update = new AttemptedAndCommitted<>( - key, - MetricUpdate.create( - key, - update.getAttempted().getUpdate() + current.getAttempted().getUpdate()), - MetricUpdate.create( - key, - update.getCommitted().getUpdate() + current.getCommitted().getUpdate())); - } - counters.put(key, update); - } - } - - @SuppressWarnings("ConstantConditions") - private void mergeDistributions( - Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions, - Iterable<MetricUpdate<DistributionData>> updates, - Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> - updateToAttemptedAndCommittedFn) { - for (MetricUpdate<DistributionData> metricUpdate : updates) { - MetricKey key = metricUpdate.getKey(); - AttemptedAndCommitted<DistributionData> update = - updateToAttemptedAndCommittedFn.apply(metricUpdate); - if (distributions.containsKey(key)) { - AttemptedAndCommitted<DistributionData> current = distributions.get(key); - update = new AttemptedAndCommitted<>( - key, - MetricUpdate.create( - key, - update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())), - MetricUpdate.create( - key, - update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate()))); - } - distributions.put(key, update); - } - } - - @SuppressWarnings("ConstantConditions") - private void mergeGauges( - Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges, - Iterable<MetricUpdate<GaugeData>> updates, - Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> - updateToAttemptedAndCommittedFn) { - for (MetricUpdate<GaugeData> metricUpdate : updates) { - MetricKey key = metricUpdate.getKey(); - AttemptedAndCommitted<GaugeData> update = - updateToAttemptedAndCommittedFn.apply(metricUpdate); - if (gauges.containsKey(key)) { - AttemptedAndCommitted<GaugeData> current = gauges.get(key); - update = new AttemptedAndCommitted<>( - key, - MetricUpdate.create( - key, - update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())), - MetricUpdate.create( - key, - update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate()))); - } - gauges.put(key, update); - } - } - - /** - * Accumulated implementation of {@link MetricResult}. - */ - private static class AccumulatedMetricResult<T> implements MetricResult<T> { - private final MetricName name; - private final String step; - private final T attempted; - private final T committed; - private final boolean isCommittedSupported; - - private AccumulatedMetricResult( - MetricName name, - String step, - T attempted, - T committed, - boolean isCommittedSupported) { - this.name = name; - this.step = step; - this.attempted = attempted; - this.committed = committed; - this.isCommittedSupported = isCommittedSupported; - } - - @Override - public MetricName name() { - return name; - } - - @Override - public String step() { - return step; - } - - @Override - public T committed() { - if (!isCommittedSupported) { - throw new UnsupportedOperationException("This runner does not currently support committed" - + " metrics results. Please use 'attempted' instead."); - } - return committed; - } - - @Override - public T attempted() { - return attempted; - } - } - - /** - * Attempted and committed {@link MetricUpdate MetricUpdates}. - */ - private static class AttemptedAndCommitted<T> { - private final MetricKey key; - private final MetricUpdate<T> attempted; - private final MetricUpdate<T> committed; - - private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted, - MetricUpdate<T> committed) { - this.key = key; - this.attempted = attempted; - this.committed = committed; - } - - private MetricKey getKey() { - return key; - } - - private MetricUpdate<T> getAttempted() { - return attempted; - } - - private MetricUpdate<T> getCommitted() { - return committed; - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index a4b311f..dadec33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -21,6 +21,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +33,7 @@ import org.slf4j.LoggerFactory; * <p>Users should not interact directly with this class. Instead, use {@link Metrics} and the * returned objects to create and modify metrics. * - * <p>The runner should create {@link MetricsContainer} for each context in which metrics are + * <p>The runner should create a {@link MetricsContainer} for each context in which metrics are * reported (by step and name) and call {@link #setCurrentContainer} before invoking any code that * may update metrics within that step. It should call {@link #setCurrentContainer} again to restore * the previous container. @@ -39,9 +42,11 @@ import org.slf4j.LoggerFactory; * container for the current thread and get a {@link Closeable} that will restore the previous * container when closed. */ +@Experimental(Kind.METRICS) +@Internal public class MetricsEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(MetricsContainer.class); + private static final Logger LOG = LoggerFactory.getLogger(MetricsEnvironment.class); private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false); private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java deleted file mode 100644 index 8c26f18..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Iterables; -import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * A map from {@code K} to {@code T} that supports getting or creating values associated with a key - * in a thread-safe manner. - */ -@Experimental(Kind.METRICS) -public class MetricsMap<K, T> implements Serializable { - - /** Interface for creating instances to populate the {@link MetricsMap}. */ - public interface Factory<K, T> extends Serializable { - /** - * Create an instance of {@code T} to use with the given {@code key}. - * - * <p>It must be safe to call this from multiple threads. - */ - T createInstance(K key); - } - - private final Factory<K, T> factory; - private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>(); - - public MetricsMap(Factory<K, T> factory) { - this.factory = factory; - } - - /** - * Get or create the value associated with the given key. - */ - public T get(K key) { - T metric = metrics.get(key); - if (metric == null) { - metric = factory.createInstance(key); - metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), metric); - } - return metric; - } - - /** - * Get the value associated with the given key, if it exists. - */ - @Nullable - public T tryGet(K key) { - return metrics.get(key); - } - - /** - * Return an iterable over the entries in the current {@link MetricsMap}. - */ - public Iterable<Map.Entry<K, T>> entries() { - return Iterables.unmodifiableIterable(metrics.entrySet()); - } - - /** - * Return an iterable over the values in the current {@link MetricsMap}. - */ - public Iterable<T> values() { - return Iterables.unmodifiableIterable(metrics.values()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java index f96b6ac..f6f00bd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SinkMetrics.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.metrics; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + /** * Standard Sink Metrics. */ +@Experimental(Kind.METRICS) public class SinkMetrics { private static final String SINK_NAMESPACE = "sink"; http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java index 4479f3a..1adbf0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java @@ -18,10 +18,13 @@ package org.apache.beam.sdk.metrics; import com.google.common.base.Joiner; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; /** * Standard {@link org.apache.beam.sdk.io.Source} Metrics. */ +@Experimental(Kind.METRICS) public class SourceMetrics { private static final String SOURCE_NAMESPACE = "source"; http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java deleted file mode 100644 index 26554d4..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link CounterCell}. - */ -@RunWith(JUnit4.class) -public class CounterCellTest { - - private CounterCell cell = new CounterCell(); - - @Test - public void testDeltaAndCumulative() { - cell.update(5); - cell.update(7); - assertThat(cell.getCumulative(), equalTo(12L)); - assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L)); - - assertThat(cell.getDirty().beforeCommit(), equalTo(true)); - cell.getDirty().afterCommit(); - assertThat(cell.getDirty().beforeCommit(), equalTo(false)); - assertThat(cell.getCumulative(), equalTo(12L)); - - cell.update(30); - assertThat(cell.getCumulative(), equalTo(42L)); - - assertThat(cell.getDirty().beforeCommit(), equalTo(true)); - cell.getDirty().afterCommit(); - assertThat(cell.getDirty().beforeCommit(), equalTo(false)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java deleted file mode 100644 index d00f8cd..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link DirtyStateTest}. - */ -@RunWith(JUnit4.class) -public class DirtyStateTest { - - private final DirtyState dirty = new DirtyState(); - - @Test - public void basicPath() { - assertThat("Should start dirty", dirty.beforeCommit(), is(true)); - dirty.afterCommit(); - assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); - - dirty.afterModification(); - assertThat("Should be dirty after change", dirty.beforeCommit(), is(true)); - dirty.afterCommit(); - assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); - } - - @Test - public void changeAfterBeforeCommit() { - assertThat("Should start dirty", dirty.beforeCommit(), is(true)); - dirty.afterModification(); - dirty.afterCommit(); - assertThat("Changes after beforeCommit should be dirty after afterCommit", - dirty.beforeCommit(), is(true)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java deleted file mode 100644 index 07e0b26..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link DistributionCell}. - */ -@RunWith(JUnit4.class) -public class DistributionCellTest { - private DistributionCell cell = new DistributionCell(); - - @Test - public void testDeltaAndCumulative() { - cell.update(5); - cell.update(7); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); - assertThat("getCumulative is idempotent", - cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); - - assertThat(cell.getDirty().beforeCommit(), equalTo(true)); - cell.getDirty().afterCommit(); - assertThat(cell.getDirty().beforeCommit(), equalTo(false)); - - cell.update(30); - assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30))); - - assertThat("Adding a new value made the cell dirty", - cell.getDirty().beforeCommit(), equalTo(true)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java deleted file mode 100644 index 7dbfc4b..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import org.junit.Test; - -/** - * Tests for {@link GaugeCell}. - */ -public class GaugeCellTest { - private GaugeCell cell = new GaugeCell(); - - @Test - public void testDeltaAndCumulative() { - cell.set(5); - cell.set(7); - assertThat(cell.getCumulative().value(), equalTo(GaugeData.create(7).value())); - assertThat("getCumulative is idempotent", - cell.getCumulative().value(), equalTo(7L)); - - assertThat(cell.getDirty().beforeCommit(), equalTo(true)); - cell.getDirty().afterCommit(); - assertThat(cell.getDirty().beforeCommit(), equalTo(false)); - - cell.set(30); - assertThat(cell.getCumulative().value(), equalTo(30L)); - - assertThat("Adding a new value made the cell dirty", - cell.getDirty().beforeCommit(), equalTo(true)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java deleted file mode 100644 index dc2fa0a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.metrics; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashSet; -import java.util.Set; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link MetricFiltering}. - */ -@RunWith(JUnit4.class) -public class MetricFilteringTest { - private static final MetricName NAME1 = MetricName.named("ns1", "name1"); - - - private boolean matchesSubPath(String actualScope, String subPath) { - return MetricFiltering.subPathMatches(actualScope, subPath); - } - - @Test - public void testMatchCompositeStepNameFilters() { - // MetricsFilter with a Class-namespace + name filter + step filter. - // Successful match. - assertTrue(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) - .addStep("myStep").build(), - MetricKey.create( - "myBigStep/myStep", MetricName.named(MetricFilteringTest.class, "myMetricName")))); - - // Unsuccessful match. - assertFalse(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) - .addStep("myOtherStep").build(), - MetricKey.create( - "myOtherStepNoMatch/myStep", - MetricName.named(MetricFilteringTest.class, "myMetricName")))); - } - - @Test - public void testMatchStepNameFilters() { - // MetricsFilter with a Class-namespace + name filter + step filter. - // Successful match. - assertTrue(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) - .addStep("myStep").build(), - MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName")))); - - // Unsuccessful match. - assertFalse(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")) - .addStep("myOtherStep").build(), - MetricKey.create("myStep", MetricName.named(MetricFilteringTest.class, "myMetricName")))); - } - - @Test - public void testMatchClassNamespaceFilters() { - // MetricsFilter with a Class-namespace + name filter. Without step filter. - // Successful match. - assertTrue(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(), - MetricKey.create("anyStep", MetricName.named(MetricFilteringTest.class, "myMetricName")))); - - // Unsuccessful match. - assertFalse(MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named(MetricFilteringTest.class, "myMetricName")).build(), - MetricKey.create("anyStep", MetricName.named(MetricFiltering.class, "myMetricName")))); - } - - @Test - public void testMatchStringNamespaceFilters() { - // MetricsFilter with a String-namespace + name filter. Without step filter. - // Successful match. - assertTrue( - MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named("myNamespace", "myMetricName")).build(), - MetricKey.create("anyStep", MetricName.named("myNamespace", "myMetricName")))); - - // Unsuccessful match. - assertFalse( - MetricFiltering.matches( - MetricsFilter.builder().addNameFilter( - MetricNameFilter.named("myOtherNamespace", "myMetricName")).build(), - MetricKey.create("anyStep", MetricName.named("myNamespace", "myMetricname")))); - } - - @Test - public void testMatchesSubPath() { - assertTrue("Match of the first element", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1")); - assertTrue("Match of the first elements", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); - assertTrue("Match of the last elements", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1")); - assertFalse("Substring match but no subpath match", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1")); - assertFalse("Substring match from start - but no subpath match", - matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top")); - } - - private boolean matchesScopeWithSingleFilter(String actualScope, String filter) { - Set<String> scopeFilter = new HashSet<String>(); - scopeFilter.add(filter); - return MetricFiltering.matchesScope(actualScope, scopeFilter); - } - - @Test - public void testMatchesScope() { - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1")); - assertTrue(matchesScopeWithSingleFilter( - "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1")); - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); - assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1")); - assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1")); - assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn")); - } -}
