This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 22c6e728e82 Implement Java exception sampling (#27121)
22c6e728e82 is described below
commit 22c6e728e82cf5415e3ecd2f965d3538d026a921
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Jun 15 10:36:49 2023 -0700
Implement Java exception sampling (#27121)
* Implement Java exception sampling
* spotless
* s/Exception/IOException
---
.../harness/data/PCollectionConsumerRegistry.java | 17 ++++-
.../beam/fn/harness/debug/ElementSample.java | 44 ++++++++++++
.../beam/fn/harness/debug/OutputSampler.java | 68 ++++++++++++++++---
.../beam/fn/harness/debug/OutputSamplerTest.java | 79 +++++++++++++++++++---
4 files changed, 190 insertions(+), 18 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 84a82e83b88..150580c7f64 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -33,6 +33,7 @@ import org.apache.beam.fn.harness.control.Metrics;
import org.apache.beam.fn.harness.control.Metrics.BundleCounter;
import org.apache.beam.fn.harness.control.Metrics.BundleDistribution;
import org.apache.beam.fn.harness.debug.DataSampler;
+import org.apache.beam.fn.harness.debug.ElementSample;
import org.apache.beam.fn.harness.debug.OutputSampler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -301,8 +302,9 @@ public class PCollectionConsumerRegistry {
// we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
+ ElementSample<T> elementSample = null;
if (outputSampler != null) {
- outputSampler.sample(input);
+ elementSample = outputSampler.sample(input);
}
// Use the ExecutionStateTracker and enter an appropriate state to track
the
@@ -311,6 +313,11 @@ public class PCollectionConsumerRegistry {
executionState.activate();
try {
this.delegate.accept(input);
+ } catch (Exception e) {
+ if (outputSampler != null) {
+ outputSampler.exception(elementSample, e);
+ }
+ throw e;
} finally {
executionState.deactivate();
}
@@ -383,8 +390,9 @@ public class PCollectionConsumerRegistry {
// when we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);
+ ElementSample<T> elementSample = null;
if (outputSampler != null) {
- outputSampler.sample(input);
+ elementSample = outputSampler.sample(input);
}
// Use the ExecutionStateTracker and enter an appropriate state to track
the
@@ -397,6 +405,11 @@ public class PCollectionConsumerRegistry {
state.activate();
try {
consumerAndMetadata.getConsumer().accept(input);
+ } catch (Exception e) {
+ if (outputSampler != null) {
+ outputSampler.exception(elementSample, e);
+ }
+ throw e;
} finally {
state.deactivate();
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
new file mode 100644
index 00000000000..85abd02e1d9
--- /dev/null
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A mutable holder that wraps an element sample with additional metadata. This ensures the
+ * ability to add an exception to a sample even if it is pushed out of the buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class ElementSample<T> {
+ // Used for deduplication purposes. Randomly chosen, should be unique within
a given
+ // OutputSampler.
+ public final long id;
+
+ // The element sample to be serialized and later queried.
+ public final WindowedValue<T> sample;
+
+ // An optional exception to be given as metadata on the FnApi for the given
sample.
+ @Nullable public Exception exception = null;
+
+ ElementSample(long id, WindowedValue<T> sample) {
+ this.id = id;
+ this.sample = sample;
+ }
+}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index 3c8b057f44c..a81796a239a 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -19,7 +19,9 @@ package org.apache.beam.fn.harness.debug;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -37,7 +39,12 @@ import org.apache.beam.sdk.util.WindowedValue;
public class OutputSampler<T> {
// Temporarily holds elements until the SDK receives a sample data request.
- private List<WindowedValue<T>> buffer;
+ private List<ElementSample<T>> buffer;
+
+ // Temporarily holds exceptional elements. An element here may also be present in the main
+ // buffer; keeping it separately ensures exceptional elements are still reported after the
+ // circular main buffer has overwritten them.
+ private final List<ElementSample<T>> exceptions = new ArrayList<>();
// Maximum number of elements in buffer.
private final int maxElements;
@@ -83,7 +90,7 @@ public class OutputSampler<T> {
*
* @param element the element to sample.
*/
- public void sample(WindowedValue<T> element) {
+ public ElementSample<T> sample(WindowedValue<T> element) {
// Only sample the first 10 elements then after every `sampleEveryN`th
element.
long samples = numSamples.get() + 1;
@@ -91,20 +98,42 @@ public class OutputSampler<T> {
// the slowest thread accessing the atomic. But over time, it will still
increase. This is ok
// because this is a debugging feature and doesn't need strict atomics.
numSamples.lazySet(samples);
+
+ ElementSample<T> elementSample =
+ new ElementSample<>(ThreadLocalRandom.current().nextInt(), element);
if (samples > 10 && samples % sampleEveryN != 0) {
- return;
+ return elementSample;
}
synchronized (this) {
// Fill buffer until maxElements.
if (buffer.size() < maxElements) {
- buffer.add(element);
+ buffer.add(elementSample);
} else {
// Then rewrite sampled elements as a circular buffer.
- buffer.set(resampleIndex, element);
+ buffer.set(resampleIndex, elementSample);
resampleIndex = (resampleIndex + 1) % maxElements;
}
}
+
+ return elementSample;
+ }
+
+ /**
+ * Samples an exceptional element to be later queried.
+ *
+ * @param elementSample the sampled element to attach the exception to; a null sample is ignored.
+ * @param e the exception.
+ */
+ public void exception(ElementSample<T> elementSample, Exception e) {
+ if (elementSample == null) {
+ return;
+ }
+
+ synchronized (this) {
+ elementSample.exception = e;
+ exceptions.add(elementSample);
+ }
}
/**
@@ -120,7 +149,7 @@ public class OutputSampler<T> {
// Serializing can take a lot of CPU time for larger or complex elements.
Copy the array here
// so as to not slow down the main processing hot path.
- List<WindowedValue<T>> bufferToSend;
+ List<ElementSample<T>> bufferToSend;
int sampleIndex = 0;
synchronized (this) {
bufferToSend = buffer;
@@ -129,20 +158,43 @@ public class OutputSampler<T> {
resampleIndex = 0;
}
+ // An element can live in both the main samples and exception buffer. Use
a small look up table
+ // to deduplicate samples.
+ HashSet<Long> seen = new HashSet<>();
ByteStringOutputStream stream = new ByteStringOutputStream();
for (int i = 0; i < bufferToSend.size(); i++) {
int index = (sampleIndex + i) % bufferToSend.size();
+ ElementSample<T> sample = bufferToSend.get(index);
+ seen.add(sample.id);
+
+ if (valueCoder != null) {
+ this.valueCoder.encode(sample.sample.getValue(), stream,
Coder.Context.NESTED);
+ } else if (windowedValueCoder != null) {
+ this.windowedValueCoder.encode(sample.sample, stream,
Coder.Context.NESTED);
+ }
+
+ ret.add(
+
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+ }
+
+ // TODO: set the exception metadata on the proto once that PR is merged.
+ for (ElementSample<T> sample : exceptions) {
+ if (seen.contains(sample.id)) {
+ continue;
+ }
if (valueCoder != null) {
- this.valueCoder.encode(bufferToSend.get(index).getValue(), stream,
Coder.Context.NESTED);
+ this.valueCoder.encode(sample.sample.getValue(), stream,
Coder.Context.NESTED);
} else if (windowedValueCoder != null) {
- this.windowedValueCoder.encode(bufferToSend.get(index), stream,
Coder.Context.NESTED);
+ this.windowedValueCoder.encode(sample.sample, stream,
Coder.Context.NESTED);
}
ret.add(
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
}
+ exceptions.clear();
+
return ret;
}
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 68515c8e7b0..7946251b30f 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -62,10 +62,10 @@ public class OutputSamplerTest {
/**
* Test that the first N are always sampled.
*
- * @throws Exception when encoding fails (shouldn't happen).
+ * @throws IOException when encoding fails (shouldn't happen).
*/
@Test
- public void testSamplesFirstN() throws Exception {
+ public void testSamplesFirstN() throws IOException {
VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
@@ -85,7 +85,7 @@ public class OutputSamplerTest {
}
@Test
- public void testWindowedValueSample() throws Exception {
+ public void testWindowedValueSample() throws IOException {
WindowedValue.WindowedValueCoder<Integer> coder =
WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(),
GlobalWindow.Coder.INSTANCE);
@@ -99,7 +99,7 @@ public class OutputSamplerTest {
}
@Test
- public void testNonWindowedValueSample() throws Exception {
+ public void testNonWindowedValueSample() throws IOException {
VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
@@ -114,10 +114,10 @@ public class OutputSamplerTest {
/**
* Test that the previous values are overwritten and only the most recent
`maxSamples` are kept.
*
- * @throws Exception when encoding fails (shouldn't happen).
+ * @throws IOException when encoding fails (shouldn't happen).
*/
@Test
- public void testActsLikeCircularBuffer() throws Exception {
+ public void testActsLikeCircularBuffer() throws IOException {
VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
@@ -139,13 +139,76 @@ public class OutputSamplerTest {
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+ /**
+ * Test that elements with exceptions can be sampled. TODO: test that the
exception metadata is
+ * set.
+ *
+ * @throws IOException when encoding fails (shouldn't happen).
+ */
+ @Test
+ public void testCanSampleExceptions() throws IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+ WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(1);
+ ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+ Exception exception = new RuntimeException("Test exception");
+ outputSampler.exception(elementSample, exception);
+
+ // Only one element was sampled and it was then marked as exceptional, so it lives in both
+ // the main buffer and the exception buffer. Deduplication by sample id must ensure that it
+ // is reported exactly once.
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+ expected.add(encodeInt(1));
+
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+ assertThat(samples, containsInAnyOrder(expected.toArray()));
+ }
+
+ /**
+ * Tests that multiple samples don't push out exception samples. TODO: test
that the exception
+ * metadata is set.
+ *
+ * @throws IOException when encoding fails (shouldn't happen).
+ */
+ @Test
+ public void testExceptionSamplesAreNotRemoved() throws IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+ WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(0);
+ ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+ for (int i = 1; i < 100; ++i) {
+ outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
+ }
+
+ Exception exception = new RuntimeException("Test exception");
+ outputSampler.exception(elementSample, exception);
+
+ // The first 10 elements are always sampled, but with maxSamples = 5 only 5..9 remain in the
+ // circular buffer. Every 20th element after that (19, 39, 59, 79, 99) then overwrites them.
+ // Finally, the exceptional element (0) is appended although it left the main buffer.
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+ expected.add(encodeInt(19));
+ expected.add(encodeInt(39));
+ expected.add(encodeInt(59));
+ expected.add(encodeInt(79));
+ expected.add(encodeInt(99));
+ expected.add(encodeInt(0));
+
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+ assertThat(samples, containsInAnyOrder(expected.toArray()));
+ }
+
/**
* Test that sampling a PCollection while retrieving samples from multiple
threads is ok.
*
- * @throws Exception
+ * @throws IOException, InterruptedException
*/
@Test
- public void testConcurrentSamples() throws Exception {
+ public void testConcurrentSamples() throws IOException, InterruptedException
{
VarIntCoder coder = VarIntCoder.of();
OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2);