This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f3f80d8 Fixes a bug in Sample.Any
new fd8ad27 This closes #4216: Fixes a bug in Sample.Any
f3f80d8 is described below
commit f3f80d841897f4481fdb51609659d76c3fe7e550
Author: Eugene Kirpichov <[email protected]>
AuthorDate: Mon Dec 4 19:08:14 2017 -0800
Fixes a bug in Sample.Any
(I introduced the bug while merging a different PR...)
Also adds a test for the combine fn and exposes the fn.
Documents the difference between any() and fixedSize().
---
.../java/org/apache/beam/sdk/testing/CombineFnTester.java | 2 +-
.../main/java/org/apache/beam/sdk/transforms/Sample.java | 15 +++++++++++++--
.../java/org/apache/beam/sdk/transforms/SampleTest.java | 15 +++++++++++++++
3 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
index efd2af3..896d955 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
@@ -95,7 +95,7 @@ public class CombineFnTester {
CombineFn<InputT, AccumT, OutputT> fn,
List<? extends Iterable<InputT>> shards,
Matcher<? super OutputT> matcher) {
- AccumT accumulator = null;
+ AccumT accumulator = shards.isEmpty() ? fn.createAccumulator() : null;
for (AccumT inputAccum : combineInputs(fn, shards)) {
if (accumulator == null) {
accumulator = inputAccum;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 2eb12d6..d7cba7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,17 +38,28 @@ import org.apache.beam.sdk.values.PCollection;
* {@code PCollection}, or samples of the values associated with each
* key in a {@code PCollection} of {@code KV}s.
*
+ * {@link #fixedSizeGlobally(int)} and {@link #fixedSizePerKey(int)} compute uniformly random
+ * samples. {@link #any(long)} is faster, but provides no uniformity guarantees.
+ *
* <p>{@link #combineFn} can also be used manually, in combination with state and with the
* {@link Combine} transform.
*/
public class Sample {
- /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+ /** Returns a {@link CombineFn} that computes a fixed-sized uniform sample of its inputs. */
public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
return new FixedSizedSampleFn<>(sampleSize);
}
/**
+ * Returns a {@link CombineFn} that computes a fixed-sized potentially non-uniform sample of its
+ * inputs.
+ */
+ public static <T> CombineFn<T, ?, Iterable<T>> anyCombineFn(int sampleSize) {
+ return new SampleAnyCombineFn<>(sampleSize);
+ }
+
+ /**
* {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
* produces a new {@code PCollection<T>} containing up to limit
* elements of the input {@code PCollection}.
@@ -233,10 +244,10 @@ public class Sample {
List<T> res = iter.next();
while (iter.hasNext()) {
for (T t : iter.next()) {
- res.add(t);
if (res.size() >= limit) {
return res;
}
+ res.add(t);
}
}
return res;
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 357f256..ed6905d 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -20,6 +20,9 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -37,6 +40,7 @@ import java.util.TreeSet;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CombineFnTester;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -46,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -174,6 +179,16 @@ public class SampleTest {
public void testPickAny() {
runPickAnyTest(lines, limit);
}
+
+ @Test
+ public void testCombineFn() {
+ CombineFnTester.testCombineFn(
+ Sample.<String>combineFn(limit),
+ lines,
+ allOf(
+ Matchers.<String>iterableWithSize(Math.min(lines.size(), limit)),
+ everyItem(isIn(lines))));
+ }
}
/**
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].