This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0554372b25b Finish Java Exception Sampling (#27257)
0554372b25b is described below
commit 0554372b25bc8921a451b7ef1068f4bd8fc1651e
Author: Sam Rohde <[email protected]>
AuthorDate: Wed Jun 28 09:22:32 2023 -0700
Finish Java Exception Sampling (#27257)
* Finish Java Exception Sampling
* wrong param name in comment
* run tests
* run tests
* run tests
---
.../fn/harness/control/ExecutionStateSampler.java | 16 +++-
.../harness/data/PCollectionConsumerRegistry.java | 27 +++++--
.../beam/fn/harness/debug/ElementSample.java | 15 +++-
.../beam/fn/harness/debug/OutputSampler.java | 93 +++++++++++++++-------
.../beam/fn/harness/debug/OutputSamplerTest.java | 91 +++++++++++++++++++--
.../fn/harness/status/BeamFnStatusClientTest.java | 2 +-
6 files changed, 198 insertions(+), 46 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
index 2c2485dd842..c8cef8cf861 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
@@ -363,9 +363,14 @@ public class ExecutionStateSampler {
ExecutionStateImpl current = currentStateLazy.get();
if (current != null) {
return ExecutionStateTrackerStatus.create(
- current.ptransformId, current.ptransformUniqueName, thread,
lastTransitionTimeMs);
+ current.ptransformId,
+ current.ptransformUniqueName,
+ thread,
+ lastTransitionTimeMs,
+ processBundleId.get());
} else {
- return ExecutionStateTrackerStatus.create(null, null, thread,
lastTransitionTimeMs);
+ return ExecutionStateTrackerStatus.create(
+ null, null, thread, lastTransitionTimeMs, processBundleId.get());
}
}
@@ -518,9 +523,10 @@ public class ExecutionStateSampler {
@Nullable String ptransformId,
@Nullable String ptransformUniqueName,
Thread trackedThread,
- long lastTransitionTimeMs) {
+ long lastTransitionTimeMs,
+ @Nullable String processBundleId) {
return new AutoValue_ExecutionStateSampler_ExecutionStateTrackerStatus(
- ptransformId, ptransformUniqueName, trackedThread,
lastTransitionTimeMs);
+ ptransformId, ptransformUniqueName, trackedThread,
lastTransitionTimeMs, processBundleId);
}
public abstract @Nullable String getPTransformId();
@@ -530,5 +536,7 @@ public class ExecutionStateSampler {
public abstract Thread getTrackedThread();
public abstract long getLastTransitionTimeMillis();
+
+ public abstract @Nullable String getProcessBundleId();
}
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 150580c7f64..a7a8766ffc7 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -27,6 +27,7 @@ import java.util.Random;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
+import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState;
import
org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.apache.beam.fn.harness.control.Metrics;
@@ -71,9 +72,12 @@ public class PCollectionConsumerRegistry {
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
- FnDataReceiver consumer, String pTransformId, ExecutionState state) {
+ FnDataReceiver consumer,
+ String pTransformId,
+ ExecutionState state,
+ ExecutionStateTracker stateTracker) {
return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
- consumer, pTransformId, state);
+ consumer, pTransformId, state, stateTracker);
}
public abstract FnDataReceiver getConsumer();
@@ -81,6 +85,8 @@ public class PCollectionConsumerRegistry {
public abstract String getPTransformId();
public abstract ExecutionState getExecutionState();
+
+ public abstract ExecutionStateTracker getExecutionStateTracker();
}
private final ExecutionStateTracker stateTracker;
@@ -176,7 +182,7 @@ public class PCollectionConsumerRegistry {
List<ConsumerAndMetadata> consumerAndMetadatas =
pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) ->
new ArrayList<>());
consumerAndMetadatas.add(
- ConsumerAndMetadata.forConsumer(consumer, pTransformId,
executionState));
+ ConsumerAndMetadata.forConsumer(consumer, pTransformId,
executionState, stateTracker));
}
/**
@@ -250,6 +256,8 @@ public class PCollectionConsumerRegistry {
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final Coder<T> coder;
private final @Nullable OutputSampler<T> outputSampler;
+ private final String ptransformId;
+ private final ExecutionStateTracker executionStateTracker;
public MetricTrackingFnDataReceiver(
String pCollectionId,
@@ -258,6 +266,8 @@ public class PCollectionConsumerRegistry {
@Nullable OutputSampler<T> outputSampler) {
this.delegate = consumerAndMetadata.getConsumer();
this.executionState = consumerAndMetadata.getExecutionState();
+ this.executionStateTracker =
consumerAndMetadata.getExecutionStateTracker();
+ this.ptransformId = consumerAndMetadata.getPTransformId();
HashMap<String, String> labels = new HashMap<>();
labels.put(Labels.PCOLLECTION, pCollectionId);
@@ -315,7 +325,10 @@ public class PCollectionConsumerRegistry {
this.delegate.accept(input);
} catch (Exception e) {
if (outputSampler != null) {
- outputSampler.exception(elementSample, e);
+ ExecutionStateSampler.ExecutionStateTrackerStatus status =
+ executionStateTracker.getStatus();
+ String processBundleId = status == null ? null :
status.getProcessBundleId();
+ outputSampler.exception(elementSample, e, ptransformId,
processBundleId);
}
throw e;
} finally {
@@ -407,7 +420,11 @@ public class PCollectionConsumerRegistry {
consumerAndMetadata.getConsumer().accept(input);
} catch (Exception e) {
if (outputSampler != null) {
- outputSampler.exception(elementSample, e);
+ ExecutionStateSampler.ExecutionStateTrackerStatus status =
+ consumerAndMetadata.getExecutionStateTracker().getStatus();
+ String processBundleId = status == null ? null :
status.getProcessBundleId();
+ outputSampler.exception(
+ elementSample, e, consumerAndMetadata.getPTransformId(),
processBundleId);
}
throw e;
} finally {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
index 85abd02e1d9..4ef1674c9ec 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java
@@ -34,8 +34,21 @@ public class ElementSample<T> {
// The element sample to be serialized and later queried.
public final WindowedValue<T> sample;
+ public static class ExceptionMetadata {
+ ExceptionMetadata(String message, String ptransformId) {
+ this.message = message;
+ this.ptransformId = ptransformId;
+ }
+
+ // The stringified exception that caused the bundle to fail.
+ public final String message;
+
+ // The PTransform where the exception first occurred.
+ public final String ptransformId;
+ }
+
// An optional exception to be given as metadata on the FnApi for the given
sample.
- @Nullable public Exception exception = null;
+ @Nullable public ExceptionMetadata exception = null;
ElementSample(long id, WindowedValue<T> sample) {
this.id = id;
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
index a81796a239a..e86b2d5b5ce 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java
@@ -19,8 +19,10 @@ package org.apache.beam.fn.harness.debug;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -44,7 +46,7 @@ public class OutputSampler<T> {
// Temporarily holds exceptional elements. These elements can also be
duplicated in the main
// buffer. This is in order to always track exceptional elements even if the
number of samples in
// the main buffer drops it.
- private final List<ElementSample<T>> exceptions = new ArrayList<>();
+ private final Map<String, ElementSample<T>> exceptions = new HashMap<>();
// Maximum number of elements in buffer.
private final int maxElements;
@@ -120,22 +122,68 @@ public class OutputSampler<T> {
}
/**
- * Samples an exceptional element to be later queried.
+ * Samples an exceptional element to be later queried. This enforces that
only one exception is sampled
+ * per bundle: only the first exceptional element reported for a bundle is kept.
*
* @param elementSample the sampled element to add an exception to.
* @param e the exception.
+ * @param ptransformId the source of the exception.
+ * @param processBundleId the failing bundle. If null, the exception is not recorded.
*/
- public void exception(ElementSample<T> elementSample, Exception e) {
- if (elementSample == null) {
+ public void exception(
+ ElementSample<T> elementSample, Exception e, String ptransformId, String
processBundleId) {
+ if (elementSample == null || processBundleId == null) {
return;
}
synchronized (this) {
- elementSample.exception = e;
- exceptions.add(elementSample);
+ exceptions.computeIfAbsent(
+ processBundleId,
+ pbId -> {
+ elementSample.exception =
+ new ElementSample.ExceptionMetadata(e.toString(),
ptransformId);
+ return elementSample;
+ });
}
}
+ /**
+ * Fills and returns the BeamFnApi proto.
+ *
+ * @param sample the sampled element.
+ * @param stream the stream to use to serialize the element.
+ * @param processBundleId the bundle the element belongs to. Currently only
set when there is an
+ * exception.
+ */
+ private BeamFnApi.SampledElement sampleToProto(
+ ElementSample<T> sample, ByteStringOutputStream stream, @Nullable String
processBundleId)
+ throws IOException {
+ if (valueCoder != null) {
+ this.valueCoder.encode(sample.sample.getValue(), stream,
Coder.Context.NESTED);
+ } else if (windowedValueCoder != null) {
+ this.windowedValueCoder.encode(sample.sample, stream,
Coder.Context.NESTED);
+ }
+
+ BeamFnApi.SampledElement.Builder elementBuilder =
+
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset());
+
+ ElementSample.ExceptionMetadata exception = sample.exception;
+ if (exception != null) {
+ BeamFnApi.SampledElement.Exception.Builder exceptionBuilder =
+ BeamFnApi.SampledElement.Exception.newBuilder()
+ .setTransformId(exception.ptransformId)
+ .setError(exception.message);
+
+ if (processBundleId != null) {
+ exceptionBuilder.setInstructionId(processBundleId);
+ }
+
+ elementBuilder.setException(exceptionBuilder);
+ }
+
+ return elementBuilder.build();
+ }
+
/**
* Clears samples at end of call. This is to help mitigate memory use.
*
@@ -162,39 +210,26 @@ public class OutputSampler<T> {
// to deduplicate samples.
HashSet<Long> seen = new HashSet<>();
ByteStringOutputStream stream = new ByteStringOutputStream();
- for (int i = 0; i < bufferToSend.size(); i++) {
- int index = (sampleIndex + i) % bufferToSend.size();
- ElementSample<T> sample = bufferToSend.get(index);
+ for (Map.Entry<String, ElementSample<T>> pair : exceptions.entrySet()) {
+ String processBundleId = pair.getKey();
+ ElementSample<T> sample = pair.getValue();
seen.add(sample.id);
- if (valueCoder != null) {
- this.valueCoder.encode(sample.sample.getValue(), stream,
Coder.Context.NESTED);
- } else if (windowedValueCoder != null) {
- this.windowedValueCoder.encode(sample.sample, stream,
Coder.Context.NESTED);
- }
-
- ret.add(
-
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+ ret.add(sampleToProto(sample, stream, processBundleId));
}
+ exceptions.clear();
- // TODO: set the exception metadata on the proto once that PR is merged.
- for (ElementSample<T> sample : exceptions) {
+ for (int i = 0; i < bufferToSend.size(); i++) {
+ int index = (sampleIndex + i) % bufferToSend.size();
+
+ ElementSample<T> sample = bufferToSend.get(index);
if (seen.contains(sample.id)) {
continue;
}
- if (valueCoder != null) {
- this.valueCoder.encode(sample.sample.getValue(), stream,
Coder.Context.NESTED);
- } else if (windowedValueCoder != null) {
- this.windowedValueCoder.encode(sample.sample, stream,
Coder.Context.NESTED);
- }
-
- ret.add(
-
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
+ ret.add(sampleToProto(sample, stream, null));
}
- exceptions.clear();
-
return ret;
}
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
index 7946251b30f..761f710b0ba 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -59,6 +60,28 @@ public class OutputSamplerTest {
.build();
}
+ public BeamFnApi.SampledElement encodeException(
+ Integer i, String error, String ptransformId, @Nullable String
processBundleId)
+ throws IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ coder.encode(i, stream);
+
+ BeamFnApi.SampledElement.Exception.Builder builder =
+ BeamFnApi.SampledElement.Exception.newBuilder()
+ .setTransformId(ptransformId)
+ .setError(error);
+
+ if (processBundleId != null) {
+ builder.setInstructionId(processBundleId);
+ }
+
+ return BeamFnApi.SampledElement.newBuilder()
+ .setElement(ByteString.copyFrom(stream.toByteArray()))
+ .setException(builder)
+ .build();
+ }
+
/**
* Test that the first N are always sampled.
*
@@ -154,11 +177,65 @@ public class OutputSamplerTest {
ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
Exception exception = new RuntimeException("Test exception");
- outputSampler.exception(elementSample, exception);
+ String ptransformId = "ptransform";
+ String processBundleId = "processBundle";
+ outputSampler.exception(elementSample, exception, ptransformId,
processBundleId);
+
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+ expected.add(encodeException(1, exception.toString(), ptransformId,
processBundleId));
+
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+ assertThat(samples, containsInAnyOrder(expected.toArray()));
+ }
+
+ /**
+ * Test that when an exception is reported multiple times in the same
bundle, only the first report
+ * (the source of the exception) is recorded.
+ *
+ * @throws IOException when encoding fails (shouldn't happen).
+ */
+ @Test
+ public void testNoDuplicateExceptions() throws IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+ ElementSample<Integer> elementSampleA =
+ outputSampler.sample(WindowedValue.valueInGlobalWindow(1));
+ ElementSample<Integer> elementSampleB =
+ outputSampler.sample(WindowedValue.valueInGlobalWindow(2));
+
+ Exception exception = new RuntimeException("Test exception");
+ String ptransformIdA = "ptransformA";
+ String ptransformIdB = "ptransformB";
+ String processBundleId = "processBundle";
+ outputSampler.exception(elementSampleA, exception, ptransformIdA,
processBundleId);
+ outputSampler.exception(elementSampleB, exception, ptransformIdB,
processBundleId);
+
+ List<BeamFnApi.SampledElement> expected = new ArrayList<>();
+ expected.add(encodeException(1, exception.toString(), ptransformIdA,
processBundleId));
+ expected.add(encodeInt(2));
+
+ List<BeamFnApi.SampledElement> samples = outputSampler.samples();
+ assertThat(samples, containsInAnyOrder(expected.toArray()));
+ }
+
+ /**
+ * Test that exception metadata is only set if there is a process bundle.
+ *
+ * @throws IOException when encoding fails (shouldn't happen).
+ */
+ @Test
+ public void testExceptionOnlySampledIfNonNullProcessBundle() throws
IOException {
+ VarIntCoder coder = VarIntCoder.of();
+ OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 5, 20);
+
+ WindowedValue<Integer> windowedValue =
WindowedValue.valueInGlobalWindow(1);
+ ElementSample<Integer> elementSample = outputSampler.sample(windowedValue);
+
+ Exception exception = new RuntimeException("Test exception");
+ String ptransformId = "ptransform";
+ outputSampler.exception(elementSample, exception, ptransformId, null);
- // The first 10 are always sampled, but with maxSamples = 5, the first ten
are downsampled to
- // 4..9 inclusive. Then,
- // the 20th element is sampled (19) and every 20 after.
List<BeamFnApi.SampledElement> expected = new ArrayList<>();
expected.add(encodeInt(1));
@@ -185,7 +262,9 @@ public class OutputSamplerTest {
}
Exception exception = new RuntimeException("Test exception");
- outputSampler.exception(elementSample, exception);
+ String ptransformId = "ptransform";
+ String processBundleId = "processBundle";
+ outputSampler.exception(elementSample, exception, ptransformId,
processBundleId);
// The first 10 are always sampled, but with maxSamples = 5, the first ten
are downsampled to
// 4..9 inclusive. Then, the 20th element is sampled (19) and every 20
after. Finally,
@@ -196,7 +275,7 @@ public class OutputSamplerTest {
expected.add(encodeInt(59));
expected.add(encodeInt(79));
expected.add(encodeInt(99));
- expected.add(encodeInt(0));
+ expected.add(encodeException(0, exception.toString(), ptransformId,
processBundleId));
List<BeamFnApi.SampledElement> samples = outputSampler.samples();
assertThat(samples, containsInAnyOrder(expected.toArray()));
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
index f4265b15834..7a71a58d04c 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
@@ -75,7 +75,7 @@ public class BeamFnStatusClientTest {
when(executionStateTracker.getStatus())
.thenReturn(
ExecutionStateTrackerStatus.create(
- "ptransformId", "ptransformIdName", Thread.currentThread(),
i * 1000));
+ "ptransformId", "ptransformIdName", Thread.currentThread(),
i * 1000, null));
String instruction = Integer.toString(i);
when(processorCache.find(instruction)).thenReturn(processor);
bundleProcessorMap.put(instruction, processor);