rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113713921
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
// The first 10 are always sampled, but with maxSamples = 5, the first ten
are downsampled to
// 4..9 inclusive. Then,
// the 20th element is sampled (19) and every 20 after.
- List<byte[]> expected = new ArrayList<>();
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(19));
expected.add(encodeInt(39));
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- List<byte[]> samples = outputSampler.samples();
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+
+ /**
+ * Test that sampling a PCollection while retrieving samples from multiple
threads is ok.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSamples() throws Exception {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000,
1);
+
+ // Iteration count was empirically chosen to have a high probability of
failure without the
+ // test going for too long.
+ Thread sampleThreadA =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10000000; i++) {
+ outputSampler.sample(i);
+ }
+ });
+
+ Thread sampleThreadB =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10000000; i++) {
+ outputSampler.sample(i);
+ }
+ });
+
+ sampleThreadA.start();
+ sampleThreadB.start();
+
+ for (int i = 0; i < 10000; i++) {
+ outputSampler.samples();
+ }
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]