Revise DebuggingWordCount to be more portable
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bc9c3f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bc9c3f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bc9c3f0 Branch: refs/heads/master Commit: 9bc9c3f0fcab4571f60d4eb872df0904ee0eb99d Parents: 0b05a8c Author: Kenneth Knowles <[email protected]> Authored: Thu Nov 3 14:50:02 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Nov 8 13:51:25 2016 -0800 ---------------------------------------------------------------------- .../beam/examples/DebuggingWordCount.java | 67 +++++--------------- .../beam/examples/DebuggingWordCountTest.java | 15 ++++- 2 files changed, 28 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 1d2c83a..f7c537c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory; * * <p>New Concepts: * <pre> - * 1. Logging to Cloud Logging - * 2. Controlling worker log levels - * 3. Creating a custom aggregator - * 4. Testing your Pipeline via PAssert + * 1. Logging using SLF4J, even in a distributed environment + * 2. Creating a custom aggregator (runners have varying levels of support) + * 3. 
Testing your Pipeline via PAssert * </pre> * * <p>To execute this pipeline locally, specify general pipeline configuration: @@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory; * } * </pre> * - * <p>To use the additional logging discussed below, specify: - * <pre>{@code - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * } - * </pre> - * - * <p>Note that when you run via <code>mvn exec</code>, you may need to escape - * the quotations as appropriate for your shell. For example, in <code>bash</code>: - * <pre> - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}" - * </pre> + * <p>The input file defaults to a public data set containing the text of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. * - * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - * <pre><code> - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * </code></pre> - * For example, by specifying: - * <pre><code> - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * </code></pre> - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs.
Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. + * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the + * logger. Depending on your SLF4J configuration, log statements will likely be qualified by + * this name. + * + * <p>Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J + * configuration that is most appropriate for their logging integration. */ private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); @@ -122,11 +90,9 @@ public class DebuggingWordCount { } /** - * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those - * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the - * Dataflow service. These aggregators below track the number of matched and unmatched words. - * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about - * the Dataflow Monitoring UI. + * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each + * runner provides varying levels of support for aggregators, and may expose them + * in a dashboard, etc. 
*/ private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); @@ -137,8 +103,7 @@ public class DebuggingWordCount { public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline - // using the Dataflow service, these log lines will appear in the Cloud Logging UI - // only if the log level is set to "DEBUG" or lower. + // these log lines will appear only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); matchedWords.addValue(1L); c.output(c.element()); @@ -178,7 +143,7 @@ public class DebuggingWordCount { .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); /** - * Concept #4: PAssert is a set of convenient PTransforms in the style of + * Concept #3: PAssert is a set of convenient PTransforms in the style of * Hamcrest's collection matchers that can be used when writing Pipeline level tests * to validate the contents of PCollections. PAssert is best used in unit tests * with small data sets but is demonstrated here as a teaching tool. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java index c1bd5d4..054277a 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java @@ -20,6 +20,8 @@ package org.apache.beam.examples; import com.google.common.io.Files; import java.io.File; import java.nio.charset.StandardCharsets; +import org.apache.beam.examples.DebuggingWordCount.WordCountOptions; +import org.apache.beam.sdk.testing.TestPipeline; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -35,9 +37,16 @@ public class DebuggingWordCountTest { @Test public void testDebuggingWordCount() throws Exception { - File file = tmpFolder.newFile(); - Files.write("stomach secret Flourish message Flourish here Flourish", file, + File inputFile = tmpFolder.newFile(); + File outputFile = tmpFolder.newFile(); + Files.write( + "stomach secret Flourish message Flourish here Flourish", + inputFile, StandardCharsets.UTF_8); - DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()}); + WordCountOptions options = + TestPipeline.testingPipelineOptions().as(WordCountOptions.class); + options.setInputFile(inputFile.getAbsolutePath()); + options.setOutput(outputFile.getAbsolutePath()); + DebuggingWordCount.main(TestPipeline.convertToArgs(options)); } }
