Revise WindowedWordCount for runner and execution mode portability
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42595dcd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42595dcd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42595dcd Branch: refs/heads/master Commit: 42595dcd29c248bd3572596c9bb8464d18acd19b Parents: db41940 Author: Kenneth Knowles <[email protected]> Authored: Thu Nov 3 14:37:26 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Dec 12 15:23:38 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/examples/WindowedWordCount.java | 177 +++++++++--------- .../examples/common/WriteWindowedFilesDoFn.java | 77 ++++++++ .../beam/examples/WindowedWordCountIT.java | 182 ++++++++++++++++--- 3 files changed, 326 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 4e254bd..5c19454 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -17,26 +17,25 @@ */ package org.apache.beam.examples; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import 
org.apache.beam.examples.common.ExampleBigQueryTableOptions; import org.apache.beam.examples.common.ExampleOptions; -import org.apache.beam.examples.common.ExampleUtils; +import org.apache.beam.examples.common.WriteWindowedFilesDoFn; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +62,8 @@ import org.joda.time.Instant; * 2. Adding timestamps to data * 3. Windowing * 4. Re-using PTransforms over windowed PCollections - * 5. Writing to BigQuery + * 5. Accessing the window of an element + * 6. Writing data to per-window text files * </pre> * * <p>By default, the examples will run with the {@code DirectRunner}. @@ -74,25 +74,23 @@ import org.joda.time.Instant; * </pre> * See examples/java/README.md for instructions about how to configure different runners. * - * <p>Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}. + * <p>To execute this pipeline locally, specify a local output file (if using the + * {@code DirectRunner}) or output prefix on a supported distributed file system. 
+ * <pre>{@code + * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX] + * }</pre> * - * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code beam_examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. + * <p>The input file defaults to a public data set containing the text of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. * * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} * for 10-minute windows. * - * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) - * and then exits. + * <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C). */ public class WindowedWordCount { - static final int WINDOW_SIZE = 1; // Default window duration in minutes - + static final int WINDOW_SIZE = 10; // Default window duration in minutes /** * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for * this example, for the bounded data case. @@ -102,18 +100,22 @@ public class WindowedWordCount { * 2-hour period.
*/ static class AddTimestampFn extends DoFn<String, String> { - private static final Duration RAND_RANGE = Duration.standardHours(2); + private static final Duration RAND_RANGE = Duration.standardHours(1); private final Instant minTimestamp; + private final Instant maxTimestamp; - AddTimestampFn() { - this.minTimestamp = new Instant(System.currentTimeMillis()); + AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) { + this.minTimestamp = minTimestamp; + this.maxTimestamp = maxTimestamp; } @ProcessElement public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); - Instant randomTimestamp = minTimestamp.plus(randMillis); + Instant randomTimestamp = + new Instant( + ThreadLocalRandom.current() + .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis())); + /** * Concept #2: Set the data element with that timestamp. */ @@ -121,50 +123,29 @@ public class WindowedWordCount { } } - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { - @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); + /** A {@link DefaultValueFactory} that returns the current system time. */ + public static class DefaultToCurrentSystemTime implements DefaultValueFactory<Long> { + @Override + public Long create(PipelineOptions options) { + return System.currentTimeMillis(); } } - /** - * Helper method that defines the BigQuery schema used for the output. 
- */ - private static TableSchema getSchema() { - List<TableFieldSchema> fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. - */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; + /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */ + public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> { + @Override + public Long create(PipelineOptions options) { + return options.as(Options.class).getMinTimestampMillis() + + Duration.standardHours(1).getMillis(); + } } /** - * Options supported by {@link WindowedWordCount}. + * Options for {@link WindowedWordCount}. * - * <p>Inherits standard example configuration options, which allow specification of the BigQuery - * table, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. + * <p>Inherits standard example configuration options, which allow specification of the + * runner, as well as the {@link WordCount.WordCountOptions} support for + * specification of the input and output files. 
*/ public interface Options extends WordCount.WordCountOptions, ExampleOptions, ExampleBigQueryTableOptions { @@ -172,14 +153,24 @@ public class WindowedWordCount { @Default.Integer(WINDOW_SIZE) Integer getWindowSize(); void setWindowSize(Integer value); + + @Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch") + @Default.InstanceFactory(DefaultToCurrentSystemTime.class) + Long getMinTimestampMillis(); + void setMinTimestampMillis(Long value); + + @Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch") + @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class) + Long getMaxTimestampMillis(); + void setMaxTimestampMillis(Long value); } public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline. - ExampleUtils exampleUtils = new ExampleUtils(options); - exampleUtils.setup(); + final String output = options.getOutput(); + final Duration windowSize = Duration.standardMinutes(options.getWindowSize()); + final Instant minTimestamp = new Instant(options.getMinTimestampMillis()); + final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis()); Pipeline pipeline = Pipeline.create(options); @@ -192,7 +183,7 @@ public class WindowedWordCount { .apply(TextIO.Read.from(options.getInputFile())) // Concept #2: Add an element timestamp, using an artificial time just to show windowing. // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); + .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp))); /** * Concept #3: Window into fixed windows. 
The fixed window size for this example defaults to 1 @@ -200,9 +191,10 @@ public class WindowedWordCount { * information on how fixed windows work, and for information on the other types of windowing * available (e.g., sliding windows). */ - PCollection<String> windowedWords = input - .apply(Window.<String>into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); + PCollection<String> windowedWords = + input.apply( + Window.<String>into( + FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); /** * Concept #4: Re-use our existing CountWords transform that does not have knowledge of @@ -211,19 +203,40 @@ public class WindowedWordCount { PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords()); /** - * Concept #5: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. + * Concept #5: Customize the output format using windowing information. + * + * <p>At this point, the data is organized by window. We're writing text files and have no + * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get + * one output file per window. (If we had late data, this key would not be unique.) + * + * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will + * be automatically detected and populated with the window for the current element.
*/ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write - .to(getTableReference(options)) - .withSchema(getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow = + wordCounts.apply( + ParDo.of( + new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() { + @ProcessElement + public void processElement(ProcessContext context, IntervalWindow window) { + context.output(KV.of(window, context.element())); + } + })); - PipelineResult result = pipeline.run(); + /** + * Concept #6: Format the results and write to a sharded file partitioned by window, using a + * simple ParDo operation. Because there may be failures followed by retries, the + * writes must be idempotent, but the details of writing to files are elided here. + */ + keyedByWindow + .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create()) + .apply(ParDo.of(new WriteWindowedFilesDoFn(output))); - // ExampleUtils will try to cancel the pipeline before the program exists.
- exampleUtils.waitToFinish(result); + PipelineResult result = pipeline.run(); + try { + result.waitUntilFinish(); + } catch (Exception exc) { + result.cancel(); + } } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java new file mode 100644 index 0000000..cd6baad --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.common; + +import com.google.common.annotations.VisibleForTesting; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.KV; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +/** + * A {@link DoFn} that writes elements to files with names deterministically derived from the lower + * and upper bounds of their key (an {@link IntervalWindow}). + * + * <p>This is test utility code, not for end-users, so examples can be focused + * on their primary lessons. + */ +public class WriteWindowedFilesDoFn + extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> { + + static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + static final Coder<String> STRING_CODER = StringUtf8Coder.of(); + + private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute(); + + private final String output; + + public WriteWindowedFilesDoFn(String output) { + this.output = output; + } + + @VisibleForTesting + public static String fileForWindow(String output, IntervalWindow window) { + return String.format( + "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end())); + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + // Build a file name from the window + IntervalWindow window = context.element().getKey(); + String outputShard = fileForWindow(output, window); + + // Open the file and write all the values + IOChannelFactory factory = IOChannelUtils.getFactory(outputShard); + OutputStream out = 
Channels.newOutputStream(factory.create(outputShard, "text/plain")); + for (KV<String, Long> wordCount : context.element().getValue()) { + STRING_CODER.encode( + wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER); + out.write(NEWLINE); + } + out.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42595dcd/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 5d77dd5..e4570ac 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -17,37 +17,59 @@ */ package org.apache.beam.examples; -import java.io.IOException; +import static org.hamcrest.Matchers.equalTo; + +import com.google.api.client.util.Sleeper; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.beam.examples.common.WriteWindowedFilesDoFn; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.testing.BigqueryMatcher; +import org.apache.beam.sdk.testing.FileChecksumMatcher; +import org.apache.beam.sdk.testing.SerializableMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.ExplicitShardedFile; +import 
org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.ShardedFile; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** - * End-to-end integration test of {@link WindowedWordCount}. - */ +/** End-to-end integration test of {@link WindowedWordCount}. */ @RunWith(JUnit4.class) public class WindowedWordCountIT { private static final String DEFAULT_INPUT = "gs://apache-beam-samples/shakespeare/winterstale-personae"; - private static final String DEFAULT_OUTPUT_CHECKSUM = "cd5b52939257e12428a9fa085c32a84dd209b180"; + static final int MAX_READ_RETRIES = 4; + static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); - /** - * Options for the {@link WindowedWordCount} Integration Test. - */ + /** Options for the {@link WindowedWordCount} Integration Test. 
*/ public interface WindowedWordCountITOptions - extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions { - } + extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions {} @BeforeClass public static void setUp() { @@ -55,36 +77,140 @@ public class WindowedWordCountIT { } @Test - public void testWindowedWordCountInBatch() throws IOException { - testWindowedWordCountPipeline(false /* isStreaming */); + public void testWindowedWordCountInBatch() throws Exception { + testWindowedWordCountPipeline(defaultOptions()); } @Test @Category(StreamingIT.class) - public void testWindowedWordCountInStreaming() throws IOException { - testWindowedWordCountPipeline(true /* isStreaming */); + public void testWindowedWordCountInStreaming() throws Exception { + testWindowedWordCountPipeline(streamingOptions()); } - private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { + private WindowedWordCountITOptions defaultOptions() throws Exception { WindowedWordCountITOptions options = TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); - options.setStreaming(isStreaming); options.setInputFile(DEFAULT_INPUT); + options.setTestTimeoutSeconds(1200L); + + options.setMinTimestampMillis(0L); + options.setMinTimestampMillis(Duration.standardHours(1).getMillis()); + options.setWindowSize(10); + + options.setOutput( + IOChannelUtils.resolve( + options.getTempRoot(), + String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), + "output", + "results")); + return options; + } + + private WindowedWordCountITOptions streamingOptions() throws Exception { + WindowedWordCountITOptions options = defaultOptions(); + options.setStreaming(true); + return options; + } + + private WindowedWordCountITOptions batchOptions() throws Exception { + WindowedWordCountITOptions options = defaultOptions(); + // This is the default value, but make it explicit + options.setStreaming(false); + return options; + } + + private 
void testWindowedWordCountPipeline(WindowedWordCountITOptions options) throws Exception { + + String outputPrefix = options.getOutput(); + + List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6); + for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) { + Instant windowStart = + new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute)); + expectedOutputFiles.add( + WriteWindowedFilesDoFn.fileForWindow( + outputPrefix, + new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))))); + } - // Note: currently unused because the example writes to BigQuery, but WindowedWordCount.Options - // are tightly coupled to WordCount.Options, where the option is required. - options.setOutput(IOChannelUtils.resolve( - options.getTempRoot(), - String.format("WindowedWordCountIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), - "output", - "results")); + ShardedFile inputFile = + new ExplicitShardedFile(Collections.singleton(options.getInputFile())); + + // For this integration test, input is tiny and we can build the expected counts + SortedMap<String, Long> expectedWordCounts = new TreeMap<>(); + for (String line : + inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) { + String[] words = line.split("[^a-zA-Z']+"); + + for (String word : words) { + if (!word.isEmpty()) { + expectedWordCounts.put(word, + MoreObjects.firstNonNull(expectedWordCounts.get(word), 0L) + 1L); + } + } + } - String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", - options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); options.setOnSuccessMatcher( - new BigqueryMatcher( - options.getAppName(), options.getProject(), query, DEFAULT_OUTPUT_CHECKSUM)); + new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles))); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } + + /** + * A matcher that bakes in expected word counts, so 
they can be read directly via some other + * mechanism, and compares a sharded output file with the result. + */ + private static class WordCountsMatcher extends TypeSafeMatcher<PipelineResult> + implements SerializableMatcher<PipelineResult> { + + private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class); + + private final SortedMap<String, Long> expectedWordCounts; + private final ShardedFile outputFile; + private SortedMap<String, Long> actualCounts; + + public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) { + this.expectedWordCounts = expectedWordCounts; + this.outputFile = outputFile; + } + + @Override + public boolean matchesSafely(PipelineResult pipelineResult) { + try { + // Load output data + List<String> lines = + outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + + // Since the windowing is nondeterministic we only check the sums + actualCounts = new TreeMap<>(); + for (String line : lines) { + String[] splits = line.split(": "); + String word = splits[0]; + long count = Long.parseLong(splits[1]); + + Long current = actualCounts.get(word); + if (current == null) { + actualCounts.put(word, count); + } else { + actualCounts.put(word, current + count); + } + } + + return actualCounts.equals(expectedWordCounts); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read from sharded output: %s", outputFile)); + } + } + + @Override + public void describeTo(Description description) { + equalTo(expectedWordCounts).describeTo(description); + } + + @Override + public void describeMismatchSafely(PipelineResult pResult, Description description) { + equalTo(expectedWordCounts).describeMismatch(actualCounts, description); + } + } }
