rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113713598
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
}
+
+ /**
+ * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+ * with a ConcurrentModificationException if there is a bug.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNewSampler() throws Exception {
+ DataSampler sampler = new DataSampler();
+ VarIntCoder coder = VarIntCoder.of();
+
+ // Create a thread that constantly creates new samplers.
+ Thread sampleThread =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 1000000; i++) {
+ sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+ // This sleep is here to allow for the test to stop this thread.
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+
+ sampleThread.start();
+
+ for (int i = 0; i < 20; i++) {
+ sampler.handleDataSampleRequest(
Review Comment:
Cool, I used a CountDownLatch.
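
For anyone following the thread, here is a minimal sketch of latch-based coordination between a writer thread and a reader. It is not the code in this PR: `LatchSketch`, `SAMPLERS`, and `runOnce` are hypothetical names, and a plain `ConcurrentHashMap` stands in for whatever the DataSampler keeps internally. One latch tells the reader that the writer has started adding entries, and a second tells the writer to stop, so no `Thread.sleep` or interrupt is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class LatchSketch {
  // Hypothetical stand-in for the sampler's registry; not Beam's DataSampler.
  static final Map<String, Integer> SAMPLERS = new ConcurrentHashMap<>();

  /** Runs one writer/reader round and returns how many entries were registered. */
  static int runOnce() {
    SAMPLERS.clear();
    CountDownLatch started = new CountDownLatch(1); // writer -> reader: "I have begun"
    CountDownLatch done = new CountDownLatch(1);    // reader -> writer: "you can stop"

    Thread writer =
        new Thread(
            () -> {
              for (int i = 0; i < 1_000_000; i++) {
                SAMPLERS.put("pcollection-" + i, i);
                // Signal after the first insert; later calls are no-ops.
                started.countDown();
                // Stop as soon as the reader has finished.
                if (done.getCount() == 0) {
                  return;
                }
              }
            });
    writer.start();

    try {
      // Wait until the writer is actually adding entries before reading.
      started.await();
      for (int i = 0; i < 20; i++) {
        // Iterate while the writer is (possibly) still inserting. A
        // ConcurrentHashMap iterator is weakly consistent and does not throw
        // ConcurrentModificationException; a plain HashMap could.
        for (String key : SAMPLERS.keySet()) {
          if (key.isEmpty()) {
            throw new IllegalStateException("unexpected empty key");
          }
        }
      }
      done.countDown();
      writer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return SAMPLERS.size();
  }

  public static void main(String[] args) {
    System.out.println("entries registered: " + runOnce());
  }
}
```

The latch makes the overlap explicit: the reader only starts once the writer has begun inserting, and the writer exits promptly when the reader finishes, even if it has not reached the end of its loop.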