lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113627698
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -35,22 +34,22 @@
* simultaneously, even if computing the same logical PCollection.
*/
public class DataSampler {
+ private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
/**
* Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
*/
public DataSampler() {
- this.maxSamples = 10;
- this.sampleEveryN = 1000;
+ this(10, 1000);
}
/**
* @param maxSamples Sets the maximum number of samples held in memory at once.
* @param sampleEveryN Sets how often to sample.
*/
public DataSampler(int maxSamples, int sampleEveryN) {
- this.maxSamples = maxSamples;
- this.sampleEveryN = sampleEveryN;
+ this.maxSamples = maxSamples <= 0 ? 10 : maxSamples;
+ this.sampleEveryN = sampleEveryN <= 0 ? 1000 : sampleEveryN;
Review Comment:
It's usually better to throw an IllegalArgumentException in these cases instead of silently behaving differently than before.
```suggestion
checkArgument(maxSamples > 0, "Expected positive number of samples, did you mean to disable data sampling?");
checkArgument(sampleEveryN > 0, "Expected positive number for sampling period, did you mean to disable data sampling?");
this.maxSamples = maxSamples;
this.sampleEveryN = sampleEveryN;
```
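A minimal, self-contained sketch of the suggested fail-fast validation, assuming the suggestion's `checkArgument` behaves like Guava's `Preconditions.checkArgument(boolean, Object)`. `SamplingConfig` is a hypothetical stand-in for the `DataSampler` constructor, and the private helper inlines the check so no Guava dependency is needed.

```java
// Hypothetical stand-in for the DataSampler constructor; not Beam's actual class.
final class SamplingConfig {
  final int maxSamples;
  final int sampleEveryN;

  SamplingConfig(int maxSamples, int sampleEveryN) {
    // Fail fast instead of silently substituting defaults for non-positive values.
    checkArgument(
        maxSamples > 0,
        "Expected positive number of samples, did you mean to disable data sampling?");
    checkArgument(
        sampleEveryN > 0,
        "Expected positive number for sampling period, did you mean to disable data sampling?");
    this.maxSamples = maxSamples;
    this.sampleEveryN = sampleEveryN;
  }

  // Assumed to match Guava's Preconditions.checkArgument(boolean, Object) behavior.
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

With this approach, `new SamplingConfig(0, 1000)` throws `IllegalArgumentException` instead of quietly falling back to 10 samples.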
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -91,10 +94,13 @@ public void sample(T element) {
/**
* Clears samples at end of call. This is to help mitigate memory use.
*
+ * <p>This method is invoked by a thread handling a data sampling request in parallel to any calls
+ * to {@link #sample}.
+ *
* @return samples taken since last call.
*/
- public List<byte[]> samples() {
- List<byte[]> ret = new ArrayList<>();
+ public List<BeamFnApi.SampledElement> samples() throws IOException {
+ List<BeamFnApi.SampledElement> ret = new ArrayList<>();
// Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
// so as to not slow down the main processing hot path.
Review Comment:
Since you're only ever accessing buffer under a synchronized block, it would be better to swap the buffer than to copy it.
e.g.
```
List<T> bufferToSend;
synchronized (this) {
bufferToSend = buffer;
buffer = new ArrayList<>(maxElements);
resampleIndex = 0;
}
```
This avoids copying the elements and then clearing the original buffer.
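A sketch of the buffer swap, assuming a simplified sampler with no sampling period or downsampling (so `resampleIndex` is omitted). `SwappingSampleBuffer`, `add`, and `drain` are hypothetical names; the point is that only the reference swap happens under the lock, while encoding runs outside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified sampler: keeps at most maxElements elements and hands them
// out via a buffer swap. Not Beam's OutputSampler (no sampling period, no resampling).
final class SwappingSampleBuffer<T> {
  private final int maxElements;
  private List<T> buffer;

  SwappingSampleBuffer(int maxElements) {
    this.maxElements = maxElements;
    this.buffer = new ArrayList<>(maxElements);
  }

  // Hot path: only a cheap append happens under the lock.
  synchronized void add(T element) {
    if (buffer.size() < maxElements) {
      buffer.add(element);
    }
  }

  // Called by the thread serving a sampling request.
  <R> List<R> drain(Function<? super T, ? extends R> encode) {
    List<T> bufferToSend;
    synchronized (this) {
      // Swap in a fresh buffer instead of copying the old one and clearing it.
      bufferToSend = buffer;
      buffer = new ArrayList<>(maxElements);
    }
    // No other thread can reach bufferToSend now, so encoding happens without the lock.
    List<R> ret = new ArrayList<>(bufferToSend.size());
    for (T element : bufferToSend) {
      ret.add(encode.apply(element));
    }
    return ret;
  }
}
```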
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
}
+ /**
+ * Smoke test that a samples show in the output map.
Review Comment:
```suggestion
* Smoke test that a sample shows in the output map.
```
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
}
+ /**
+ * Smoke test that a samples show in the output map.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNestedContext() throws Exception {
+ DataSampler sampler = new DataSampler();
+
+ String rawString = "hello";
+ byte[] byteArray = rawString.getBytes(Charset.forName("ASCII"));
Review Comment:
```suggestion
byte[] byteArray = rawString.getBytes(StandardCharsets.US_ASCII);
```
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
// The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
// 4..9 inclusive. Then,
// the 20th element is sampled (19) and every 20 after.
- List<byte[]> expected = new ArrayList<>();
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(19));
expected.add(encodeInt(39));
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- List<byte[]> samples = outputSampler.samples();
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+
+ /**
+ * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSamples() throws Exception {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
Review Comment:
```suggestion
OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
```
You can use a small maxElements size, such as 10; a bigger number may decrease contention. Also, a sampleEveryN of 2 means that the sampler frequently alternates between sampling and not sampling, instead of sampling every element.
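To see why the sampling period matters for this test, here is a hypothetical reconstruction of the sampling rule described in the existing test comment (the first 10 elements are always sampled, then the 20th element, index 19, and every 20th after that). `SamplingSchedule` and `shouldSample` are illustrative names, and the exact rule inside `OutputSampler` may differ. Under this rule a period of 1 takes the same branch on every element, while a period of 2 alternates between sampling and skipping once the first 10 have been taken.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the sampling rule described in the existing test
// comment; the real OutputSampler logic may differ.
final class SamplingSchedule {
  // The first 10 elements are always sampled; after that, every sampleEveryN-th one.
  static boolean shouldSample(long index, int sampleEveryN) {
    return index < 10 || (index + 1) % sampleEveryN == 0;
  }

  // Lists the zero-based indices in [0, count) that the rule above would sample.
  static List<Long> sampledIndices(long count, int sampleEveryN) {
    List<Long> out = new ArrayList<>();
    for (long i = 0; i < count; i++) {
      if (shouldSample(i, sampleEveryN)) {
        out.add(i);
      }
    }
    return out;
  }
}
```

For example, `sampledIndices(16, 2)` yields 0 through 9 followed by 11, 13 and 15.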
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
// The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
// 4..9 inclusive. Then,
// the 20th element is sampled (19) and every 20 after.
- List<byte[]> expected = new ArrayList<>();
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(19));
expected.add(encodeInt(39));
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- List<byte[]> samples = outputSampler.samples();
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+
+ /**
+ * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSamples() throws Exception {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+ // Iteration count was empirically chosen to have a high probability of failure without the
+ // test going for too long.
+ Thread sampleThreadA =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10000000; i++) {
+ outputSampler.sample(i);
+ }
+ });
+
+ Thread sampleThreadB =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10000000; i++) {
+ outputSampler.sample(i);
+ }
+ });
+
+ sampleThreadA.start();
+ sampleThreadB.start();
+
+ for (int i = 0; i < 10000; i++) {
+ outputSampler.samples();
+ }
Review Comment:
Grab samples until both of the above threads are done; there's no need to loop a fixed number of times.
Also perform the validation described in my comment on the sampling threads on each batch of output samples.
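One way to drain until the producers finish, sketched with a hypothetical `DrainUntilDone` class standing in for `OutputSampler` (a synchronized list with a swap-on-read `samples()`). It polls `Thread.getState()` until both producers are `TERMINATED`, then joins them and drains once more so that elements added after the last in-loop drain are not lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for OutputSampler: a synchronized list with swap-on-read samples().
final class DrainUntilDone {
  private List<Integer> buffer = new ArrayList<>();

  synchronized void sample(int element) {
    buffer.add(element);
  }

  synchronized List<Integer> samples() {
    List<Integer> out = buffer;
    buffer = new ArrayList<>();
    return out;
  }

  // Runs two producers and returns how many elements the test thread drained in total.
  static long run(int perThread) {
    DrainUntilDone sampler = new DrainUntilDone();
    Thread a = new Thread(() -> { for (int i = 0; i < perThread; i++) sampler.sample(i); });
    Thread b = new Thread(() -> { for (int i = 0; i < perThread; i++) sampler.sample(-i - 1); });
    a.start();
    b.start();
    long drained = 0;
    // A thread only reaches TERMINATED after it has run, so this loop cannot exit early.
    while (a.getState() != Thread.State.TERMINATED
        || b.getState() != Thread.State.TERMINATED) {
      drained += sampler.samples().size(); // per-batch validation would go here
    }
    joinQuietly(a);
    joinQuietly(b);
    // Pick up anything added after the last drain inside the loop.
    drained += sampler.samples().size();
    return drained;
  }

  private static void joinQuietly(Thread t) {
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```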
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
// The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
// 4..9 inclusive. Then,
// the 20th element is sampled (19) and every 20 after.
- List<byte[]> expected = new ArrayList<>();
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(19));
expected.add(encodeInt(39));
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- List<byte[]> samples = outputSampler.samples();
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+
+ /**
+ * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSamples() throws Exception {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+ // Iteration count was empirically chosen to have a high probability of failure without the
+ // test going for too long.
+ Thread sampleThreadA =
Review Comment:
You should use a CountDownLatch so that the threads start at the same time, which increases contention.
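A sketch of a CountDownLatch start gate, assuming plain `java.util.concurrent` primitives; `StartGate` and `runConcurrently` are hypothetical names. Each worker counts down a `ready` latch and then blocks on a shared `start` latch, so the test thread can release all of them at once after they are all running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs `body` on `threads` workers that all start together.
final class StartGate {
  static int runConcurrently(int threads, Runnable body) {
    CountDownLatch ready = new CountDownLatch(threads);
    CountDownLatch start = new CountDownLatch(1);
    AtomicInteger finished = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] =
          new Thread(
              () -> {
                ready.countDown(); // signal that this worker is running
                try {
                  start.await(); // park until the test thread opens the gate
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
                body.run();
                finished.incrementAndGet();
              });
      workers[t].start();
    }
    try {
      ready.await(); // every worker is running and about to wait on `start`
      start.countDown(); // release all workers at once to maximize contention
      for (Thread w : workers) {
        w.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return finished.get();
  }
}
```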
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
}
+
+ /**
+ * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+ * with a ConcurrentModificationException if there is a bug.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNewSampler() throws Exception {
+ DataSampler sampler = new DataSampler();
+ VarIntCoder coder = VarIntCoder.of();
+
+ // Create a thread that constantly creates new samplers.
+ Thread sampleThread =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 1000000; i++) {
+ sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+ // This sleep is here to allow for the test to stop this thread.
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+
+ sampleThread.start();
+
+ for (int i = 0; i < 20; i++) {
+ sampler.handleDataSampleRequest(
Review Comment:
This tests concurrency between a single sampling thread and handling the data sampling request.
You'll want to update this test to cover multiple threads (e.g. 100) that all create the same set of 100 output samplers. Have them all wait on a CountDownLatch that you release from the test thread before output sampler creation starts, so that all the threads are ready to go (as done in
https://github.com/apache/beam/blob/679d30256c6bd64d9760702c667d7d355e70166b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java#L94).
At the same time, continuously call handleDataSampleRequest until every thread's state has transitioned to TERMINATED. (Checking isAlive() can expose you to a race, since start() doesn't mean that the thread is alive, only that it is scheduled to become alive at some point in the future.) You can also ensure the threads are alive by using another CountDownLatch that blocks the test thread from advancing until the sampler-creating threads are running. Futures are another option.
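A rough sketch of the multi-threaded test shape described above, using a hypothetical `SamplerRegistry` in place of `DataSampler`: `sampler(id)` stands in for `sampleOutput(...)` and `snapshot()` for `handleDataSampleRequest(...)`. It assumes a `ConcurrentHashMap`-backed registry, whose weakly consistent iteration does not throw `ConcurrentModificationException`; whether `DataSampler` uses such a map is not stated in this review.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DataSampler; sampler(id) plays the role of sampleOutput(...)
// and snapshot() the role of handleDataSampleRequest(...).
final class SamplerRegistry {
  private final Map<String, AtomicInteger> samplers = new ConcurrentHashMap<>();

  // computeIfAbsent is atomic, so concurrent callers share one instance per id.
  AtomicInteger sampler(String id) {
    return samplers.computeIfAbsent(id, k -> new AtomicInteger());
  }

  // Iteration over a ConcurrentHashMap is weakly consistent and never throws
  // ConcurrentModificationException, even while other threads insert entries.
  int snapshot() {
    int total = 0;
    for (AtomicInteger s : samplers.values()) {
      total += s.get();
    }
    return total;
  }

  int size() {
    return samplers.size();
  }

  // `threads` workers all create the same `ids` samplers once the start latch opens,
  // while the calling thread keeps taking snapshots until every worker is TERMINATED.
  static SamplerRegistry stress(int threads, int ids) {
    SamplerRegistry registry = new SamplerRegistry();
    CountDownLatch start = new CountDownLatch(1);
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] =
          new Thread(
              () -> {
                try {
                  start.await();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
                for (int i = 0; i < ids; i++) {
                  registry.sampler("pcollection-" + i).incrementAndGet();
                }
              });
      workers[t].start();
    }
    start.countDown();
    boolean running = true;
    while (running) {
      registry.snapshot(); // result ignored; the goal is to race with the writers
      running = false;
      for (Thread w : workers) {
        running |= w.getState() != Thread.State.TERMINATED;
      }
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return registry;
  }
}
```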
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
}
+
+ /**
+ * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+ * with a ConcurrentModificationException if there is a bug.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNewSampler() throws Exception {
+ DataSampler sampler = new DataSampler();
+ VarIntCoder coder = VarIntCoder.of();
+
+ // Create a thread that constantly creates new samplers.
+ Thread sampleThread =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 1000000; i++) {
+ sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+ // This sleep is here to allow for the test to stop this thread.
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ return;
+ }
Review Comment:
This and the interrupt below are unnecessary. Your test should be able to join all the sampler-creating threads.
```suggestion
```
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
// The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
// 4..9 inclusive. Then,
// the 20th element is sampled (19) and every 20 after.
- List<byte[]> expected = new ArrayList<>();
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(19));
expected.add(encodeInt(39));
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- List<byte[]> samples = outputSampler.samples();
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+
+ /**
+ * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSamples() throws Exception {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+ // Iteration count was empirically chosen to have a high probability of failure without the
+ // test going for too long.
+ Thread sampleThreadA =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10000000; i++) {
Review Comment:
If this thread produces elements in the range [1, 100000) and the other thread produces elements in [100000, 200000), then when you retrieve the samples below you can validate that the numbers in series A and in series B are always greater than the largest number seen for that series in the previous sample. The two series could also be negative and positive numbers, or even and odd numbers; anything that tells you which thread an element came from lets you perform the validation.
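The per-batch check could look like the following hypothetical `SeriesValidator`, assuming series A is `[0, boundary)` and series B is `[boundary, ...)`, with each thread emitting increasing values. Elements within one batch are not assumed to be in order (downsampling may not preserve it), so each value is compared only against the maximum seen for its series in earlier batches.

```java
import java.util.List;

// Hypothetical validator: values below `boundary` belong to series A, the rest to B.
final class SeriesValidator {
  private final int boundary;
  private long maxA = Long.MIN_VALUE;
  private long maxB = Long.MIN_VALUE;

  SeriesValidator(int boundary) {
    this.boundary = boundary;
  }

  // Checks one batch returned by samples(). Every value must exceed the largest value
  // seen for its series in earlier batches; returns false on the first violation.
  boolean acceptBatch(List<Integer> batch) {
    long batchMaxA = maxA;
    long batchMaxB = maxB;
    for (int value : batch) {
      if (value < boundary) {
        if (value <= maxA) {
          return false;
        }
        batchMaxA = Math.max(batchMaxA, value);
      } else {
        if (value <= maxB) {
          return false;
        }
        batchMaxB = Math.max(batchMaxB, value);
      }
    }
    // Only advance the per-series maxima once the whole batch has been accepted.
    maxA = batchMaxA;
    maxB = batchMaxB;
    return true;
  }
}
```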
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
}
+
+ /**
+ * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+ * with a ConcurrentModificationException if there is a bug.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentNewSampler() throws Exception {
+ DataSampler sampler = new DataSampler();
+ VarIntCoder coder = VarIntCoder.of();
+
+ // Create a thread that constantly creates new samplers.
+ Thread sampleThread =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 1000000; i++) {
+ sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+ // This sleep is here to allow for the test to stop this thread.
+ try {
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ });
+
+ sampleThread.start();
+
+ for (int i = 0; i < 20; i++) {
+ sampler.handleDataSampleRequest(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setSampleData(BeamFnApi.SampleDataRequest.newBuilder())
+ .build());
+ }
+
+ sampleThread.interrupt();
Review Comment:
I don't think you need to interrupt, since the thread will finish by itself. Just invoke join below.
--
This is an automated message from the Apache Git Service.