This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26049437ebb Switch to use ConcurrentMap for StringSetData (#33057)
26049437ebb is described below
commit 26049437ebbe9cd1ef2574e2a704cc3403fe871a
Author: Yi Hu <[email protected]>
AuthorDate: Tue Nov 12 14:28:38 2024 -0500
Switch to use ConcurrentMap for StringSetData (#33057)
* Switch to use ConcurrentMap for StringSetData
* address comments
---
.../beam/runners/core/metrics/StringSetData.java | 23 ++++++-----
.../runners/core/metrics/StringSetCellTest.java | 44 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
index 4fc5d3beca3..5f9bb6392ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.core.metrics;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
@@ -54,13 +54,13 @@ public abstract class StringSetData implements Serializable {
if (set.isEmpty()) {
return empty();
}
- HashSet<String> combined = new HashSet<>();
+ Set<String> combined = ConcurrentHashMap.newKeySet();
long stringSize = addUntilCapacity(combined, 0L, set);
return new AutoValue_StringSetData(combined, stringSize);
}
/** Returns a {@link StringSetData} which is made from the given set in place. */
- private static StringSetData createInPlace(HashSet<String> set, long stringSize) {
+ private static StringSetData createInPlace(Set<String> set, long stringSize) {
return new AutoValue_StringSetData(set, stringSize);
}
@@ -76,11 +76,12 @@ public abstract class StringSetData implements Serializable {
* <p>>Should only be used by {@link StringSetCell#add}.
*/
public StringSetData addAll(String... strings) {
- HashSet<String> combined;
- if (this.stringSet() instanceof HashSet) {
- combined = (HashSet<String>) this.stringSet();
+ Set<String> combined;
+ if (this.stringSet() instanceof ConcurrentHashMap.KeySetView) {
+ combined = this.stringSet();
} else {
- combined = new HashSet<>(this.stringSet());
+ combined = ConcurrentHashMap.newKeySet();
+ combined.addAll(this.stringSet());
}
long stringSize = addUntilCapacity(combined, this.stringSize(), Arrays.asList(strings));
return StringSetData.createInPlace(combined, stringSize);
@@ -95,7 +96,8 @@ public abstract class StringSetData implements Serializable {
} else if (other.stringSet().isEmpty()) {
return this;
} else {
- HashSet<String> combined = new HashSet<>(this.stringSet());
+ Set<String> combined = ConcurrentHashMap.newKeySet();
+ combined.addAll(this.stringSet());
long stringSize = addUntilCapacity(combined, this.stringSize(), other.stringSet());
return StringSetData.createInPlace(combined, stringSize);
}
@@ -105,7 +107,8 @@ public abstract class StringSetData implements Serializable {
* Combines this {@link StringSetData} with others, all original StringSetData are left intact.
*/
public StringSetData combine(Iterable<StringSetData> others) {
- HashSet<String> combined = new HashSet<>(this.stringSet());
+ Set<String> combined = ConcurrentHashMap.newKeySet();
+ combined.addAll(this.stringSet());
long stringSize = this.stringSize();
for (StringSetData other : others) {
stringSize = addUntilCapacity(combined, stringSize, other.stringSet());
@@ -120,7 +123,7 @@ public abstract class StringSetData implements Serializable {
/** Add strings into set until reach capacity. Return the all string size of added set. */
private static long addUntilCapacity(
- HashSet<String> combined, long currentSize, Iterable<String> others) {
+ Set<String> combined, long currentSize, Iterable<String> others) {
if (currentSize > STRING_SET_SIZE_LIMIT) {
// already at capacity
return currentSize;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
index f78ed01603f..9497bbe43d0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
@@ -20,7 +20,13 @@ package org.apache.beam.runners.core.metrics;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Assert;
@@ -94,4 +100,42 @@ public class StringSetCellTest {
assertThat(stringSetCell.getCumulative(), equalTo(StringSetData.empty()));
assertThat(stringSetCell.getDirty(), equalTo(new DirtyState()));
}
+
+ @Test(timeout = 5000)
+ public void testStringSetCellConcurrentAddRetrieval() throws InterruptedException {
+ StringSetCell cell = new StringSetCell(MetricName.named("namespace", "name"));
+ AtomicBoolean finished = new AtomicBoolean(false);
+ Thread increment =
+ new Thread(
+ () -> {
+ for (long i = 0; !finished.get(); ++i) {
+ cell.add(String.valueOf(i));
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ increment.start();
+ Instant start = Instant.now();
+ try {
+ while (true) {
+ Set<String> s = cell.getCumulative().stringSet();
+ List<String> snapshot = new ArrayList<>(s);
+ if (Instant.now().isAfter(start.plusSeconds(3)) && snapshot.size() > 0) {
+ finished.compareAndSet(false, true);
+ break;
+ }
+ }
+ } finally {
+ increment.interrupt();
+ increment.join();
+ }
+
+ Set<String> s = cell.getCumulative().stringSet();
+ for (long i = 0; i < s.size(); ++i) {
+ assertTrue(s.contains(String.valueOf(i)));
+ }
+ }
}