JayajP commented on code in PR #30769: URL: https://github.com/apache/beam/pull/30769#discussion_r1543499731
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray; + +/** + * A lock free implementation of {@link org.apache.beam.sdk.metrics.Histogram}. This class supports + * extracting delta updates with the {@link #getSnapshotAndReset} method. 
+ */ +public class LockFreeHistogram implements Histogram { + private final HistogramData.BucketType bucketType; + private final AtomicLongArray buckets; + private final MetricName name; + + private AtomicReference<OutlierStatistic> underflowStatistic = + new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY); + private AtomicReference<OutlierStatistic> overflowStatistic = + new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY); + + /** + * Whether this histogram has updates that have not been extracted by {@code getSnapshotAndReset}. + * This values should be flipped to true AFTER recording a value, and flipped to false BEFORE + * extracting a snapshot. This ensures that recorded values will always be seen by a futrue {@code + * getSnapshotAndReset} call. + */ + private AtomicBoolean dirty = new AtomicBoolean(false); + + /** + * Represents the sum and mean of a collection of numbers. Used to represent the + * underflow/overflow statistics of a histogram. + */ + @AutoValue + public abstract static class OutlierStatistic implements Serializable { + abstract double sum(); + + public abstract long count(); + + public static final OutlierStatistic EMPTY = create(0, 0); + + public static OutlierStatistic create(double sum, long count) { + return new AutoValue_LockFreeHistogram_OutlierStatistic(sum, count); + } + + public OutlierStatistic combine(double value) { + return create(sum() + value, count() + 1); + } + + public double mean() { + if (count() == 0) { + return 0; + } + return sum() / count(); + } + } + + /** + * The snapshot of a histogram. The snapshot contains the overflow/underflow statistic, number of + * values recorded in each bucket, and the BucketType of the underlying histogram. 
+ */ + @AutoValue + public abstract static class Snapshot { + public abstract OutlierStatistic underflowStatistic(); + + public abstract OutlierStatistic overflowStatistic(); + + public abstract ImmutableLongArray buckets(); + + public abstract HistogramData.BucketType bucketType(); + + public static Snapshot create( + OutlierStatistic underflowStatistic, + OutlierStatistic overflowStatistic, + ImmutableLongArray buckets, + HistogramData.BucketType bucketType) { + return new AutoValue_LockFreeHistogram_Snapshot( + underflowStatistic, overflowStatistic, buckets, bucketType); + } + + @Memoized + public long totalCount() { + long count = 0; + count += underflowStatistic().count(); + count += overflowStatistic().count(); + count += buckets().stream().sum(); + + return count; + } + } + + /** + * Extract a delta update of this histogram. Update represents values that have been recorded in + * this histogram since the last time this method was called. + * + * <p>If this histogram is being updated concurrent to this method, then the returned snapshot is + * not guarenteed to contain those updates. However, those updates are not dropped and will be + * represented in a future call to this method. + * + * <p>If this histogram has not been updated since the last call to this method, an empty optional + * is returned. 
+ */ +  public Optional<Snapshot> getSnapshotAndReset() { +    if (!dirty.getAndSet(false)) { +      return Optional.empty(); +    } + +    ImmutableLongArray.Builder bucketsSnapshotBuilder = +        ImmutableLongArray.builder(buckets.length()); +    for (int i = 0; i < buckets.length(); i++) { +      bucketsSnapshotBuilder.add(buckets.getAndSet(i, 0)); +    } +    OutlierStatistic overflowSnapshot = overflowStatistic.getAndSet(OutlierStatistic.EMPTY); +    OutlierStatistic underflowSnapshot = underflowStatistic.getAndSet(OutlierStatistic.EMPTY); + +    return Optional.of( +        Snapshot.create( +            underflowSnapshot, overflowSnapshot, bucketsSnapshotBuilder.build(), bucketType)); +  } + +  /** Create a histogram. */ +  public LockFreeHistogram(KV<MetricName, HistogramData.BucketType> kv) { +    this.name = kv.getKey(); +    this.bucketType = kv.getValue(); +    this.buckets = new AtomicLongArray(bucketType.getNumBuckets()); +  } + +  @Override +  public MetricName getName() { +    return name; +  } + +  private void updateInternal(double value) { +    double rangeTo = bucketType.getRangeTo(); +    double rangeFrom = bucketType.getRangeFrom(); +    if (value >= rangeTo) { +      recordTopRecordsValue(value); +    } else if (value < rangeFrom) { +      recordBottomRecordsValue(value); +    } else { +      recordInBoundsValue(value); +    } +  } + +  @Override +  public void update(double value) { +    updateInternal(value); +    dirty.set(true); Review Comment: It won’t throw any ConcurrentModificationExceptions, but you’re right: we don’t atomically flip this variable and update the metric. I’m using this variable as an optimization for the thread that extracts metrics, for the case where a LockFreeHistogram is not being written to. However, this variable is not necessary from a correctness standpoint (see details below), so I can remove it if you’d like. This variable is updated both when recording metrics and when extracting updates, but it does not affect the record path.
On the extract path, we can get either a false positive (`dirty` is `true` when there are no updates in the histogram) or a false negative (`dirty` is `false` when there are updates to the underlying histogram). - In the false positive scenario, after we extract a snapshot, we check that it’s non-empty before sending it to DFE, so empty snapshots are simply dropped. - In the false negative scenario, since we always set this value to true after recording a value in the histogram, a future call to `getSnapshotAndReset` will see `dirty` set to `true` and extract the update. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/LockFreeHistogram.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray; + +/** + * A lock free implementation of {@link org.apache.beam.sdk.metrics.Histogram}. This class supports + * extracting delta updates with the {@link #getSnapshotAndReset} method. + */ +public class LockFreeHistogram implements Histogram { + private final HistogramData.BucketType bucketType; + private final AtomicLongArray buckets; + private final MetricName name; + + private AtomicReference<OutlierStatistic> underflowStatistic = + new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY); + private AtomicReference<OutlierStatistic> overflowStatistic = + new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY); + + /** + * Whether this histogram has updates that have not been extracted by {@code getSnapshotAndReset}. + * This values should be flipped to true AFTER recording a value, and flipped to false BEFORE + * extracting a snapshot. This ensures that recorded values will always be seen by a futrue {@code + * getSnapshotAndReset} call. + */ + private AtomicBoolean dirty = new AtomicBoolean(false); + + /** + * Represents the sum and mean of a collection of numbers. Used to represent the + * underflow/overflow statistics of a histogram. 
+ */ + @AutoValue + public abstract static class OutlierStatistic implements Serializable { + abstract double sum(); + + public abstract long count(); + + public static final OutlierStatistic EMPTY = create(0, 0); + + public static OutlierStatistic create(double sum, long count) { + return new AutoValue_LockFreeHistogram_OutlierStatistic(sum, count); + } + + public OutlierStatistic combine(double value) { + return create(sum() + value, count() + 1); + } + + public double mean() { + if (count() == 0) { + return 0; + } + return sum() / count(); + } + } + + /** + * The snapshot of a histogram. The snapshot contains the overflow/underflow statistic, number of + * values recorded in each bucket, and the BucketType of the underlying histogram. + */ + @AutoValue + public abstract static class Snapshot { + public abstract OutlierStatistic underflowStatistic(); + + public abstract OutlierStatistic overflowStatistic(); + + public abstract ImmutableLongArray buckets(); + + public abstract HistogramData.BucketType bucketType(); + + public static Snapshot create( + OutlierStatistic underflowStatistic, + OutlierStatistic overflowStatistic, + ImmutableLongArray buckets, + HistogramData.BucketType bucketType) { + return new AutoValue_LockFreeHistogram_Snapshot( + underflowStatistic, overflowStatistic, buckets, bucketType); + } + + @Memoized + public long totalCount() { + long count = 0; + count += underflowStatistic().count(); + count += overflowStatistic().count(); + count += buckets().stream().sum(); + + return count; + } + } + + /** + * Extract a delta update of this histogram. Update represents values that have been recorded in + * this histogram since the last time this method was called. + * + * <p>If this histogram is being updated concurrent to this method, then the returned snapshot is + * not guarenteed to contain those updates. However, those updates are not dropped and will be + * represented in a future call to this method. 
+ * + * <p>If this histogram has not been updated since the last call to this method, an empty optional + * is returned. + */ + public Optional<Snapshot> getSnapshotAndReset() { + if (!dirty.getAndSet(false)) { + return Optional.empty(); + } + + ImmutableLongArray.Builder bucketsSnapshotBuilder = + ImmutableLongArray.builder(buckets.length()); + for (int i = 0; i < buckets.length(); i++) { + bucketsSnapshotBuilder.add(buckets.getAndSet(i, 0)); + } + OutlierStatistic overflowSnapshot = overflowStatistic.getAndSet(OutlierStatistic.EMPTY); + OutlierStatistic underflowSnapshot = underflowStatistic.getAndSet(OutlierStatistic.EMPTY); + + return Optional.of( + Snapshot.create( + underflowSnapshot, overflowSnapshot, bucketsSnapshotBuilder.build(), bucketType)); + } + + /** Create a histogram. */ + public LockFreeHistogram(KV<MetricName, HistogramData.BucketType> kv) { + this.name = kv.getKey(); + this.bucketType = kv.getValue(); + this.buckets = new AtomicLongArray(bucketType.getNumBuckets()); + } + + @Override + public MetricName getName() { + return name; + } + + private void updateInternal(double value) { + double rangeTo = bucketType.getRangeTo(); + double rangeFrom = bucketType.getRangeFrom(); + if (value >= rangeTo) { + recordTopRecordsValue(value); + } else if (value < rangeFrom) { + recordBottomRecordsValue(value); + } else { + recordInBoundsValue(value); + } + } + + @Override + public void update(double value) { + updateInternal(value); + dirty.set(true); + } + + public void update(double... values) { Review Comment: See comments above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
