This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 26049437ebb Switch to use ConcurrentMap for StringSetData (#33057)
26049437ebb is described below

commit 26049437ebbe9cd1ef2574e2a704cc3403fe871a
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 12 14:28:38 2024 -0500

    Switch to use ConcurrentMap for StringSetData (#33057)
    
    * Switch to use ConcurrentMap for StringSetData
    
    * address comments
---
 .../beam/runners/core/metrics/StringSetData.java   | 23 ++++++-----
 .../runners/core/metrics/StringSetCellTest.java    | 44 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
index 4fc5d3beca3..5f9bb6392ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.core.metrics;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.sdk.metrics.StringSetResult;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
@@ -54,13 +54,13 @@ public abstract class StringSetData implements Serializable {
     if (set.isEmpty()) {
       return empty();
     }
-    HashSet<String> combined = new HashSet<>();
+    Set<String> combined = ConcurrentHashMap.newKeySet();
     long stringSize = addUntilCapacity(combined, 0L, set);
     return new AutoValue_StringSetData(combined, stringSize);
   }
 
   /** Returns a {@link StringSetData} which is made from the given set in place. */
-  private static StringSetData createInPlace(HashSet<String> set, long stringSize) {
+  private static StringSetData createInPlace(Set<String> set, long stringSize) {
     return new AutoValue_StringSetData(set, stringSize);
   }
 
@@ -76,11 +76,12 @@ public abstract class StringSetData implements Serializable {
    * <p>>Should only be used by {@link StringSetCell#add}.
    */
   public StringSetData addAll(String... strings) {
-    HashSet<String> combined;
-    if (this.stringSet() instanceof HashSet) {
-      combined = (HashSet<String>) this.stringSet();
+    Set<String> combined;
+    if (this.stringSet() instanceof ConcurrentHashMap.KeySetView) {
+      combined = this.stringSet();
     } else {
-      combined = new HashSet<>(this.stringSet());
+      combined = ConcurrentHashMap.newKeySet();
+      combined.addAll(this.stringSet());
     }
     long stringSize = addUntilCapacity(combined, this.stringSize(), Arrays.asList(strings));
     return StringSetData.createInPlace(combined, stringSize);
@@ -95,7 +96,8 @@ public abstract class StringSetData implements Serializable {
     } else if (other.stringSet().isEmpty()) {
       return this;
     } else {
-      HashSet<String> combined = new HashSet<>(this.stringSet());
+      Set<String> combined = ConcurrentHashMap.newKeySet();
+      combined.addAll(this.stringSet());
       long stringSize = addUntilCapacity(combined, this.stringSize(), other.stringSet());
       return StringSetData.createInPlace(combined, stringSize);
     }
@@ -105,7 +107,8 @@ public abstract class StringSetData implements Serializable {
    * Combines this {@link StringSetData} with others, all original StringSetData are left intact.
    */
   public StringSetData combine(Iterable<StringSetData> others) {
-    HashSet<String> combined = new HashSet<>(this.stringSet());
+    Set<String> combined = ConcurrentHashMap.newKeySet();
+    combined.addAll(this.stringSet());
     long stringSize = this.stringSize();
     for (StringSetData other : others) {
       stringSize = addUntilCapacity(combined, stringSize, other.stringSet());
@@ -120,7 +123,7 @@ public abstract class StringSetData implements Serializable {
 
   /** Add strings into set until reach capacity. Return the all string size of added set. */
   private static long addUntilCapacity(
-      HashSet<String> combined, long currentSize, Iterable<String> others) {
+      Set<String> combined, long currentSize, Iterable<String> others) {
     if (currentSize > STRING_SET_SIZE_LIMIT) {
       // already at capacity
       return currentSize;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
index f78ed01603f..9497bbe43d0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
@@ -20,7 +20,13 @@ package org.apache.beam.runners.core.metrics;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
@@ -94,4 +100,42 @@ public class StringSetCellTest {
     assertThat(stringSetCell.getCumulative(), equalTo(StringSetData.empty()));
     assertThat(stringSetCell.getDirty(), equalTo(new DirtyState()));
   }
+
+  @Test(timeout = 5000)
+  public void testStringSetCellConcurrentAddRetrieval() throws InterruptedException {
+    StringSetCell cell = new StringSetCell(MetricName.named("namespace", "name"));
+    AtomicBoolean finished = new AtomicBoolean(false);
+    Thread increment =
+        new Thread(
+            () -> {
+              for (long i = 0; !finished.get(); ++i) {
+                cell.add(String.valueOf(i));
+                try {
+                  Thread.sleep(1);
+                } catch (InterruptedException e) {
+                  break;
+                }
+              }
+            });
+    increment.start();
+    Instant start = Instant.now();
+    try {
+      while (true) {
+        Set<String> s = cell.getCumulative().stringSet();
+        List<String> snapshot = new ArrayList<>(s);
+        if (Instant.now().isAfter(start.plusSeconds(3)) && snapshot.size() > 0) {
+          finished.compareAndSet(false, true);
+          break;
+        }
+      }
+    } finally {
+      increment.interrupt();
+      increment.join();
+    }
+
+    Set<String> s = cell.getCumulative().stringSet();
+    for (long i = 0; i < s.size(); ++i) {
+      assertTrue(s.contains(String.valueOf(i)));
+    }
+  }
 }

Reply via email to