This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5df59a943d1 Add flag to control automatic exception sampling in Java
(disabled) (#28403)
5df59a943d1 is described below
commit 5df59a943d18723942cd0531c12bef256ef90e20
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Sep 14 14:21:24 2023 -0700
Add flag to control automatic exception sampling in Java (disabled) (#28403)
* Add flag to control automatic exception sampling in Java (disabled)
* Fix NPE in FnHarness with DataSampler
* trigger tests
* trigger tests
* trigger tests
* trigger tests
---------
Co-authored-by: Sam Rohde <[email protected]>
---
.../java/org/apache/beam/fn/harness/FnHarness.java | 17 ++--
.../apache/beam/fn/harness/debug/DataSampler.java | 63 +++++++++++-
.../beam/fn/harness/debug/OutputSampler.java | 9 +-
.../beam/fn/harness/debug/DataSamplerTest.java | 106 ++++++++++++++++++++-
.../beam/fn/harness/debug/OutputSamplerTest.java | 47 ++++++---
5 files changed, 216 insertions(+), 26 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 3f018c376f0..448c8d42df7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -97,7 +97,6 @@ public class FnHarness {
private static final String PIPELINE_OPTIONS_FILE = "PIPELINE_OPTIONS_FILE";
private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
- private static final String ENABLE_DATA_SAMPLING_EXPERIMENT =
"enable_data_sampling";
private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String
descriptor)
@@ -248,7 +247,8 @@ public class FnHarness {
options.as(ExecutorOptions.class).getScheduledExecutorService();
ExecutionStateSampler executionStateSampler =
new ExecutionStateSampler(options, System::currentTimeMillis);
- final DataSampler dataSampler = new DataSampler();
+
+ final @Nullable DataSampler dataSampler = DataSampler.create(options);
// The logging client variable is not used per se, but during its lifetime
(until close()) it
// intercepts logging and sends it to the logging service.
@@ -276,10 +276,6 @@ public class FnHarness {
FinalizeBundleHandler finalizeBundleHandler = new
FinalizeBundleHandler(executorService);
- // Create the sampler, if the experiment is enabled.
- boolean shouldSample =
- ExperimentalOptions.hasExperiment(options,
ENABLE_DATA_SAMPLING_EXPERIMENT);
-
// Retrieves the ProcessBundleDescriptor from cache. Requests the PBD
from the Runner if it
// doesn't exist. Additionally, runs any graph modifications.
Function<String, BeamFnApi.ProcessBundleDescriptor>
getProcessBundleDescriptor =
@@ -314,7 +310,7 @@ public class FnHarness {
metricsShortIds,
executionStateSampler,
processWideCache,
- shouldSample ? dataSampler : null);
+ dataSampler);
logging.setProcessBundleHandler(processBundleHandler);
BeamFnStatusClient beamFnStatusClient = null;
@@ -363,7 +359,12 @@ public class FnHarness {
InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS,
processWideHandler::harnessMonitoringInfos);
handlers.put(
- InstructionRequest.RequestCase.SAMPLE_DATA,
dataSampler::handleDataSampleRequest);
+ InstructionRequest.RequestCase.SAMPLE_DATA,
+ request ->
+ dataSampler == null
+ ? BeamFnApi.InstructionResponse.newBuilder()
+ .setSampleData(BeamFnApi.SampleDataResponse.newBuilder())
+ : dataSampler.handleDataSampleRequest(request));
JvmInitializers.runBeforeProcessing(options);
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
index b03c453475b..29011b82a4d 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
@@ -23,9 +23,12 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +40,66 @@ import org.slf4j.LoggerFactory;
*/
public class DataSampler {
private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
+ private static final String ENABLE_DATA_SAMPLING_EXPERIMENT =
"enable_data_sampling";
+ private static final String ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
+ "enable_always_on_exception_sampling";
+ private static final String DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
+ "disable_always_on_exception_sampling";
+
+ /**
+ * Optionally returns a DataSampler if the experiment "enable_data_sampling"
is present or
+ * "enable_always_on_exception_sampling" is present. Returns null is data
sampling is not enabled
+ * or "disable_always_on_exception_sampling" experiment is given.
+ *
+ * @param options the pipeline options given to this SDK Harness.
+ * @return the DataSampler if enabled or null, otherwise.
+ */
+ public static @Nullable DataSampler create(PipelineOptions options) {
+ boolean disableAlwaysOnExceptionSampling =
+ ExperimentalOptions.hasExperiment(options,
DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
+ boolean enableAlwaysOnExceptionSampling =
+ ExperimentalOptions.hasExperiment(options,
ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
+ boolean enableDataSampling =
+ ExperimentalOptions.hasExperiment(options,
ENABLE_DATA_SAMPLING_EXPERIMENT);
+ // Enable exception sampling, unless the user specifies for it to be
disabled.
+ enableAlwaysOnExceptionSampling =
+ enableAlwaysOnExceptionSampling && !disableAlwaysOnExceptionSampling;
+
+ // If no sampling is enabled, don't create the DataSampler.
+ if (enableDataSampling || enableAlwaysOnExceptionSampling) {
+ // For performance reasons, sampling all elements should only be done
when the user requests
+ // it.
+ // But, exception sampling doesn't need to worry about performance
implications, since the SDK
+ // is already in a bad state. Thus, enable only exception sampling when
the user does not
+ // request for the sampling of all elements.
+ boolean onlySampleExceptions = enableAlwaysOnExceptionSampling &&
!enableDataSampling;
+ return new DataSampler(onlySampleExceptions);
+ } else {
+ return null;
+ }
+ }
/**
* Creates a DataSampler to sample every 1000 elements while keeping a
maximum of 10 in memory.
*/
public DataSampler() {
- this(10, 1000);
+ this(10, 1000, false);
+ }
+
+ /**
+ * Creates a DataSampler to sample every 1000 elements while keeping a
maximum of 10 in memory.
+ *
+ * @param onlySampleExceptions If true, only samples elements from
exceptions.
+ */
+ public DataSampler(Boolean onlySampleExceptions) {
+ this(10, 1000, onlySampleExceptions);
}
/**
* @param maxSamples Sets the maximum number of samples held in memory at
once.
* @param sampleEveryN Sets how often to sample.
*/
- public DataSampler(int maxSamples, int sampleEveryN) {
+ public DataSampler(int maxSamples, int sampleEveryN, Boolean
onlySampleExceptions) {
checkArgument(
maxSamples > 0,
"Expected positive number of samples, did you mean to disable data
sampling?");
@@ -58,6 +108,7 @@ public class DataSampler {
"Expected positive number for sampling period, did you mean to disable
data sampling?");
this.maxSamples = maxSamples;
this.sampleEveryN = sampleEveryN;
+ this.onlySampleExceptions = onlySampleExceptions;
}
// Maximum number of elements in buffer.
@@ -66,6 +117,9 @@ public class DataSampler {
// Sampling rate.
private final int sampleEveryN;
+ // If true, only takes samples when exceptions in UDFs occur.
+ private final Boolean onlySampleExceptions;
+
// The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order
to sample
// on a PCollection-basis and not per-bundle, this keeps track of shared
samples between states.
private final Map<String, OutputSampler<?>> outputSamplers = new
ConcurrentHashMap<>();
@@ -86,7 +140,10 @@ public class DataSampler {
public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T>
coder) {
return (OutputSampler<T>)
outputSamplers.computeIfAbsent(
- pcollectionId, k -> new OutputSampler<>(coder, this.maxSamples,
this.sampleEveryN));
+ pcollectionId,
+ k ->
+ new OutputSampler<>(
+ coder, this.maxSamples, this.sampleEveryN,
this.onlySampleExceptions));
}
/**
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index f1e710d3ec7..f7fabae0cc2 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -60,14 +60,19 @@ public class OutputSampler<T> {
// Index into the buffer of where to overwrite samples.
private int resampleIndex = 0;
+ // If true, only takes samples when exceptions in UDFs occur.
+ private final Boolean onlySampleExceptions;
+
@Nullable private final Coder<T> valueCoder;
@Nullable private final Coder<WindowedValue<T>> windowedValueCoder;
- public OutputSampler(Coder<?> coder, int maxElements, int sampleEveryN) {
+ public OutputSampler(
+ Coder<?> coder, int maxElements, int sampleEveryN, boolean
onlySampleExceptions) {
this.maxElements = maxElements;
this.sampleEveryN = sampleEveryN;
this.buffer = new ArrayList<>(this.maxElements);
+ this.onlySampleExceptions = onlySampleExceptions;
// The samples taken and encoded should match exactly to the specification
from the
// ProcessBundleDescriptor. The coder given can either be a
WindowedValueCoder, in which the
@@ -103,7 +108,7 @@ public class OutputSampler<T> {
ElementSample<T> elementSample =
new ElementSample<>(ThreadLocalRandom.current().nextInt(), element);
- if (samples > 10 && samples % sampleEveryN != 0) {
+ if (onlySampleExceptions || (samples > 10 && samples % sampleEveryN != 0))
{
return elementSample;
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
index a05efca120a..1cad5210380 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
@@ -32,10 +33,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -117,6 +121,21 @@ public class DataSamplerTest {
assertTrue(elementList.getElementsList().containsAll(expectedSamples));
}
+ void assertHasSamples(
+ BeamFnApi.InstructionResponse response,
+ String pcollection,
+ List<BeamFnApi.SampledElement> elements) {
+ Map<String, BeamFnApi.SampleDataResponse.ElementList> elementSamplesMap =
+ response.getSampleData().getElementSamplesMap();
+
+ assertFalse(elementSamplesMap.isEmpty());
+
+ BeamFnApi.SampleDataResponse.ElementList elementList =
elementSamplesMap.get(pcollection);
+ assertNotNull(elementList);
+
+ assertTrue(elementList.getElementsList().containsAll(elements));
+ }
+
/**
* Smoke test that a samples show in the output map.
*
@@ -203,7 +222,7 @@ public class DataSamplerTest {
*/
@Test
public void testFiltersSinglePCollectionId() throws Exception {
- DataSampler sampler = new DataSampler(10, 10);
+ DataSampler sampler = new DataSampler(10, 10, false);
generateStringSamples(sampler);
BeamFnApi.InstructionResponse samples = getSamplesForPCollection(sampler,
"a");
@@ -219,7 +238,7 @@ public class DataSamplerTest {
public void testFiltersMultiplePCollectionIds() throws Exception {
List<String> pcollectionIds = ImmutableList.of("a", "c");
- DataSampler sampler = new DataSampler(10, 10);
+ DataSampler sampler = new DataSampler(10, 10, false);
generateStringSamples(sampler);
BeamFnApi.InstructionResponse samples = getSamplesForPCollections(sampler,
pcollectionIds);
@@ -275,4 +294,87 @@ public class DataSamplerTest {
sampleThread.join();
}
}
+
+ /**
+ * Tests that including the "enable_always_on_exception_sampling" can sample.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testEnableAlwaysOnExceptionSampling() throws Exception {
+ ExperimentalOptions experimentalOptions =
PipelineOptionsFactory.as(ExperimentalOptions.class);
+ experimentalOptions.setExperiments(
+ Collections.singletonList("enable_always_on_exception_sampling"));
+ DataSampler sampler = DataSampler.create(experimentalOptions);
+ assertNotNull(sampler);
+
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler =
sampler.sampleOutput("pcollection-id", coder);
+ ElementSample<Integer> elementSample =
outputSampler.sample(globalWindowedValue(1));
+ outputSampler.exception(elementSample, new RuntimeException(), "", "");
+
+ outputSampler.sample(globalWindowedValue(2));
+
+ BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
+ List<SampledElement> expectedSamples =
+ ImmutableList.of(
+ SampledElement.newBuilder()
+ .setElement(ByteString.copyFrom(encodeInt(1)))
+ .setException(
+ SampledElement.Exception.newBuilder()
+ .setError(new RuntimeException().toString()))
+ .build());
+ assertHasSamples(samples, "pcollection-id", expectedSamples);
+ }
+
+ /**
+ * Tests that "disable_always_on_exception_sampling" overrides the always on
experiment.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDisableAlwaysOnExceptionSampling() throws Exception {
+ ExperimentalOptions experimentalOptions =
PipelineOptionsFactory.as(ExperimentalOptions.class);
+ experimentalOptions.setExperiments(
+ ImmutableList.of(
+ "enable_always_on_exception_sampling",
"disable_always_on_exception_sampling"));
+ DataSampler sampler = DataSampler.create(experimentalOptions);
+ assertNull(sampler);
+ }
+
+ /**
+ * Tests that the "enable_data_sampling" experiment overrides
+ * "disable_always_on_exception_sampling".
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testDisableAlwaysOnExceptionSamplingWithEnableDataSampling()
throws Exception {
+ ExperimentalOptions experimentalOptions =
PipelineOptionsFactory.as(ExperimentalOptions.class);
+ experimentalOptions.setExperiments(
+ ImmutableList.of(
+ "enable_data_sampling",
+ "enable_always_on_exception_sampling",
+ "disable_always_on_exception_sampling"));
+ DataSampler sampler = DataSampler.create(experimentalOptions);
+ assertNotNull(sampler);
+
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler =
sampler.sampleOutput("pcollection-id", coder);
+ ElementSample<Integer> elementSample =
outputSampler.sample(globalWindowedValue(1));
+ outputSampler.exception(elementSample, new RuntimeException(), "", "");
+
+ outputSampler.sample(globalWindowedValue(2));
+
+ BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
+ List<SampledElement> expectedSamples =
+ ImmutableList.of(
+ SampledElement.newBuilder()
+ .setElement(ByteString.copyFrom(encodeInt(1)))
+ .setException(
+ SampledElement.Exception.newBuilder()
+ .setError(new RuntimeException().toString()))
+ .build());
+ assertHasSamples(samples, "pcollection-id", expectedSamples);
+ }
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 5ca562e1c24..26285205bd3 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -90,7 +90,7 @@ public class OutputSamplerTest {
@Test
public void testSamplesFirstN() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10,
false);
// Purposely go over maxSamples and sampleEveryN. This helps to increase
confidence.
for (int i = 0; i < 15; ++i) {
@@ -112,7 +112,7 @@ public class OutputSamplerTest {
WindowedValue.WindowedValueCoder<Integer> coder =
WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(),
GlobalWindow.Coder.INSTANCE);
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10,
false);
outputSampler.sample(WindowedValue.valueInGlobalWindow(0));
// The expected list is only 0..9 inclusive.
@@ -125,7 +125,7 @@ public class OutputSamplerTest {
public void testNonWindowedValueSample() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10,
false);
outputSampler.sample(WindowedValue.valueInGlobalWindow(0));
// The expected list is only 0..9 inclusive.
@@ -142,7 +142,7 @@ public class OutputSamplerTest {
@Test
public void testActsLikeCircularBuffer() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
false);
for (int i = 0; i < 100; ++i) {
outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
@@ -171,7 +171,7 @@ public class OutputSamplerTest {
@Test
public void testCanSampleExceptions() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
false);
WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(1);
ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -197,7 +197,7 @@ public class OutputSamplerTest {
@Test
public void testNoDuplicateExceptions() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
false);
ElementSample<Integer> elementSampleA =
outputSampler.sample(WindowedValue.valueInGlobalWindow(1));
@@ -227,7 +227,7 @@ public class OutputSamplerTest {
@Test
public void testExceptionOnlySampledIfNonNullProcessBundle() throws
IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
false);
WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(1);
ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -244,15 +244,14 @@ public class OutputSamplerTest {
}
/**
- * Tests that multiple samples don't push out exception samples. TODO: test
that the exception
- * metadata is set.
+ * Tests that multiple samples don't push out exception samples.
*
* @throws IOException when encoding fails (shouldn't happen).
*/
@Test
public void testExceptionSamplesAreNotRemoved() throws IOException {
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
false);
WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(0);
ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -281,6 +280,32 @@ public class OutputSamplerTest {
assertThat(samples, containsInAnyOrder(expected.toArray()));
}
+ /**
+ * Test that elements the onlySampleExceptions flag works.
+ *
+ * @throws IOException when encoding fails (shouldn't happen).
+ */
+ @Test
+ public void testOnlySampleExceptions() throws IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20,
true);
+
+ WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(1);
+ outputSampler.sample(WindowedValue.valueInGlobalWindow(2));
+ ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+ Exception exception = new RuntimeException("Test exception");
+ String ptransformId = "ptransform";
+ String processBundleId = "processBundle";
+ outputSampler.exception(elementSample, exception, ptransformId,
processBundleId);
+
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+ expected.add(encodeException(1, exception.toString(), ptransformId,
processBundleId));
+
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+ assertThat(samples, containsInAnyOrder(expected.toArray()));
+ }
+
/**
* Test that sampling a PCollection while retrieving samples from multiple
threads is ok.
*
@@ -289,7 +314,7 @@ public class OutputSamplerTest {
@Test
public void testConcurrentSamples() throws IOException, InterruptedException
{
VarIntCoder coder = VarIntCoder.of();
- OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2);
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2,
false);
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(2);