kennknowles commented on code in PR #34165: URL: https://github.com/apache/beam/pull/34165#discussion_r1981817787
########## sdks/java/io/kafka/jmh/build.gradle: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } Review Comment: Please use `build.gradle.kts` for new files ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java: ########## @@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. 
+ * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; + byte p010, p011, p012, p013, p014, p015, p016, p017; + byte p020, p021, p022, p023, p024, p025, p026, p027; + byte p030, p031, p032, p033, p034, p035, p036, p037; + byte p040, p041, p042, p043, p044, p045, p046, p047; + byte p050, p051, p052, p053, p054, p055, p056, p057; + byte p060, p061, p062, p063, p064, p065, p066, p067; + } + + // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 + // cache lines). + private static class MovingAvgFields extends MovingAvgPadding { private static final int MOVING_AVG_WINDOW = 1000; - private double avg = 0; + + private static final AtomicLongFieldUpdater<MovingAvgFields> AVG = + AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg"); + + private volatile long avg = 0; private long numUpdates = 0; - void update(double quantity) { - numUpdates++; - avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + protected double getAvg() { + return Double.longBitsToDouble(avg); + } + + protected void setAvg(final double value) { Review Comment: Just make these package-private and call them from the MovingAvg class. (see below: use a field instead of inheritance) ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java: ########## @@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. 
+ * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; + byte p010, p011, p012, p013, p014, p015, p016, p017; + byte p020, p021, p022, p023, p024, p025, p026, p027; + byte p030, p031, p032, p033, p034, p035, p036, p037; + byte p040, p041, p042, p043, p044, p045, p046, p047; + byte p050, p051, p052, p053, p054, p055, p056, p057; + byte p060, p061, p062, p063, p064, p065, p066, p067; + } + + // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 + // cache lines). + private static class MovingAvgFields extends MovingAvgPadding { private static final int MOVING_AVG_WINDOW = 1000; - private double avg = 0; + + private static final AtomicLongFieldUpdater<MovingAvgFields> AVG = + AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg"); + + private volatile long avg = 0; private long numUpdates = 0; - void update(double quantity) { - numUpdates++; - avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + protected double getAvg() { + return Double.longBitsToDouble(avg); + } + + protected void setAvg(final double value) { + AVG.lazySet(this, Double.doubleToRawLongBits(value)); + } + + protected long incrementAndGetNumUpdates() { + final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1); + numUpdates = nextNumUpdates; + return nextNumUpdates; + } + } + + /* + * Maintains approximate average over last 1000 elements. + * Usage is only thread-safe for a single producer and multiple consumers. + * + * Attempt to prevent false sharing by padding to 64 bytes. + * avg: 8 bytes + * numUpdates: 8 bytes + * alignment: at least 8 bytes + * + * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. 
+ * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. + */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") + public static final class MovingAvg extends MovingAvgFields { Review Comment: This class should simply have a private `MovingAvgFields` member. ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java: ########## @@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. + * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; + byte p010, p011, p012, p013, p014, p015, p016, p017; + byte p020, p021, p022, p023, p024, p025, p026, p027; + byte p030, p031, p032, p033, p034, p035, p036, p037; + byte p040, p041, p042, p043, p044, p045, p046, p047; + byte p050, p051, p052, p053, p054, p055, p056, p057; + byte p060, p061, p062, p063, p064, p065, p066, p067; + } + + // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 + // cache lines). + private static class MovingAvgFields extends MovingAvgPadding { Review Comment: Why is the padding not just inlined here? It would be clearer what is going on and saves a layer of inheritance. ########## sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java: ########## @@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. 
+ * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; Review Comment: Was this a reaction to a measured slowdown? I'm not like super opposed to having something like this, but I expect its relevance and effectiveness to wane as the codebase evolves, unless we have some fairly deep and clear understanding of exactly what needs to be maintained. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org