sjvanrossum commented on code in PR #34165:
URL: https://github.com/apache/beam/pull/34165#discussion_r1989505870


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java:
##########
@@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig(
     return offsetConsumerConfig;
   }
 
-  // Maintains approximate average over last 1000 elements
-  static class MovingAvg {
+  /*
+   * Attempt to prevent false sharing by padding to at least 64 bytes.
+   * object header: 4, 8, 12 or 16 bytes
+   * alignment: at least 8 bytes
+   */
+  @SuppressFBWarnings("UUF_UNUSED_FIELD")
+  private static class MovingAvgPadding {
+    byte p000, p001, p002, p003, p004, p005, p006, p007;
+    byte p010, p011, p012, p013, p014, p015, p016, p017;
+    byte p020, p021, p022, p023, p024, p025, p026, p027;
+    byte p030, p031, p032, p033, p034, p035, p036, p037;
+    byte p040, p041, p042, p043, p044, p045, p046, p047;
+    byte p050, p051, p052, p053, p054, p055, p056, p057;
+    byte p060, p061, p062, p063, p064, p065, p066, p067;
+  }
+
+  // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2
+  // cache lines).
+  private static class MovingAvgFields extends MovingAvgPadding {
     private static final int MOVING_AVG_WINDOW = 1000;
-    private double avg = 0;
+
+    private static final AtomicLongFieldUpdater<MovingAvgFields> AVG =
+        AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg");
+
+    private volatile long avg = 0;
     private long numUpdates = 0;
 
-    void update(double quantity) {
-      numUpdates++;
-      avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+    protected double getAvg() {
+      return Double.longBitsToDouble(avg);
+    }
+
+    protected void setAvg(final double value) {
+      AVG.lazySet(this, Double.doubleToRawLongBits(value));
+    }
+
+    protected long incrementAndGetNumUpdates() {
+      final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1);
+      numUpdates = nextNumUpdates;
+      return nextNumUpdates;
+    }
+  }
+
+  /*
+   * Maintains approximate average over last 1000 elements.
+   * Usage is only thread-safe for a single producer and multiple consumers.
+   *
+   * Attempt to prevent false sharing by padding to 64 bytes.
+   * avg: 8 bytes
+   * numUpdates: 8 bytes
+   * alignment: at least 8 bytes
+   *
+   * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg.
+   * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads.
+   */
+  @SuppressFBWarnings("UUF_UNUSED_FIELD")
+  public static final class MovingAvg extends MovingAvgFields {

Review Comment:
   Removed layout padding, see comment below.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java:
##########
@@ -129,19 +131,80 @@ static Map<String, Object> getOffsetConsumerConfig(
     return offsetConsumerConfig;
   }
 
-  // Maintains approximate average over last 1000 elements
-  static class MovingAvg {
+  /*
+   * Attempt to prevent false sharing by padding to at least 64 bytes.
+   * object header: 4, 8, 12 or 16 bytes
+   * alignment: at least 8 bytes
+   */
+  @SuppressFBWarnings("UUF_UNUSED_FIELD")
+  private static class MovingAvgPadding {
+    byte p000, p001, p002, p003, p004, p005, p006, p007;
+    byte p010, p011, p012, p013, p014, p015, p016, p017;
+    byte p020, p021, p022, p023, p024, p025, p026, p027;
+    byte p030, p031, p032, p033, p034, p035, p036, p037;
+    byte p040, p041, p042, p043, p044, p045, p046, p047;
+    byte p050, p051, p052, p053, p054, p055, p056, p057;
+    byte p060, p061, p062, p063, p064, p065, p066, p067;
+  }
+
+  // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2
+  // cache lines).
+  private static class MovingAvgFields extends MovingAvgPadding {
     private static final int MOVING_AVG_WINDOW = 1000;
-    private double avg = 0;
+
+    private static final AtomicLongFieldUpdater<MovingAvgFields> AVG =
+        AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg");
+
+    private volatile long avg = 0;
     private long numUpdates = 0;
 
-    void update(double quantity) {
-      numUpdates++;
-      avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+    protected double getAvg() {
+      return Double.longBitsToDouble(avg);
+    }
+
+    protected void setAvg(final double value) {

Review Comment:
   Removed layout padding, see comment below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to