This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 22c6e728e82 Implement Java exception sampling (#27121)
22c6e728e82 is described below

commit 22c6e728e82cf5415e3ecd2f965d3538d026a921
Author: Sam Rohde <[email protected]>
AuthorDate: Thu Jun 15 10:36:49 2023 -0700

    Implement Java exception sampling (#27121)
    
    * Implement Java exception sampling
    
    * spotless
    
    * s/Exception/IOException
---
 .../harness/data/PCollectionConsumerRegistry.java  | 17 ++++-
 .../beam/fn/harness/debug/ElementSample.java       | 44 ++++++++++++
 .../beam/fn/harness/debug/OutputSampler.java       | 68 ++++++++++++++++---
 .../beam/fn/harness/debug/OutputSamplerTest.java   | 79 +++++++++++++++++++---
 4 files changed, 190 insertions(+), 18 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 84a82e83b88..150580c7f64 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -33,6 +33,7 @@ import org.apache.beam.fn.harness.control.Metrics;
 import org.apache.beam.fn.harness.control.Metrics.BundleCounter;
 import org.apache.beam.fn.harness.control.Metrics.BundleDistribution;
 import org.apache.beam.fn.harness.debug.DataSampler;
+import org.apache.beam.fn.harness.debug.ElementSample;
 import org.apache.beam.fn.harness.debug.OutputSampler;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -301,8 +302,9 @@ public class PCollectionConsumerRegistry {
       // we have window optimization.
       this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
 
+      ElementSample<T> elementSample = null;
       if (outputSampler != null) {
-        outputSampler.sample(input);
+        elementSample = outputSampler.sample(input);
       }
 
       // Use the ExecutionStateTracker and enter an appropriate state to track 
the
@@ -311,6 +313,11 @@ public class PCollectionConsumerRegistry {
       executionState.activate();
       try {
         this.delegate.accept(input);
+      } catch (Exception e) {
+        if (outputSampler != null) {
+          outputSampler.exception(elementSample, e);
+        }
+        throw e;
       } finally {
         executionState.deactivate();
       }
@@ -383,8 +390,9 @@ public class PCollectionConsumerRegistry {
       // when we have window optimization.
       this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);
 
+      ElementSample<T> elementSample = null;
       if (outputSampler != null) {
-        outputSampler.sample(input);
+        elementSample = outputSampler.sample(input);
       }
 
       // Use the ExecutionStateTracker and enter an appropriate state to track 
the
@@ -397,6 +405,11 @@ public class PCollectionConsumerRegistry {
         state.activate();
         try {
           consumerAndMetadata.getConsumer().accept(input);
+        } catch (Exception e) {
+          if (outputSampler != null) {
+            outputSampler.exception(elementSample, e);
+          }
+          throw e;
         } finally {
           state.deactivate();
         }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
new file mode 100644
index 00000000000..85abd02e1d9
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A record class that wraps an element sample with additional metadata. This 
ensures the ability to
+ * add an exception to a sample even if it is pushed out of the buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class ElementSample<T> {
+  // Used for deduplication purposes. Randomly chosen, should be unique within 
a given
+  // OutputSampler.
+  public final long id;
+
+  // The element sample to be serialized and later queried.
+  public final WindowedValue<T> sample;
+
+  // An optional exception to be given as metadata on the FnApi for the given 
sample.
+  @Nullable public Exception exception = null;
+
+  ElementSample(long id, WindowedValue<T> sample) {
+    this.id = id;
+    this.sample = sample;
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index 3c8b057f44c..a81796a239a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -19,7 +19,9 @@ package org.apache.beam.fn.harness.debug;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -37,7 +39,12 @@ import org.apache.beam.sdk.util.WindowedValue;
 public class OutputSampler<T> {
 
   // Temporarily holds elements until the SDK receives a sample data request.
-  private List<WindowedValue<T>> buffer;
+  private List<ElementSample<T>> buffer;
+
+  // Temporarily holds exceptional elements. These elements can also be 
duplicated in the main
+  // buffer. This is in order to always track exceptional elements even if the 
number of samples in
+  // the main buffer drops it.
+  private final List<ElementSample<T>> exceptions = new ArrayList<>();
 
   // Maximum number of elements in buffer.
   private final int maxElements;
@@ -83,7 +90,7 @@ public class OutputSampler<T> {
    *
    * @param element the element to sample.
    */
-  public void sample(WindowedValue<T> element) {
+  public ElementSample<T> sample(WindowedValue<T> element) {
     // Only sample the first 10 elements then after every `sampleEveryN`th 
element.
     long samples = numSamples.get() + 1;
 
@@ -91,20 +98,42 @@ public class OutputSampler<T> {
     // the slowest thread accessing the atomic. But over time, it will still 
increase. This is ok
     // because this is a debugging feature and doesn't need strict atomics.
     numSamples.lazySet(samples);
+
+    ElementSample<T> elementSample =
+        new ElementSample<>(ThreadLocalRandom.current().nextInt(), element);
     if (samples > 10 && samples % sampleEveryN != 0) {
-      return;
+      return elementSample;
     }
 
     synchronized (this) {
       // Fill buffer until maxElements.
       if (buffer.size() < maxElements) {
-        buffer.add(element);
+        buffer.add(elementSample);
       } else {
         // Then rewrite sampled elements as a circular buffer.
-        buffer.set(resampleIndex, element);
+        buffer.set(resampleIndex, elementSample);
         resampleIndex = (resampleIndex + 1) % maxElements;
       }
     }
+
+    return elementSample;
+  }
+
+  /**
+   * Samples an exceptional element to be later queried.
+   *
+   * @param elementSample the sampled element to add an exception to.
+   * @param e the exception.
+   */
+  public void exception(ElementSample<T> elementSample, Exception e) {
+    if (elementSample == null) {
+      return;
+    }
+
+    synchronized (this) {
+      elementSample.exception = e;
+      exceptions.add(elementSample);
+    }
   }
 
   /**
@@ -120,7 +149,7 @@ public class OutputSampler<T> {
 
     // Serializing can take a lot of CPU time for larger or complex elements. 
Copy the array here
     // so as to not slow down the main processing hot path.
-    List<WindowedValue<T>> bufferToSend;
+    List<ElementSample<T>> bufferToSend;
     int sampleIndex = 0;
     synchronized (this) {
       bufferToSend = buffer;
@@ -129,20 +158,43 @@ public class OutputSampler<T> {
       resampleIndex = 0;
     }
 
+    // An element can live in both the main samples and exception buffer. Use 
a small look up table
+    // to deduplicate samples.
+    HashSet<Long> seen = new HashSet<>();
     ByteStringOutputStream stream = new ByteStringOutputStream();
     for (int i = 0; i < bufferToSend.size(); i++) {
       int index = (sampleIndex + i) % bufferToSend.size();
+      ElementSample<T> sample = bufferToSend.get(index);
+      seen.add(sample.id);
+
+      if (valueCoder != null) {
+        this.valueCoder.encode(sample.sample.getValue(), stream, 
Coder.Context.NESTED);
+      } else if (windowedValueCoder != null) {
+        this.windowedValueCoder.encode(sample.sample, stream, 
Coder.Context.NESTED);
+      }
+
+      ret.add(
+          
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+    }
+
+    // TODO: set the exception metadata on the proto once that PR is merged.
+    for (ElementSample<T> sample : exceptions) {
+      if (seen.contains(sample.id)) {
+        continue;
+      }
 
       if (valueCoder != null) {
-        this.valueCoder.encode(bufferToSend.get(index).getValue(), stream, 
Coder.Context.NESTED);
+        this.valueCoder.encode(sample.sample.getValue(), stream, 
Coder.Context.NESTED);
       } else if (windowedValueCoder != null) {
-        this.windowedValueCoder.encode(bufferToSend.get(index), stream, 
Coder.Context.NESTED);
+        this.windowedValueCoder.encode(sample.sample, stream, 
Coder.Context.NESTED);
       }
 
       ret.add(
           
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
     }
 
+    exceptions.clear();
+
     return ret;
   }
 }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 68515c8e7b0..7946251b30f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -62,10 +62,10 @@ public class OutputSamplerTest {
   /**
    * Test that the first N are always sampled.
    *
-   * @throws Exception when encoding fails (shouldn't happen).
+   * @throws IOException when encoding fails (shouldn't happen).
    */
   @Test
-  public void testSamplesFirstN() throws Exception {
+  public void testSamplesFirstN() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
     OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
 
@@ -85,7 +85,7 @@ public class OutputSamplerTest {
   }
 
   @Test
-  public void testWindowedValueSample() throws Exception {
+  public void testWindowedValueSample() throws IOException {
     WindowedValue.WindowedValueCoder<Integer> coder =
         WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), 
GlobalWindow.Coder.INSTANCE);
 
@@ -99,7 +99,7 @@ public class OutputSamplerTest {
   }
 
   @Test
-  public void testNonWindowedValueSample() throws Exception {
+  public void testNonWindowedValueSample() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
 
     OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 10);
@@ -114,10 +114,10 @@ public class OutputSamplerTest {
   /**
    * Test that the previous values are overwritten and only the most recent 
`maxSamples` are kept.
    *
-   * @throws Exception when encoding fails (shouldn't happen).
+   * @throws IOException when encoding fails (shouldn't happen).
    */
   @Test
-  public void testActsLikeCircularBuffer() throws Exception {
+  public void testActsLikeCircularBuffer() throws IOException {
     VarIntCoder coder = VarIntCoder.of();
     OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
 
@@ -139,13 +139,76 @@ public class OutputSamplerTest {
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
 
+  /**
+   * Test that elements with exceptions can be sampled. TODO: test that the 
exception metadata is
+   * set.
+   *
+   * @throws IOException when encoding fails (shouldn't happen).
+   */
+  @Test
+  public void testCanSampleExceptions() throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+    WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(1);
+    ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+    Exception exception = new RuntimeException("Test exception");
+    outputSampler.exception(elementSample, exception);
+
+    // The first 10 are always sampled, but with maxSamples = 5, the first ten 
are downsampled to
+    // 4..9 inclusive. Then,
+    // the 20th element is sampled (19) and every 20 after.
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+    expected.add(encodeInt(1));
+
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+    assertThat(samples, containsInAnyOrder(expected.toArray()));
+  }
+
+  /**
+   * Tests that multiple samples don't push out exception samples. TODO: test 
that the exception
+   * metadata is set.
+   *
+   * @throws IOException when encoding fails (shouldn't happen).
+   */
+  @Test
+  public void testExceptionSamplesAreNotRemoved() throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+    WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(0);
+    ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+    for (int i = 1; i < 100; ++i) {
+      outputSampler.sample(WindowedValue.valueInGlobalWindow(i));
+    }
+
+    Exception exception = new RuntimeException("Test exception");
+    outputSampler.exception(elementSample, exception);
+
+    // The first 10 are always sampled, but with maxSamples = 5, the first ten 
are downsampled to
+    // 4..9 inclusive. Then, the 20th element is sampled (19) and every 20 
after. Finally,
+    // exceptions are added to the list.
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+    expected.add(encodeInt(19));
+    expected.add(encodeInt(39));
+    expected.add(encodeInt(59));
+    expected.add(encodeInt(79));
+    expected.add(encodeInt(99));
+    expected.add(encodeInt(0));
+
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+    assertThat(samples, containsInAnyOrder(expected.toArray()));
+  }
+
   /**
    * Test that sampling a PCollection while retrieving samples from multiple 
threads is ok.
    *
-   * @throws Exception
+   * @throws IOException, InterruptedException
    */
   @Test
-  public void testConcurrentSamples() throws Exception {
+  public void testConcurrentSamples() throws IOException, InterruptedException 
{
     VarIntCoder coder = VarIntCoder.of();
     OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 10, 2);
 

Reply via email to