This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5df59a943d1 Add flag to control automatic exception sampling in Java 
(disabled) (#28403)
5df59a943d1 is described below

commit 5df59a943d18723942cd0531c12bef256ef90e20
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Sep 14 14:21:24 2023 -0700

    Add flag to control automatic exception sampling in Java (disabled) (#28403)
    
    * Add flag to control automatic exception sampling in Java (disabled)
    
    * Fix NPE in FnHarness with DataSampler
    
    * trigger tests
    
    * trigger tests
    
    * trigger tests
    
    * trigger tests
    
    ---------
    
    Co-authored-by: Sam Rohde <[email protected]>
---
 .../java/org/apache/beam/fn/harness/FnHarness.java |  17 ++--
 .../apache/beam/fn/harness/debug/DataSampler.java  |  63 +++++++++++-
 .../beam/fn/harness/debug/OutputSampler.java       |   9 +-
 .../beam/fn/harness/debug/DataSamplerTest.java     | 106 ++++++++++++++++++++-
 .../beam/fn/harness/debug/OutputSamplerTest.java   |  47 ++++++---
 5 files changed, 216 insertions(+), 26 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 3f018c376f0..448c8d42df7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -97,7 +97,6 @@ public class FnHarness {
   private static final String PIPELINE_OPTIONS_FILE = "PIPELINE_OPTIONS_FILE";
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
-  private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = 
"enable_data_sampling";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
 
   private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String 
descriptor)
@@ -248,7 +247,8 @@ public class FnHarness {
         options.as(ExecutorOptions.class).getScheduledExecutorService();
     ExecutionStateSampler executionStateSampler =
         new ExecutionStateSampler(options, System::currentTimeMillis);
-    final DataSampler dataSampler = new DataSampler();
+
+    final @Nullable DataSampler dataSampler = DataSampler.create(options);
 
     // The logging client variable is not used per se, but during its lifetime 
(until close()) it
     // intercepts logging and sends it to the logging service.
@@ -276,10 +276,6 @@ public class FnHarness {
 
       FinalizeBundleHandler finalizeBundleHandler = new 
FinalizeBundleHandler(executorService);
 
-      // Create the sampler, if the experiment is enabled.
-      boolean shouldSample =
-          ExperimentalOptions.hasExperiment(options, 
ENABLE_DATA_SAMPLING_EXPERIMENT);
-
       // Retrieves the ProcessBundleDescriptor from cache. Requests the PBD 
from the Runner if it
       // doesn't exist. Additionally, runs any graph modifications.
       Function<String, BeamFnApi.ProcessBundleDescriptor> 
getProcessBundleDescriptor =
@@ -314,7 +310,7 @@ public class FnHarness {
               metricsShortIds,
               executionStateSampler,
               processWideCache,
-              shouldSample ? dataSampler : null);
+              dataSampler);
       logging.setProcessBundleHandler(processBundleHandler);
 
       BeamFnStatusClient beamFnStatusClient = null;
@@ -363,7 +359,12 @@ public class FnHarness {
           InstructionRequest.RequestCase.HARNESS_MONITORING_INFOS,
           processWideHandler::harnessMonitoringInfos);
       handlers.put(
-          InstructionRequest.RequestCase.SAMPLE_DATA, 
dataSampler::handleDataSampleRequest);
+          InstructionRequest.RequestCase.SAMPLE_DATA,
+          request ->
+              dataSampler == null
+                  ? BeamFnApi.InstructionResponse.newBuilder()
+                      .setSampleData(BeamFnApi.SampleDataResponse.newBuilder())
+                  : dataSampler.handleDataSampleRequest(request));
 
       JvmInitializers.runBeforeProcessing(options);
 
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
index b03c453475b..29011b82a4d 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java
@@ -23,9 +23,12 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,19 +40,66 @@ import org.slf4j.LoggerFactory;
  */
 public class DataSampler {
   private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
+  private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = 
"enable_data_sampling";
+  private static final String ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
+      "enable_always_on_exception_sampling";
+  private static final String DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT =
+      "disable_always_on_exception_sampling";
+
+  /**
+   * Optionally returns a DataSampler if the experiment "enable_data_sampling" 
is present or
+   * "enable_always_on_exception_sampling" is present. Returns null if neither 
is enabled, or if
+   * only the latter is present and "disable_always_on_exception_sampling" is also given.
+   *
+   * @param options the pipeline options given to this SDK Harness.
+   * @return the DataSampler if enabled or null, otherwise.
+   */
+  public static @Nullable DataSampler create(PipelineOptions options) {
+    boolean disableAlwaysOnExceptionSampling =
+        ExperimentalOptions.hasExperiment(options, 
DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
+    boolean enableAlwaysOnExceptionSampling =
+        ExperimentalOptions.hasExperiment(options, 
ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING_EXPERIMENT);
+    boolean enableDataSampling =
+        ExperimentalOptions.hasExperiment(options, 
ENABLE_DATA_SAMPLING_EXPERIMENT);
+    // Enable exception sampling, unless the user specifies for it to be 
disabled.
+    enableAlwaysOnExceptionSampling =
+        enableAlwaysOnExceptionSampling && !disableAlwaysOnExceptionSampling;
+
+    // If no sampling is enabled, don't create the DataSampler.
+    if (enableDataSampling || enableAlwaysOnExceptionSampling) {
+      // For performance reasons, sampling all elements should only be done 
when the user requests
+      // it.
+      // But, exception sampling doesn't need to worry about performance 
implications, since the SDK
+      // is already in a bad state. Thus, enable only exception sampling when 
the user does not
+      // request for the sampling of all elements.
+      boolean onlySampleExceptions = enableAlwaysOnExceptionSampling && 
!enableDataSampling;
+      return new DataSampler(onlySampleExceptions);
+    } else {
+      return null;
+    }
+  }
 
   /**
    * Creates a DataSampler to sample every 1000 elements while keeping a 
maximum of 10 in memory.
    */
   public DataSampler() {
-    this(10, 1000);
+    this(10, 1000, false);
+  }
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a 
maximum of 10 in memory.
+   *
+   * @param onlySampleExceptions If true, only samples elements from 
exceptions.
+   */
+  public DataSampler(Boolean onlySampleExceptions) {
+    this(10, 1000, onlySampleExceptions);
   }
 
   /**
    * @param maxSamples Sets the maximum number of samples held in memory at 
once.
    * @param sampleEveryN Sets how often to sample.
    */
-  public DataSampler(int maxSamples, int sampleEveryN) {
+  public DataSampler(int maxSamples, int sampleEveryN, Boolean 
onlySampleExceptions) {
     checkArgument(
         maxSamples > 0,
         "Expected positive number of samples, did you mean to disable data 
sampling?");
@@ -58,6 +108,7 @@ public class DataSampler {
         "Expected positive number for sampling period, did you mean to disable 
data sampling?");
     this.maxSamples = maxSamples;
     this.sampleEveryN = sampleEveryN;
+    this.onlySampleExceptions = onlySampleExceptions;
   }
 
   // Maximum number of elements in buffer.
@@ -66,6 +117,9 @@ public class DataSampler {
   // Sampling rate.
   private final int sampleEveryN;
 
+  // If true, only takes samples when exceptions in UDFs occur.
+  private final Boolean onlySampleExceptions;
+
   // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order 
to sample
   // on a PCollection-basis and not per-bundle, this keeps track of shared 
samples between states.
   private final Map<String, OutputSampler<?>> outputSamplers = new 
ConcurrentHashMap<>();
@@ -86,7 +140,10 @@ public class DataSampler {
   public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> 
coder) {
     return (OutputSampler<T>)
         outputSamplers.computeIfAbsent(
-            pcollectionId, k -> new OutputSampler<>(coder, this.maxSamples, 
this.sampleEveryN));
+            pcollectionId,
+            k ->
+                new OutputSampler<>(
+                    coder, this.maxSamples, this.sampleEveryN, 
this.onlySampleExceptions));
   }
 
   /**
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index f1e710d3ec7..f7fabae0cc2 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -60,14 +60,19 @@ public class OutputSampler<T> {
   // Index into the buffer of where to overwrite samples.
   private int resampleIndex = 0;
 
+  // If true, only takes samples when exceptions in UDFs occur.
+  private final Boolean onlySampleExceptions;
+
   @Nullable private final Coder<T> valueCoder;
 
   @Nullable private final Coder<WindowedValue<T>> windowedValueCoder;
 
-  public OutputSampler(Coder<?> coder, int maxElements, int sampleEveryN) {
+  public OutputSampler(
+      Coder<?> coder, int maxElements, int sampleEveryN, boolean 
onlySampleExceptions) {
     this.maxElements = maxElements;
     this.sampleEveryN = sampleEveryN;
     this.buffer = new ArrayList<>(this.maxElements);
+    this.onlySampleExceptions = onlySampleExceptions;
 
     // The samples taken and encoded should match exactly to the specification 
from the
     // ProcessBundleDescriptor. The coder given can either be a 
WindowedValueCoder, in which the
@@ -103,7 +108,7 @@ public class OutputSampler<T> {
 
     ElementSample<T> elementSample =
         new ElementSample<>(ThreadLocalRandom.current().nextInt(), element);
-    if (samples > 10 && samples % sampleEveryN != 0) {
+    if (onlySampleExceptions || (samples > 10 && samples % sampleEveryN != 0)) 
{
       return elementSample;
     }
 
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
index a05efca120a..1cad5210380 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
@@ -32,10 +33,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -117,6 +121,21 @@ public class DataSamplerTest {
     assertTrue(elementList.getElementsList().containsAll(expectedSamples));
   }
 
+  void assertHasSamples(
+      BeamFnApi.InstructionResponse response,
+      String pcollection,
+      List<BeamFnApi.SampledElement> elements) {
+    Map<String, BeamFnApi.SampleDataResponse.ElementList> elementSamplesMap =
+        response.getSampleData().getElementSamplesMap();
+
+    assertFalse(elementSamplesMap.isEmpty());
+
+    BeamFnApi.SampleDataResponse.ElementList elementList = 
elementSamplesMap.get(pcollection);
+    assertNotNull(elementList);
+
+    assertTrue(elementList.getElementsList().containsAll(elements));
+  }
+
   /**
    * Smoke test that a samples show in the output map.
    *
@@ -203,7 +222,7 @@ public class DataSamplerTest {
    */
   @Test
   public void testFiltersSinglePCollectionId() throws Exception {
-    DataSampler sampler = new DataSampler(10, 10);
+    DataSampler sampler = new DataSampler(10, 10, false);
     generateStringSamples(sampler);
 
     BeamFnApi.InstructionResponse samples = getSamplesForPCollection(sampler, 
"a");
@@ -219,7 +238,7 @@ public class DataSamplerTest {
   public void testFiltersMultiplePCollectionIds() throws Exception {
     List<String> pcollectionIds = ImmutableList.of("a", "c");
 
-    DataSampler sampler = new DataSampler(10, 10);
+    DataSampler sampler = new DataSampler(10, 10, false);
     generateStringSamples(sampler);
 
     BeamFnApi.InstructionResponse samples = getSamplesForPCollections(sampler, 
pcollectionIds);
@@ -275,4 +294,87 @@ public class DataSamplerTest {
       sampleThread.join();
     }
   }
+
+  /**
+   * Tests that the "enable_always_on_exception_sampling" experiment enables exception sampling.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testEnableAlwaysOnExceptionSampling() throws Exception {
+    ExperimentalOptions experimentalOptions = 
PipelineOptionsFactory.as(ExperimentalOptions.class);
+    experimentalOptions.setExperiments(
+        Collections.singletonList("enable_always_on_exception_sampling"));
+    DataSampler sampler = DataSampler.create(experimentalOptions);
+    assertNotNull(sampler);
+
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = 
sampler.sampleOutput("pcollection-id", coder);
+    ElementSample<Integer> elementSample = 
outputSampler.sample(globalWindowedValue(1));
+    outputSampler.exception(elementSample, new RuntimeException(), "", "");
+
+    outputSampler.sample(globalWindowedValue(2));
+
+    BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
+    List<SampledElement> expectedSamples =
+        ImmutableList.of(
+            SampledElement.newBuilder()
+                .setElement(ByteString.copyFrom(encodeInt(1)))
+                .setException(
+                    SampledElement.Exception.newBuilder()
+                        .setError(new RuntimeException().toString()))
+                .build());
+    assertHasSamples(samples, "pcollection-id", expectedSamples);
+  }
+
+  /**
+   * Tests that "disable_always_on_exception_sampling" overrides the always on 
experiment.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDisableAlwaysOnExceptionSampling() throws Exception {
+    ExperimentalOptions experimentalOptions = 
PipelineOptionsFactory.as(ExperimentalOptions.class);
+    experimentalOptions.setExperiments(
+        ImmutableList.of(
+            "enable_always_on_exception_sampling", 
"disable_always_on_exception_sampling"));
+    DataSampler sampler = DataSampler.create(experimentalOptions);
+    assertNull(sampler);
+  }
+
+  /**
+   * Tests that the "enable_data_sampling" experiment overrides
+   * "disable_always_on_exception_sampling".
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDisableAlwaysOnExceptionSamplingWithEnableDataSampling() 
throws Exception {
+    ExperimentalOptions experimentalOptions = 
PipelineOptionsFactory.as(ExperimentalOptions.class);
+    experimentalOptions.setExperiments(
+        ImmutableList.of(
+            "enable_data_sampling",
+            "enable_always_on_exception_sampling",
+            "disable_always_on_exception_sampling"));
+    DataSampler sampler = DataSampler.create(experimentalOptions);
+    assertNotNull(sampler);
+
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = 
sampler.sampleOutput("pcollection-id", coder);
+    ElementSample<Integer> elementSample = 
outputSampler.sample(globalWindowedValue(1));
+    outputSampler.exception(elementSample, new RuntimeException(), "", "");
+
+    outputSampler.sample(globalWindowedValue(2));
+
+    BeamFnApi.InstructionResponse samples = getAllSamples(sampler);
+    List<SampledElement> expectedSamples =
+        ImmutableList.of(
+            SampledElement.newBuilder()
+                .setElement(ByteString.copyFrom(encodeInt(1)))
+                .setException(
+                    SampledElement.Exception.newBuilder()
+                        .setError(new RuntimeException().toString()))
+                .build());
+    assertHasSamples(samples, "pcollection-id", expectedSamples);
+  }
 }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 5ca562e1c24..26285205bd3 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -90,7 +90,7 @@ public class OutputSamplerTest {
   @Test
   public void testSamplesFirstN() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10, 
false);
 
     // Purposely go over maxSamples and sampleEveryN. This helps to increase 
confidence.
     for (int i = 0; i < 15; ++i) {
@@ -112,7 +112,7 @@ public class OutputSamplerTest {
     WindowedValue.WindowedValueCoder<Integer> coder =
         WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), 
GlobalWindow.Coder.INSTANCE);
 
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10, 
false);
     outputSampler.sample(WindowedValue.valueInGlobalWindow(0));
 
     // The expected list is only 0..9 inclusive.
@@ -125,7 +125,7 @@ public class OutputSamplerTest {
   public void testNonWindowedValueSample() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
 
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10, 
false);
     outputSampler.sample(WindowedValue.valueInGlobalWindow(0));
 
     // The expected list is only 0..9 inclusive.
@@ -142,7 +142,7 @@ public class OutputSamplerTest {
   @Test
   public void testActsLikeCircularBuffer() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
false);
 
     for (int i = 0; i < 100; ++i) {
       outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
@@ -171,7 +171,7 @@ public class OutputSamplerTest {
   @Test
   public void testCanSampleExceptions() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
false);
 
     WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(1);
     ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -197,7 +197,7 @@ public class OutputSamplerTest {
   @Test
   public void testNoDuplicateExceptions() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
false);
 
     ElementSample<Integer> elementSampleA =
         outputSampler.sample(WindowedValue.valueInGlobalWindow(1));
@@ -227,7 +227,7 @@ public class OutputSamplerTest {
   @Test
   public void testExceptionOnlySampledIfNonNullProcessBundle() throws 
IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
false);
 
     WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(1);
     ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -244,15 +244,14 @@ public class OutputSamplerTest {
   }
 
   /**
-   * Tests that multiple samples don't push out exception samples. TODO: test 
that the exception
-   * metadata is set.
+   * Tests that multiple samples don't push out exception samples.
    *
    * @throws IOException when encoding fails (shouldn't happen).
    */
   @Test
   public void testExceptionSamplesAreNotRemoved() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
false);
 
     WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(0);
     ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
@@ -281,6 +280,32 @@ public class OutputSamplerTest {
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
 
+  /**
+   * Test that with the onlySampleExceptions flag set, only elements with exceptions are sampled.
+   *
+   * @throws IOException when encoding fails (shouldn't happen).
+   */
+  @Test
+  public void testOnlySampleExceptions() throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20, 
true);
+
+    WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(1);
+    outputSampler.sample(WindowedValue.valueInGlobalWindow(2));
+    ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+    Exception exception = new RuntimeException("Test exception");
+    String ptransformId = "ptransform";
+    String processBundleId = "processBundle";
+    outputSampler.exception(elementSample, exception, ptransformId, 
processBundleId);
+
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+    expected.add(encodeException(1, exception.toString(), ptransformId, 
processBundleId));
+
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+    assertThat(samples, containsInAnyOrder(expected.toArray()));
+  }
+
   /**
    * Test that sampling a PCollection while retrieving samples from multiple 
threads is ok.
    *
@@ -289,7 +314,7 @@ public class OutputSamplerTest {
   @Test
   public void testConcurrentSamples() throws IOException, InterruptedException 
{
     VarIntCoder coder = VarIntCoder.of();
-    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2);
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2, 
false);
 
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(2);

Reply via email to