lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1099191235
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -166,6 +157,8 @@ public class ProcessBundleHandler {
@VisibleForTesting final BundleProcessorCache bundleProcessorCache;
private final Set<String> runnerCapabilities;
+ private DataSampler dataSampler;
Review Comment:
```suggestion
private final DataSampler dataSampler;
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
Review Comment:
```suggestion
private final int maxSamples = 10;
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // The fully-qualified type is: Map[ProcessBundleDescriptorId,
[PCollectionId, OutputSampler]].
+ // The DataSampler object lives on the same level of the FnHarness. This
means that many threads
+ // can and will
+ // access this simultaneously. However, ProcessBundleDescriptors are unique
per thread, so only
+ // synchronization
+ // is needed on the outermost map.
+ private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Creates and returns a class to sample the given PCollection in the given
+ * ProcessBundleDescriptor. Uses the given coder encode samples as bytes
when responding to a
+ * SampleDataRequest.
+ *
+ * @param processBundleDescriptorId The PBD to sample from.
+ * @param pcollectionId The PCollection to take intermittent samples from.
+ * @param coder The coder associated with the PCollection. Coder may be from
a nested context.
+ * @return the OutputSampler corresponding to the unique PBD and PCollection.
+ * @param <T> The type of element contained in the PCollection.
+ */
+ public <T> OutputSampler<T> sampleOutput(
+ String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+ outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+ Map<String, OutputSampler<?>> samplers =
outputSamplers.get(processBundleDescriptorId);
+ samplers.putIfAbsent(
+ pcollectionId, new OutputSampler<T>(coder, this.maxSamples,
this.sampleEveryN));
+
+ return (OutputSampler<T>) samplers.get(pcollectionId);
+ }
+
+ /**
+ * Returns all collected samples. Thread-safe.
+ *
+ * @param request The instruction request from the FnApi. Filters based on
the given
+ * SampleDataRequest.
+ * @return Returns all collected samples.
+ */
+ public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+ BeamFnApi.InstructionRequest request) {
+ BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+ Map<String, List<byte[]>> responseSamples =
+ samplesFor(
+
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
Review Comment:
why copy?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // The fully-qualified type is: Map[ProcessBundleDescriptorId,
[PCollectionId, OutputSampler]].
+ // The DataSampler object lives on the same level of the FnHarness. This
means that many threads
+ // can and will
+ // access this simultaneously. However, ProcessBundleDescriptors are unique
per thread, so only
Review Comment:
This is not true: ProcessBundleDescriptors can be reused across multiple
process bundle requests. We see this regularly on Dataflow streaming pipelines
and, under some circumstances, on batch pipelines.
It is true for ProcessBundleIds, but I don't think that is what you want here.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
Review Comment:
```suggestion
private final int sampleEveryN = 1000;
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old
values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+ private final Coder<T> coder;
+ private final List<T> buffer = new ArrayList<>();
+ private static final Logger LOG =
LoggerFactory.getLogger(OutputSampler.class);
+
+ // Maximum number of elements in buffer.
+ private int maxElements = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // Total number of samples taken.
+ private long numSamples = 0;
+
+ // Index into the buffer of where to overwrite samples.
+ private int resampleIndex = 0;
+
+ public OutputSampler(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+ this(coder);
+ this.maxElements = maxElements;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ /**
+ * Samples every 1000th element or if it is part of the first 10 in the
(local) PCollection.
+ *
+ * @param element the element to sample.
+ */
+ public void sample(T element) {
+ // Only sample the first 10 elements then after every `sampleEveryN`th
element.
+ numSamples += 1;
+ if (numSamples > 10 && numSamples % sampleEveryN != 0) {
+ return;
+ }
+
+ // Fill buffer until maxElements.
+ if (buffer.size() < maxElements) {
+ buffer.add(element);
+ } else {
+ // Then rewrite sampled elements as a circular buffer.
+ buffer.set(resampleIndex, element);
+ resampleIndex = (resampleIndex + 1) % maxElements;
+ }
+ }
+
+ /**
+ * Clears samples at end of call. This is to help mitigate memory use.
+ *
+ * @return samples taken since last call.
+ */
+ public List<byte[]> samples() {
Review Comment:
Note that sample() and samples() will be invoked by two different threads
(say T1 and T2).
sample() is invoked frequently, while samples() is invoked rarely. Currently there
is no synchronization between T1 and T2, which means that changes made by T1 are
not guaranteed to become visible to T2, and vice versa; whether they do depends on
the CPU architecture and the contents of the registers/L* caches.
Note also that the clear() performed by T2 in the current implementation is unlikely
to be seen by T1 and will be overwritten whenever T1 adds a value.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling
operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements
ProcessBundleDescriptorModifier {
Review Comment:
Why did you choose to insert this as a separate transform in the graph instead
of modifying
https://github.com/apache/beam/blob/d20d0b01c3c6bcde551420f36e13d794c930f1e2/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L174
to support passing elements to the OutputSampler (or to be an OutputSampler
itself)?
Also, we would typically ask the runner to make these PBD modifications itself,
so that it could selectively choose which PCollections to sample and which not to.
This would also remove the need for the SDK to check for an experiment, and would
allow the runner to choose which PCollections to sample without needing to pass in
PCollection ids (the PBD ids would be enough). There is a cost, though: transforms impose
msec and state-transition bookkeeping overhead, which is not insignificant, and adding
a sampling consumer to every PCollection makes the current single-consumer hot path in
PCollectionConsumerRegistry always use the multi-consumer variant, which has additional overhead.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old
values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+ private final Coder<T> coder;
+ private final List<T> buffer = new ArrayList<>();
Review Comment:
Set up the capacity of the buffer so we don't pay any resizing costs.
```suggestion
private final List<T> buffer = new ArrayList<>(maxElements);
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // The fully-qualified type is: Map[ProcessBundleDescriptorId,
[PCollectionId, OutputSampler]].
+ // The DataSampler object lives on the same level of the FnHarness. This
means that many threads
+ // can and will
+ // access this simultaneously. However, ProcessBundleDescriptors are unique
per thread, so only
+ // synchronization
+ // is needed on the outermost map.
+ private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Creates and returns a class to sample the given PCollection in the given
+ * ProcessBundleDescriptor. Uses the given coder encode samples as bytes
when responding to a
+ * SampleDataRequest.
+ *
+ * @param processBundleDescriptorId The PBD to sample from.
+ * @param pcollectionId The PCollection to take intermittent samples from.
+ * @param coder The coder associated with the PCollection. Coder may be from
a nested context.
+ * @return the OutputSampler corresponding to the unique PBD and PCollection.
+ * @param <T> The type of element contained in the PCollection.
+ */
+ public <T> OutputSampler<T> sampleOutput(
+ String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+ outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+ Map<String, OutputSampler<?>> samplers =
outputSamplers.get(processBundleDescriptorId);
+ samplers.putIfAbsent(
+ pcollectionId, new OutputSampler<T>(coder, this.maxSamples,
this.sampleEveryN));
+
+ return (OutputSampler<T>) samplers.get(pcollectionId);
+ }
+
+ /**
+ * Returns all collected samples. Thread-safe.
+ *
+ * @param request The instruction request from the FnApi. Filters based on
the given
+ * SampleDataRequest.
+ * @return Returns all collected samples.
+ */
+ public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+ BeamFnApi.InstructionRequest request) {
+ BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+ Map<String, List<byte[]>> responseSamples =
+ samplesFor(
+
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+ ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+ BeamFnApi.SampleDataResponse.Builder response =
BeamFnApi.SampleDataResponse.newBuilder();
+ for (String pcollectionId : responseSamples.keySet()) {
+ ElementList.Builder elementList = ElementList.newBuilder();
+ for (byte[] sample : responseSamples.get(pcollectionId)) {
+ elementList.addElements(
+
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+ }
+ response.putElementSamples(pcollectionId, elementList.build());
+ }
+
+ return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+ }
+
+ /**
+ * Returns a map from PCollection to its samples. Samples are filtered on
+ * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+ *
+ * @param descriptors PCollections under each PBD id will be unioned. If
empty, allows all
+ * descriptors.
+ * @param pcollections Filters all PCollections on this set. If empty,
allows all PCollections.
+ * @return a map from PCollection to its samples.
+ */
+ public Map<String, List<byte[]>> samplesFor(Set<String> descriptors,
Set<String> pcollections) {
+ Map<String, List<byte[]>> samples = new HashMap<>();
+
+ // Safe to iterate as the ConcurrentHashMap will return each element at
most once and will not
+ // throw
+ // ConcurrentModificationException.
+ outputSamplers.forEach(
+ (descriptorId, samplers) -> {
+ if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+ return;
+ }
+
+ samplers.forEach(
+ (pcollectionId, outputSampler) -> {
+ if (!pcollections.isEmpty() &&
!pcollections.contains(pcollectionId)) {
+ return;
+ }
+
+ samples.putIfAbsent(pcollectionId, new ArrayList<>());
+ samples.get(pcollectionId).addAll(outputSampler.samples());
+ });
+ });
+
+ return samples;
+ }
+
+ /** @return samples from all PBDs and all PCollections. */
+ public Map<String, List<byte[]>> allSamples() {
+ return samplesFor(ImmutableSet.of(), ImmutableSet.of());
+ }
+
+ /**
+ * @param descriptors PBDs to filter on.
+ * @return samples only from the given descriptors.
+ */
+ public Map<String, List<byte[]>> samplesForDescriptors(Set<String>
descriptors) {
+ return samplesFor(descriptors, ImmutableSet.of());
+ }
+
+ /**
+ * @param pcollections PCollection ids to filter on.
+ * @return samples only from the given PCollections.
+ */
+ public Map<String, List<byte[]>> samplesForPCollections(Set<String>
pcollections) {
+ return samplesFor(ImmutableSet.of(), pcollections);
+ }
Review Comment:
These are only used during testing; move them into the test class.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 10 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // The fully-qualified type is: Map[ProcessBundleDescriptorId,
[PCollectionId, OutputSampler]].
+ // The DataSampler object lives on the same level of the FnHarness. This
means that many threads
+ // can and will
+ // access this simultaneously. However, ProcessBundleDescriptors are unique
per thread, so only
+ // synchronization
+ // is needed on the outermost map.
+ private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Creates and returns a class to sample the given PCollection in the given
+ * ProcessBundleDescriptor. Uses the given coder encode samples as bytes
when responding to a
+ * SampleDataRequest.
+ *
+ * @param processBundleDescriptorId The PBD to sample from.
+ * @param pcollectionId The PCollection to take intermittent samples from.
+ * @param coder The coder associated with the PCollection. Coder may be from
a nested context.
+ * @return the OutputSampler corresponding to the unique PBD and PCollection.
+ * @param <T> The type of element contained in the PCollection.
+ */
+ public <T> OutputSampler<T> sampleOutput(
+ String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+ outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+ Map<String, OutputSampler<?>> samplers =
outputSamplers.get(processBundleDescriptorId);
+ samplers.putIfAbsent(
+ pcollectionId, new OutputSampler<T>(coder, this.maxSamples,
this.sampleEveryN));
+
+ return (OutputSampler<T>) samplers.get(pcollectionId);
+ }
+
+ /**
+ * Returns all collected samples. Thread-safe.
+ *
+ * @param request The instruction request from the FnApi. Filters based on
the given
+ * SampleDataRequest.
+ * @return Returns all collected samples.
+ */
+ public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+ BeamFnApi.InstructionRequest request) {
+ BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+ Map<String, List<byte[]>> responseSamples =
+ samplesFor(
+
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+ ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+ BeamFnApi.SampleDataResponse.Builder response =
BeamFnApi.SampleDataResponse.newBuilder();
+ for (String pcollectionId : responseSamples.keySet()) {
+ ElementList.Builder elementList = ElementList.newBuilder();
+ for (byte[] sample : responseSamples.get(pcollectionId)) {
+ elementList.addElements(
+
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+ }
+ response.putElementSamples(pcollectionId, elementList.build());
+ }
+
+ return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+ }
+
+ /**
+ * Returns a map from PCollection to its samples. Samples are filtered on
+ * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+ *
+ * @param descriptors PCollections under each PBD id will be unioned. If
empty, allows all
+ * descriptors.
+ * @param pcollections Filters all PCollections on this set. If empty,
allows all PCollections.
+ * @return a map from PCollection to its samples.
+ */
+ public Map<String, List<byte[]>> samplesFor(Set<String> descriptors,
Set<String> pcollections) {
Review Comment:
Don't make this public. Instead, have the test create a request, pass it to
handleDataSampleRequest, and validate the returned response itself.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to
executing
+ * ProcessBundleDescriptors. That is, different threads executing different
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+ /** Creates a DataSampler to sample every 1000 elements while keeping a
maximum of 10 in memory. */
+ public DataSampler() {}
+
+ /**
+ * @param maxSamples Sets the maximum number of samples held in memory at
once.
+ * @param sampleEveryN Sets how often to sample.
+ */
+ public DataSampler(int maxSamples, int sampleEveryN) {
+ this.maxSamples = maxSamples;
+ this.sampleEveryN = sampleEveryN;
+ }
+
+ // Maximum number of elements in buffer.
+ private int maxSamples = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
+
+ // The fully-qualified type is: Map[ProcessBundleDescriptorId,
[PCollectionId, OutputSampler]].
+ // The DataSampler object lives on the same level of the FnHarness. This
means that many threads
+ // can and will
+ // access this simultaneously. However, ProcessBundleDescriptors are unique
per thread, so only
+ // synchronization
+ // is needed on the outermost map.
+ private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Creates and returns a class to sample the given PCollection in the given
+ * ProcessBundleDescriptor. Uses the given coder to encode samples as bytes
when responding to a
+ * SampleDataRequest.
+ *
+ * @param processBundleDescriptorId The PBD to sample from.
+ * @param pcollectionId The PCollection to take intermittent samples from.
+ * @param coder The coder associated with the PCollection. Coder may be from
a nested context.
+ * @return the OutputSampler corresponding to the unique PBD and PCollection.
+ * @param <T> The type of element contained in the PCollection.
+ */
+ public <T> OutputSampler<T> sampleOutput(
+ String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+ outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+ Map<String, OutputSampler<?>> samplers =
outputSamplers.get(processBundleDescriptorId);
+ samplers.putIfAbsent(
+ pcollectionId, new OutputSampler<T>(coder, this.maxSamples,
this.sampleEveryN));
+
+ return (OutputSampler<T>) samplers.get(pcollectionId);
+ }
+
+ /**
+ * Returns all collected samples. Thread-safe.
+ *
+ * @param request The instruction request from the FnApi. Filters based on
the given
+ * SampleDataRequest.
+ * @return An InstructionResponse builder containing the collected samples.
+ */
+ public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+ BeamFnApi.InstructionRequest request) {
+ BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+ Map<String, List<byte[]>> responseSamples =
+ samplesFor(
+
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+ ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+ BeamFnApi.SampleDataResponse.Builder response =
BeamFnApi.SampleDataResponse.newBuilder();
+ for (String pcollectionId : responseSamples.keySet()) {
+ ElementList.Builder elementList = ElementList.newBuilder();
+ for (byte[] sample : responseSamples.get(pcollectionId)) {
+ elementList.addElements(
+
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+ }
+ response.putElementSamples(pcollectionId, elementList.build());
+ }
+
+ return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+ }
+
+ /**
+ * Returns a map from PCollection to its samples. Samples are filtered on
+ * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+ *
+ * @param descriptors PCollections under each PBD id will be unioned. If
empty, allows all
+ * descriptors.
+ * @param pcollections Filters all PCollections on this set. If empty,
allows all PCollections.
+ * @return a map from PCollection to its samples.
+ */
+ public Map<String, List<byte[]>> samplesFor(Set<String> descriptors,
Set<String> pcollections) {
+ Map<String, List<byte[]>> samples = new HashMap<>();
+
+ // Safe to iterate as the ConcurrentHashMap will return each element at
most once and will not
+ // throw
+ // ConcurrentModificationException.
+ outputSamplers.forEach(
+ (descriptorId, samplers) -> {
+ if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+ return;
+ }
+
+ samplers.forEach(
+ (pcollectionId, outputSampler) -> {
+ if (!pcollections.isEmpty() &&
!pcollections.contains(pcollectionId)) {
+ return;
+ }
+
+ samples.putIfAbsent(pcollectionId, new ArrayList<>());
Review Comment:
```suggestion
samples.putIfAbsent(pcollectionId, Collections.EMPTY_LIST);
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old
values are constantly
+ * being overwritten in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+ private final Coder<T> coder;
+ private final List<T> buffer = new ArrayList<>();
+ private static final Logger LOG =
LoggerFactory.getLogger(OutputSampler.class);
+
+ // Maximum number of elements in buffer.
+ private int maxElements = 10;
+
+ // Sampling rate.
+ private int sampleEveryN = 1000;
Review Comment:
shouldn't this be passed in by DataSampler?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]