This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0554372b25b Finish Java Exception Sampling (#27257)
0554372b25b is described below

commit 0554372b25bc8921a451b7ef1068f4bd8fc1651e
Author: Sam Rohde <[email protected]>
AuthorDate: Wed Jun 28 09:22:32 2023 -0700

    Finish Java Exception Sampling (#27257)
    
    * Finish Java Exception Sampling
    
    * wrong param name in comment
    
    * run tests
    
    * run tests
    
    * run tests
---
 .../fn/harness/control/ExecutionStateSampler.java  | 16 +++-
 .../harness/data/PCollectionConsumerRegistry.java  | 27 +++++--
 .../beam/fn/harness/debug/ElementSample.java       | 15 +++-
 .../beam/fn/harness/debug/OutputSampler.java       | 93 +++++++++++++++-------
 .../beam/fn/harness/debug/OutputSamplerTest.java   | 91 +++++++++++++++++++--
 .../fn/harness/status/BeamFnStatusClientTest.java  |  2 +-
 6 files changed, 198 insertions(+), 46 deletions(-)

diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
index 2c2485dd842..c8cef8cf861 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
@@ -363,9 +363,14 @@ public class ExecutionStateSampler {
       ExecutionStateImpl current = currentStateLazy.get();
       if (current != null) {
         return ExecutionStateTrackerStatus.create(
-            current.ptransformId, current.ptransformUniqueName, thread, 
lastTransitionTimeMs);
+            current.ptransformId,
+            current.ptransformUniqueName,
+            thread,
+            lastTransitionTimeMs,
+            processBundleId.get());
       } else {
-        return ExecutionStateTrackerStatus.create(null, null, thread, 
lastTransitionTimeMs);
+        return ExecutionStateTrackerStatus.create(
+            null, null, thread, lastTransitionTimeMs, processBundleId.get());
       }
     }
 
@@ -518,9 +523,10 @@ public class ExecutionStateSampler {
         @Nullable String ptransformId,
         @Nullable String ptransformUniqueName,
         Thread trackedThread,
-        long lastTransitionTimeMs) {
+        long lastTransitionTimeMs,
+        @Nullable String processBundleId) {
       return new AutoValue_ExecutionStateSampler_ExecutionStateTrackerStatus(
-          ptransformId, ptransformUniqueName, trackedThread, 
lastTransitionTimeMs);
+          ptransformId, ptransformUniqueName, trackedThread, 
lastTransitionTimeMs, processBundleId);
     }
 
     public abstract @Nullable String getPTransformId();
@@ -530,5 +536,7 @@ public class ExecutionStateSampler {
     public abstract Thread getTrackedThread();
 
     public abstract long getLastTransitionTimeMillis();
+
+    public abstract @Nullable String getProcessBundleId();
   }
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 150580c7f64..a7a8766ffc7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -27,6 +27,7 @@ import java.util.Random;
 import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.HandlesSplits;
 import org.apache.beam.fn.harness.control.BundleProgressReporter;
+import org.apache.beam.fn.harness.control.ExecutionStateSampler;
 import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState;
 import 
org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
 import org.apache.beam.fn.harness.control.Metrics;
@@ -71,9 +72,12 @@ public class PCollectionConsumerRegistry {
   @SuppressWarnings({"rawtypes"})
   abstract static class ConsumerAndMetadata {
     public static ConsumerAndMetadata forConsumer(
-        FnDataReceiver consumer, String pTransformId, ExecutionState state) {
+        FnDataReceiver consumer,
+        String pTransformId,
+        ExecutionState state,
+        ExecutionStateTracker stateTracker) {
       return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
-          consumer, pTransformId, state);
+          consumer, pTransformId, state, stateTracker);
     }
 
     public abstract FnDataReceiver getConsumer();
@@ -81,6 +85,8 @@ public class PCollectionConsumerRegistry {
     public abstract String getPTransformId();
 
     public abstract ExecutionState getExecutionState();
+
+    public abstract ExecutionStateTracker getExecutionStateTracker();
   }
 
   private final ExecutionStateTracker stateTracker;
@@ -176,7 +182,7 @@ public class PCollectionConsumerRegistry {
     List<ConsumerAndMetadata> consumerAndMetadatas =
         pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) -> 
new ArrayList<>());
     consumerAndMetadatas.add(
-        ConsumerAndMetadata.forConsumer(consumer, pTransformId, 
executionState));
+        ConsumerAndMetadata.forConsumer(consumer, pTransformId, 
executionState, stateTracker));
   }
 
   /**
@@ -250,6 +256,8 @@ public class PCollectionConsumerRegistry {
     private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
     private final Coder<T> coder;
     private final @Nullable OutputSampler<T> outputSampler;
+    private final String ptransformId;
+    private final ExecutionStateTracker executionStateTracker;
 
     public MetricTrackingFnDataReceiver(
         String pCollectionId,
@@ -258,6 +266,8 @@ public class PCollectionConsumerRegistry {
         @Nullable OutputSampler<T> outputSampler) {
       this.delegate = consumerAndMetadata.getConsumer();
       this.executionState = consumerAndMetadata.getExecutionState();
+      this.executionStateTracker = 
consumerAndMetadata.getExecutionStateTracker();
+      this.ptransformId = consumerAndMetadata.getPTransformId();
 
       HashMap<String, String> labels = new HashMap<>();
       labels.put(Labels.PCOLLECTION, pCollectionId);
@@ -315,7 +325,10 @@ public class PCollectionConsumerRegistry {
         this.delegate.accept(input);
       } catch (Exception e) {
         if (outputSampler != null) {
-          outputSampler.exception(elementSample, e);
+          ExecutionStateSampler.ExecutionStateTrackerStatus status =
+              executionStateTracker.getStatus();
+          String processBundleId = status == null ? null : 
status.getProcessBundleId();
+          outputSampler.exception(elementSample, e, ptransformId, 
processBundleId);
         }
         throw e;
       } finally {
@@ -407,7 +420,11 @@ public class PCollectionConsumerRegistry {
           consumerAndMetadata.getConsumer().accept(input);
         } catch (Exception e) {
           if (outputSampler != null) {
-            outputSampler.exception(elementSample, e);
+            ExecutionStateSampler.ExecutionStateTrackerStatus status =
+                consumerAndMetadata.getExecutionStateTracker().getStatus();
+            String processBundleId = status == null ? null : 
status.getProcessBundleId();
+            outputSampler.exception(
+                elementSample, e, consumerAndMetadata.getPTransformId(), 
processBundleId);
           }
           throw e;
         } finally {
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
index 85abd02e1d9..4ef1674c9ec 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
@@ -34,8 +34,21 @@ public class ElementSample<T> {
   // The element sample to be serialized and later queried.
   public final WindowedValue<T> sample;
 
+  public static class ExceptionMetadata {
+    ExceptionMetadata(String message, String ptransformId) {
+      this.message = message;
+      this.ptransformId = ptransformId;
+    }
+
+    // The stringified exception that caused the bundle to fail.
+    public final String message;
+
+    // The PTransform where the exception first occurred.
+    public final String ptransformId;
+  }
+
   // An optional exception to be given as metadata on the FnApi for the given 
sample.
-  @Nullable public Exception exception = null;
+  @Nullable public ExceptionMetadata exception = null;
 
   ElementSample(long id, WindowedValue<T> sample) {
     this.id = id;
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index a81796a239a..e86b2d5b5ce 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -19,8 +19,10 @@ package org.apache.beam.fn.harness.debug;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
@@ -44,7 +46,7 @@ public class OutputSampler<T> {
   // Temporarily holds exceptional elements. These elements can also be 
duplicated in the main
   // buffer. This is in order to always track exceptional elements even if the 
number of samples in
   // the main buffer drops it.
-  private final List<ElementSample<T>> exceptions = new ArrayList<>();
+  private final Map<String, ElementSample<T>> exceptions = new HashMap<>();
 
   // Maximum number of elements in buffer.
   private final int maxElements;
@@ -120,22 +122,68 @@ public class OutputSampler<T> {
   }
 
   /**
-   * Samples an exceptional element to be later queried.
+   * Samples an exceptional element to be later queried. This enforces that 
only one exception occurs
+   * per bundle.
    *
    * @param elementSample the sampled element to add an exception to.
    * @param e the exception.
+   * @param ptransformId the source of the exception.
+   * @param processBundleId the failing bundle.
    */
-  public void exception(ElementSample<T> elementSample, Exception e) {
-    if (elementSample == null) {
+  public void exception(
+      ElementSample<T> elementSample, Exception e, String ptransformId, String 
processBundleId) {
+    if (elementSample == null || processBundleId == null) {
       return;
     }
 
     synchronized (this) {
-      elementSample.exception = e;
-      exceptions.add(elementSample);
+      exceptions.computeIfAbsent(
+          processBundleId,
+          pbId -> {
+            elementSample.exception =
+                new ElementSample.ExceptionMetadata(e.toString(), 
ptransformId);
+            return elementSample;
+          });
     }
   }
 
+  /**
+   * Fills and returns the BeamFnApi proto.
+   *
+   * @param sample the sampled element.
+   * @param stream the stream to use to serialize the element.
+   * @param processBundleId the bundle the element belongs to. Currently only 
set when there is an
+   *     exception.
+   */
+  private BeamFnApi.SampledElement sampleToProto(
+      ElementSample<T> sample, ByteStringOutputStream stream, @Nullable String 
processBundleId)
+      throws IOException {
+    if (valueCoder != null) {
+      this.valueCoder.encode(sample.sample.getValue(), stream, 
Coder.Context.NESTED);
+    } else if (windowedValueCoder != null) {
+      this.windowedValueCoder.encode(sample.sample, stream, 
Coder.Context.NESTED);
+    }
+
+    BeamFnApi.SampledElement.Builder elementBuilder =
+        
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset());
+
+    ElementSample.ExceptionMetadata exception = sample.exception;
+    if (exception != null) {
+      BeamFnApi.SampledElement.Exception.Builder exceptionBuilder =
+          BeamFnApi.SampledElement.Exception.newBuilder()
+              .setTransformId(exception.ptransformId)
+              .setError(exception.message);
+
+      if (processBundleId != null) {
+        exceptionBuilder.setInstructionId(processBundleId);
+      }
+
+      elementBuilder.setException(exceptionBuilder);
+    }
+
+    return elementBuilder.build();
+  }
+
   /**
    * Clears samples at end of call. This is to help mitigate memory use.
    *
@@ -162,39 +210,26 @@ public class OutputSampler<T> {
     // to deduplicate samples.
     HashSet<Long> seen = new HashSet<>();
     ByteStringOutputStream stream = new ByteStringOutputStream();
-    for (int i = 0; i < bufferToSend.size(); i++) {
-      int index = (sampleIndex + i) % bufferToSend.size();
-      ElementSample<T> sample = bufferToSend.get(index);
+    for (Map.Entry<String, ElementSample<T>> pair : exceptions.entrySet()) {
+      String processBundleId = pair.getKey();
+      ElementSample<T> sample = pair.getValue();
       seen.add(sample.id);
 
-      if (valueCoder != null) {
-        this.valueCoder.encode(sample.sample.getValue(), stream, 
Coder.Context.NESTED);
-      } else if (windowedValueCoder != null) {
-        this.windowedValueCoder.encode(sample.sample, stream, 
Coder.Context.NESTED);
-      }
-
-      ret.add(
-          
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+      ret.add(sampleToProto(sample, stream, processBundleId));
     }
+    exceptions.clear();
 
-    // TODO: set the exception metadata on the proto once that PR is merged.
-    for (ElementSample<T> sample : exceptions) {
+    for (int i = 0; i < bufferToSend.size(); i++) {
+      int index = (sampleIndex + i) % bufferToSend.size();
+
+      ElementSample<T> sample = bufferToSend.get(index);
       if (seen.contains(sample.id)) {
         continue;
       }
 
-      if (valueCoder != null) {
-        this.valueCoder.encode(sample.sample.getValue(), stream, 
Coder.Context.NESTED);
-      } else if (windowedValueCoder != null) {
-        this.windowedValueCoder.encode(sample.sample, stream, 
Coder.Context.NESTED);
-      }
-
-      ret.add(
-          
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+      ret.add(sampleToProto(sample, stream, null));
     }
 
-    exceptions.clear();
-
     return ret;
   }
 }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 7946251b30f..761f710b0ba 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -59,6 +60,28 @@ public class OutputSamplerTest {
         .build();
   }
 
+  public BeamFnApi.SampledElement encodeException(
+      Integer i, String error, String ptransformId, @Nullable String 
processBundleId)
+      throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(i, stream);
+
+    BeamFnApi.SampledElement.Exception.Builder builder =
+        BeamFnApi.SampledElement.Exception.newBuilder()
+            .setTransformId(ptransformId)
+            .setError(error);
+
+    if (processBundleId != null) {
+      builder.setInstructionId(processBundleId);
+    }
+
+    return BeamFnApi.SampledElement.newBuilder()
+        .setElement(ByteString.copyFrom(stream.toByteArray()))
+        .setException(builder)
+        .build();
+  }
+
   /**
    * Test that the first N are always sampled.
    *
@@ -154,11 +177,65 @@ public class OutputSamplerTest {
     ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
 
     Exception exception = new RuntimeException("Test exception");
-    outputSampler.exception(elementSample, exception);
+    String ptransformId = "ptransform";
+    String processBundleId = "processBundle";
+    outputSampler.exception(elementSample, exception, ptransformId, 
processBundleId);
+
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+    expected.add(encodeException(1, exception.toString(), ptransformId, 
processBundleId));
+
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+    assertThat(samples, containsInAnyOrder(expected.toArray()));
+  }
+
+  /**
+   * Test that in the event that an exception happens multiple times in a 
bundle, it's only recorded
+   * at the source.
+   *
+   * @throws IOException when encoding fails (shouldn't happen).
+   */
+  @Test
+  public void testNoDuplicateExceptions() throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+    ElementSample<Integer> elementSampleA =
+        outputSampler.sample(WindowedValue.valueInGlobalWindow(1));
+    ElementSample<Integer> elementSampleB =
+        outputSampler.sample(WindowedValue.valueInGlobalWindow(2));
+
+    Exception exception = new RuntimeException("Test exception");
+    String ptransformIdA = "ptransformA";
+    String ptransformIdB = "ptransformB";
+    String processBundleId = "processBundle";
+    outputSampler.exception(elementSampleA, exception, ptransformIdA, 
processBundleId);
+    outputSampler.exception(elementSampleB, exception, ptransformIdB, 
processBundleId);
+
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+    expected.add(encodeException(1, exception.toString(), ptransformIdA, 
processBundleId));
+    expected.add(encodeInt(2));
+
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+    assertThat(samples, containsInAnyOrder(expected.toArray()));
+  }
+
+  /**
+   * Test that exception metadata is only set if there is a process bundle.
+   *
+   * @throws IOException when encoding fails (shouldn't happen).
+   */
+  @Test
+  public void testExceptionOnlySampledIfNonNullProcessBundle() throws 
IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+    WindowedValue<Integer> windowedValue = 
WindowedValue.valueInGlobalWindow(1);
+    ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+    Exception exception = new RuntimeException("Test exception");
+    String ptransformId = "ptransform";
+    outputSampler.exception(elementSample, exception, ptransformId, null);
 
-    // The first 10 are always sampled, but with maxSamples = 5, the first ten 
are downsampled to
-    // 4..9 inclusive. Then,
-    // the 20th element is sampled (19) and every 20 after.
     List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(1));
 
@@ -185,7 +262,9 @@ public class OutputSamplerTest {
     }
 
     Exception exception = new RuntimeException("Test exception");
-    outputSampler.exception(elementSample, exception);
+    String ptransformId = "ptransform";
+    String processBundleId = "processBundle";
+    outputSampler.exception(elementSample, exception, ptransformId, 
processBundleId);
 
     // The first 10 are always sampled, but with maxSamples = 5, the first ten 
are downsampled to
     // 4..9 inclusive. Then, the 20th element is sampled (19) and every 20 
after. Finally,
@@ -196,7 +275,7 @@ public class OutputSamplerTest {
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
-    expected.add(encodeInt(0));
+    expected.add(encodeException(0, exception.toString(), ptransformId, 
processBundleId));
 
     List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
index f4265b15834..7a71a58d04c 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
@@ -75,7 +75,7 @@ public class BeamFnStatusClientTest {
       when(executionStateTracker.getStatus())
           .thenReturn(
               ExecutionStateTrackerStatus.create(
-                  "ptransformId", "ptransformIdName", Thread.currentThread(), 
i * 1000));
+                  "ptransformId", "ptransformIdName", Thread.currentThread(), 
i * 1000, null));
       String instruction = Integer.toString(i);
       when(processorCache.find(instruction)).thenReturn(processor);
       bundleProcessorMap.put(instruction, processor);

Reply via email to