This is an automated email from the ASF dual-hosted git repository. mergebot-role pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam-site.git
commit 1843b62822fcd233099b942e7d5b41dc01137e73 Author: Mergebot <[email protected]> AuthorDate: Thu Jun 28 16:13:01 2018 -0700 Prepare repository for deployment. --- .../dsls/sql/statements/create-table/index.html | 425 ++++++++++++++++++++- content/documentation/programming-guide/index.html | 103 ++--- .../get-started/mobile-gaming-example/index.html | 96 +++-- 3 files changed, 526 insertions(+), 98 deletions(-) diff --git a/content/documentation/dsls/sql/statements/create-table/index.html b/content/documentation/dsls/sql/statements/create-table/index.html index 36d67a7..99f6091 100644 --- a/content/documentation/dsls/sql/statements/create-table/index.html +++ b/content/documentation/dsls/sql/statements/create-table/index.html @@ -226,6 +226,47 @@ +<ul class="nav"> + <li><a href="#syntax">Syntax</a></li> + <li><a href="#bigquery">BigQuery</a> + <ul> + <li><a href="#syntax-1">Syntax</a></li> + <li><a href="#read-mode">Read Mode</a></li> + <li><a href="#write-mode">Write Mode</a></li> + <li><a href="#schema">Schema</a></li> + <li><a href="#example">Example</a></li> + </ul> + </li> + <li><a href="#pubsub">Pub/Sub</a> + <ul> + <li><a href="#syntax-2">Syntax</a></li> + <li><a href="#read-mode-1">Read Mode</a></li> + <li><a href="#write-mode-1">Write Mode</a></li> + <li><a href="#schema-1">Schema</a></li> + <li><a href="#supported-payload">Supported Payload</a></li> + <li><a href="#example-1">Example</a></li> + </ul> + </li> + <li><a href="#kafka">Kafka</a> + <ul> + <li><a href="#syntax-3">Syntax</a></li> + <li><a href="#read-mode-2">Read Mode</a></li> + <li><a href="#write-mode-2">Write Mode</a></li> + <li><a href="#supported-payload-1">Supported Payload</a></li> + <li><a href="#schema-2">Schema</a></li> + </ul> + </li> + <li><a href="#text">Text</a> + <ul> + <li><a href="#syntax-4">Syntax</a></li> + <li><a href="#read-mode-3">Read Mode</a></li> + <li><a href="#write-mode-3">Write Mode</a></li> + <li><a href="#supported-payload-2">Supported Payload</a></li> + <li><a 
href="#schema-3">Schema</a></li> + <li><a href="#example-2">Example</a></li> + </ul> + </li> +</ul> </nav> @@ -247,27 +288,387 @@ limitations under the License. <h1 id="create-table">CREATE TABLE</h1> -<p>Beam is not a storage system but reads and writes from other storage systems. -You register those systems with a <code class="highlighter-rouge">CREATE TABLE</code> statement that includes a schema -as well as a number of extended clauses:</p> +<p>Beam SQL’s <code class="highlighter-rouge">CREATE TABLE</code> statement registers a virtual table that maps to an +<a href="https://beam.apache.org/documentation/io/built-in/">external storage system</a>. +For some storage systems, <code class="highlighter-rouge">CREATE TABLE</code> does not create a physical table until +a write occurs. After the physical table exists, you can access the table with +the <code class="highlighter-rouge">SELECT</code>, <code class="highlighter-rouge">JOIN</code>, and <code class="highlighter-rouge">INSERT INTO</code> statements.</p> + +<p>The <code class="highlighter-rouge">CREATE TABLE</code> statement includes a schema and extended clauses.</p> + +<h2 id="syntax">Syntax</h2> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE type +[LOCATION location] +[TBLPROPERTIES tblProperties] + +simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR + +fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*> + +tableElement: columnName fieldType [ NOT NULL ] +</code></pre> +</div> + +<ul> + <li><code class="highlighter-rouge">IF NOT EXISTS</code>: Optional. 
If the table is already registered, Beam SQL +ignores the statement instead of returning an error.</li> + <li><code class="highlighter-rouge">tableName</code>: The case-sensitive name of the table to create and register, +specified as an +<a href="https://beam.apache.org/documentation/dsls/sql/lexical/#identifiers">Identifier</a>. +The table name does not need to match the name in the underlying data +storage system.</li> + <li><code class="highlighter-rouge">tableElement</code>: <code class="highlighter-rouge">columnName</code> <code class="highlighter-rouge">fieldType</code> <code class="highlighter-rouge">[ NOT NULL ]</code> + <ul> + <li><code class="highlighter-rouge">columnName</code>: The case-sensitive name of the column, specified as a +backtick_quoted_expression.</li> + <li><code class="highlighter-rouge">fieldType</code>: The field’s type, specified as one of the following types: + <ul> + <li><code class="highlighter-rouge">simpleType</code>: <code class="highlighter-rouge">TINYINT</code>, <code class="highlighter-rouge">SMALLINT</code>, <code class="highlighter-rouge">INTEGER</code>, <code class="highlighter-rouge">BIGINT</code>, <code class="highlighter-rouge">FLOAT</code>, +<code class="highlighter-rouge">DOUBLE</code>, <code class="highlighter-rouge">DECIMAL</code>, <code class="highlighter-rouge">BOOLEAN</code>, <code class="highlighter-rouge">DATE</code>, <code class="highlighter-rouge">TIME</code>, <code class="highlighter-rouge">TIMESTAMP</code>, <code class="highlighter-rouge">CHAR</code>, +<code class="highlighter-rouge">VARCHAR</code></li> + <li><code class="highlighter-rouge">MAP<simpleType, fieldType></code></li> + <li><code class="highlighter-rouge">ARRAY<fieldType></code></li> + <li><code class="highlighter-rouge">ROW<tableElement [, tableElement ]*></code></li> + </ul> + </li> + <li><code class="highlighter-rouge">NOT NULL</code>: Optional. 
Indicates that the column is not nullable.</li> + </ul> + </li> + <li><code class="highlighter-rouge">type</code>: The I/O transform that backs the virtual table, specified as an +<a href="https://beam.apache.org/documentation/dsls/sql/lexical/#identifiers">Identifier</a> +with one of the following values: + <ul> + <li><code class="highlighter-rouge">bigquery</code></li> + <li><code class="highlighter-rouge">pubsub</code></li> + <li><code class="highlighter-rouge">kafka</code></li> + <li><code class="highlighter-rouge">text</code></li> + </ul> + </li> + <li><code class="highlighter-rouge">location</code>: The I/O-specific location of the underlying table, specified as +a <a href="https://beam.apache.org/documentation/dsls/sql/lexical/#string-literals">String +Literal</a>. +See the I/O-specific sections for <code class="highlighter-rouge">location</code> format requirements.</li> + <li><code class="highlighter-rouge">tblProperties</code>: The I/O-specific quoted key-value JSON object with extra +configuration, specified as a <a href="https://beam.apache.org/documentation/dsls/sql/lexical/#string-literals">String +Literal</a>. +See the I/O-specific sections for <code class="highlighter-rouge">tblProperties</code> format requirements.</li> +</ul> + +<h2 id="bigquery">BigQuery</h2> + +<h3 id="syntax-1">Syntax</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE bigquery +LOCATION '[PROJECT_ID]:[DATASET].[TABLE]' +</code></pre> +</div> + +<ul> + <li><code class="highlighter-rouge">LOCATION</code>: Location of the table in the BigQuery CLI format. 
+ <ul> + <li><code class="highlighter-rouge">PROJECT_ID</code>: ID of the Google Cloud Project</li> + <li><code class="highlighter-rouge">DATASET</code>: BigQuery Dataset ID</li> + <li><code class="highlighter-rouge">TABLE</code>: BigQuery Table ID within the Dataset</li> + </ul> + </li> +</ul> + +<h3 id="read-mode">Read Mode</h3> + +<p>Not supported. BigQueryIO is currently limited to write access only in Beam +SQL.</p> + +<h3 id="write-mode">Write Mode</h3> + +<p>If the table does not exist, Beam creates the table specified in location when +the first record is written. If the table does exist, the specified columns must +match the existing table.</p> + +<h3 id="schema">Schema</h3> + +<p>Schema-related errors will cause the pipeline to crash. The MAP type is not +supported. Beam SQL types map to <a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types">BigQuery Standard SQL +types</a> +as follows:</p> + +<table> + <tr> + <td>Beam SQL Type + </td> + <td>BigQuery Standard SQL Type + </td> + </tr> + <tr> + <td>TINYINT, SMALLINT, INTEGER, BIGINT + </td> + <td>INT64 + </td> + </tr> + <tr> + <td>FLOAT, DOUBLE, DECIMAL + </td> + <td>FLOAT64 + </td> + </tr> + <tr> + <td>BOOLEAN + </td> + <td>BOOL + </td> + </tr> + <tr> + <td>DATE + </td> + <td>DATE + </td> + </tr> + <tr> + <td>TIME + </td> + <td>TIME + </td> + </tr> + <tr> + <td>TIMESTAMP + </td> + <td>TIMESTAMP + </td> + </tr> + <tr> + <td>CHAR, VARCHAR + </td> + <td>STRING + </td> + </tr> + <tr> + <td>MAP + </td> + <td>(not supported) + </td> + </tr> + <tr> + <td>ARRAY + </td> + <td>ARRAY + </td> + </tr> + <tr> + <td>ROW + </td> + <td>STRUCT + </td> + </tr> +</table> + +<h3 id="example">Example</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE users (id INTEGER, username VARCHAR) +TYPE bigquery +LOCATION 'testing-integration:apache.users' +</code></pre> +</div> + +<h2 id="pubsub">Pub/Sub</h2> + +<h3 id="syntax-2">Syntax</h3> + +<div class="highlighter-rouge"><pre 
class="highlight"><code>CREATE TABLE [ IF NOT EXISTS ] tableName + ( + event_timestamp TIMESTAMP, + attributes MAP<VARCHAR, VARCHAR>, + payload ROW<tableElement [, tableElement ]*> + ) +TYPE pubsub +LOCATION 'projects/[PROJECT]/topics/[TOPIC]' +TBLPROPERTIES '{"timestampAttributeKey": "key", "deadLetterQueue": "projects/[PROJECT]/topics/[TOPIC]"}' +</code></pre> +</div> + +<ul> + <li><code class="highlighter-rouge">event_timestamp</code>: The event timestamp associated with the Pub/Sub message +by PubsubIO. It can be one of the following: + <ul> + <li>Message publish time, which is provided by Pub/Sub. This is the default +value if no extra configuration is provided.</li> + <li>A timestamp specified in one of the user-provided message attributes. +The attribute key is configured by the <code class="highlighter-rouge">timestampAttributeKey</code> field of +the <code class="highlighter-rouge">tblProperties</code> blob. The value of the attribute should conform to +the <a href="https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-">requirements of +PubsubIO</a>, +which is either milliseconds since the Unix epoch or an <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339 +</a>date string.</li> + </ul> + </li> + <li><code class="highlighter-rouge">attributes</code>: The user-provided attributes map from the Pub/Sub message.</li> + <li><code class="highlighter-rouge">payload</code>: The schema of the JSON payload of the Pub/Sub message. No other +payload formats are currently supported by Beam SQL. If a record can’t be +unmarshalled, the record is written to the topic specified in the +<code class="highlighter-rouge">deadLetterQueue</code> field of the <code class="highlighter-rouge">tblProperties</code> blob. 
If no dead-letter queue +is specified in this case, an exception is thrown and the pipeline will +crash.</li> + <li><code class="highlighter-rouge">LOCATION</code>: + <ul> + <li><code class="highlighter-rouge">PROJECT</code>: ID of the Google Cloud Project</li> + <li><code class="highlighter-rouge">TOPIC</code>: The Pub/Sub topic name. A subscription will be created +automatically, but the subscription is not cleaned up automatically. +Specifying an existing subscription is not supported.</li> + </ul> + </li> + <li><code class="highlighter-rouge">TBLPROPERTIES</code>: + <ul> + <li><code class="highlighter-rouge">timestampAttributeKey</code>: Optional. The key which contains the event +timestamp associated with the Pub/Sub message. If not specified, the +message publish timestamp is used as an event timestamp for +windowing/watermarking.</li> + <li><code class="highlighter-rouge">deadLetterQueue</code>: The topic into which messages are written if the +payload cannot be parsed. If not specified, an exception is thrown for +parsing failures.</li> + </ul> + </li> +</ul> + +<h3 id="read-mode-1">Read Mode</h3> + +<p>PubsubIO is currently limited to read access only.</p> + +<h3 id="write-mode-1">Write Mode</h3> + +<p>Not supported. PubsubIO is currently limited to read access only in Beam SQL.</p> + +<h3 id="schema-1">Schema</h3> + +<p>Pub/Sub messages have metadata associated with them, and you can reference this +metadata in your queries. For each message, Pub/Sub exposes its publish time and +a map of user-provided attributes in addition to the payload (unstructured in +the general case). This information must be preserved and accessible from the +SQL statements. Currently, this means that PubsubIO tables require you to +declare a special set of columns, as shown in the syntax above.</p> + +<h3 id="supported-payload">Supported Payload</h3> + +<ul> + <li>JSON Objects + <ul> + <li>Beam only supports querying messages with payload containing JSON +objects. 
Beam attempts to parse JSON to match the schema of the +<code class="highlighter-rouge">payload</code> field.</li> + </ul> + </li> +</ul> + +<h3 id="example-1">Example</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>) +TYPE pubsub +LOCATION 'projects/testing-integration/topics/user-location' +</code></pre> +</div> + +<h2 id="kafka">Kafka</h2> + +<p>KafkaIO is experimental in Beam SQL.</p> + +<h3 id="syntax-3">Syntax</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE kafka +LOCATION 'kafka://localhost:2181/brokers' +TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}' +</code></pre> +</div> + +<ul> + <li><code class="highlighter-rouge">LOCATION</code>: The Kafka topic URL.</li> + <li><code class="highlighter-rouge">TBLPROPERTIES</code>: + <ul> + <li><code class="highlighter-rouge">bootstrap.servers</code>: Optional. Allows you to specify the bootstrap +server.</li> + <li><code class="highlighter-rouge">topics</code>: Optional. Allows you to specify specific topics.</li> + </ul> + </li> +</ul> + +<h3 id="read-mode-2">Read Mode</h3> + +<p>Read Mode supports reading from a topic.</p> + +<h3 id="write-mode-2">Write Mode</h3> + +<p>Write Mode supports writing to a topic.</p> + +<h3 id="supported-payload-1">Supported Payload</h3> + +<ul> + <li>CSV + <ul> + <li>Beam parses the messages, attempting to parse fields according to the +types specified in the schema.</li> + </ul> + </li> +</ul> + +<h3 id="schema-2">Schema</h3> + +<p>Only simple types are supported.</p> + +<h2 id="text">Text</h2> + +<p>TextIO is experimental in Beam SQL. 
Read Mode and Write Mode do not currently +access the same underlying data.</p> + +<h3 id="syntax-4">Syntax</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*) +TYPE text +LOCATION '/home/admin/orders' +TBLPROPERTIES '{"format": "Excel"}' +</code></pre> +</div> <ul> - <li><code class="highlighter-rouge">TYPE</code> to indicate what</li> - <li><code class="highlighter-rouge">LOCATION</code> to specify a URL or otherwise indicate where the data is</li> - <li><code class="highlighter-rouge">TBLPROPERTIES</code> to configure the endpoint</li> + <li><code class="highlighter-rouge">LOCATION</code>: The path to the file for Read Mode. The prefix for Write Mode.</li> + <li><code class="highlighter-rouge">TBLPROPERTIES</code>: + <ul> + <li><code class="highlighter-rouge">format</code>: Optional. Allows you to specify the +<a href="https://commons.apache.org/proper/commons-csv/archives/1.5/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html">CSVFormat</a>.</li> + </ul> + </li> </ul> -<p>Once a table is registered, it may be read-only or it may support both read and -write access.</p> +<h3 id="read-mode-3">Read Mode</h3> -<p>Currently there are a few experimental connectors available, and the reference -for them is their Javadoc:</p> +<p>Read Mode supports reading from a file.</p> + +<h3 id="write-mode-3">Write Mode</h3> + +<p>Write Mode supports writing to a set of files. 
TextIO creates files on write.</p> + +<h3 id="supported-payload-2">Supported Payload</h3> <ul> - <li><a href="/documentation/sdks/javadoc/2.5.0/index.html?org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.html">Kafka</a></li> - <li><a href="/documentation/sdks/javadoc/2.5.0/index.html?org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.html">Text</a></li> + <li>CSV + <ul> + <li>Beam parses the messages, attempting to parse fields according to the +types specified in the schema using org.apache.commons.csv.</li> + </ul> + </li> </ul> +<h3 id="schema-3">Schema</h3> + +<p>Only simple types are supported.</p> + +<h3 id="example-2">Example</h3> + +<div class="highlighter-rouge"><pre class="highlight"><code>CREATE TABLE orders (id INTEGER, price INTEGER) +TYPE text +LOCATION '/home/admin/orders' +</code></pre> +</div> + </div> </div> <!-- diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html index 1899c51..53cafb3 100644 --- a/content/documentation/programming-guide/index.html +++ b/content/documentation/programming-guide/index.html @@ -1235,17 +1235,19 @@ The first set of data contains names and email addresses. The second set of data contains names and phone numbers. 
</span></p> -<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">emailsList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"julia"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">));</span> - -<span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">phonesList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"111-222-3333"</span><span 
class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"james"</span><span class="o">,</span> <span class="s">"222-333-4444"</span><span class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"444-555-6666"</span><span class="o">));</span> +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">emailsList</span> <span class="o">=</span> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"julia"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span 
class="s">"[email protected]"</span><span class="o">));</span> + +<span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">phonesList</span> <span class="o">=</span> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"111-222-3333"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"james"</span><span class="o">,</span> <span class="s">"222-333-4444"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"444-555-6666"</span><span class="o">));</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">emails</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"CreateEmails"</span><span class="o">,</span> <span class="n">Create</span><span class="o">. [...] 
<span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>></span> <span class="n">phones</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"CreatePhones"</span><span class="o">,</span> <span class="n">Create</span><span class="o">. [...] @@ -1275,19 +1277,24 @@ unique key from any of the input collections.</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">TupleTag</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">emailsTag</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TupleTag</span><span class="o"><>();</span> <span class="kd">final</span> <span class="n">TupleTag</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">phonesTag</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TupleTag</span><span class="o"><>();</span> -<span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>></span> <span class="n">expectedResults</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="n">CoGbkResult</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span 
class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">))</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"111-222-3333"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">))),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="n">CoGbkResult</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">))</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"444-555-6666"</span><span class="o">))),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"james"</span><span class="o">,</span> <span class="n">CoGbkResult</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span 
class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"222-333-4444"</span><span class="o">))),</span> - <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"julia"</span><span class="o">,</span> <span class="n">CoGbkResult</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">))</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())));</span> +<span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>></span> <span class="n">expectedResults</span> <span class="o">=</span> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="s">"amy"</span><span class="o">,</span> + <span class="n">CoGbkResult</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">))</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span><span class="s">"111-222-3333"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">))),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="s">"carl"</span><span class="o">,</span> + <span class="n">CoGbkResult</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">,</span> <span class="s">"[email protected]"</span><span class="o">))</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"444-555-6666"</span><span class="o">))),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="s">"james"</span><span class="o">,</span> + <span class="n">CoGbkResult</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"222-333-4444"</span><span class="o">))),</span> + <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="s">"julia"</span><span class="o">,</span> + <span class="n">CoGbkResult</span><span 
class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"[email protected]"</span><span class="o">))</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())));</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">results</span> <span class="o">=</span> <span class="p">[</span> @@ -1312,24 +1319,25 @@ followed by a <code class="highlighter-rouge">ParDo</code> to consume the result and format data from each collection.</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>></span> <span class="n">results</span> <span class="o">=</span> - <span class="n">KeyedPCollectionTuple</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">emails</span><span class="o">)</span> - <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">phones</span><span class="o">)</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">CoGroupByKey</span><span class="o">.</span><span class="na">create</span><span class="o">());</span> - -<span class="n">PCollection</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">contactLines</span> <span 
class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> - <span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">>,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> - <span class="nd">@ProcessElement</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> - <span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">></span> <span class="n">e</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span> - <span class="n">String</span> <span class="n">name</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> - <span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">emailsIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">);</span> - <span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">phonesIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span 
class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">);</span> - <span class="n">String</span> <span class="n">formattedResult</span> <span class="o">=</span> <span class="n">Snippets</span><span class="o">.</span><span class="na">formatCoGbkResults</span><span class="o">(</span><span class="n">name</span><span class="o">,</span> <span class="n">emailsIter</span><span class="o">,</span> <span class="n">phonesIter</span><span class="o">);</span> - <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">formattedResult</span><span class="o">);</span> - <span class="o">}</span> - <span class="o">}</span> -<span class="o">));</span> + <span class="n">KeyedPCollectionTuple</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">emails</span><span class="o">)</span> + <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">phones</span><span class="o">)</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">CoGroupByKey</span><span class="o">.</span><span class="na">create</span><span class="o">());</span> + +<span class="n">PCollection</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">contactLines</span> <span class="o">=</span> + <span class="n">results</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span 
class="o">>,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> + <span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">></span> <span class="n">e</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span> + <span class="n">String</span> <span class="n">name</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span> + <span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">emailsIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">);</span> + <span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">phonesIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">);</span> + <span class="n">String</span> <span class="n">formattedResult</span> <span class="o">=</span> + <span class="n">Snippets</span><span class="o">.</span><span class="na">formatCoGbkResults</span><span class="o">(</span><span class="n">name</span><span class="o">,</span> <span class="n">emailsIter</span><span class="o">,</span> <span 
class="n">phonesIter</span><span class="o">);</span> + <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">formattedResult</span><span class="o">);</span> + <span class="o">}</span> + <span class="o">}));</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># The result PCollection contains one key-value element for each key in the</span> @@ -1351,11 +1359,12 @@ and format data from each collection.</p> <p>The formatted data looks like this:</p> -<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">formattedResults</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> - <span class="s">"amy; ['[email protected]']; ['111-222-3333', '333-444-5555']"</span><span class="o">,</span> - <span class="s">"carl; ['[email protected]', '[email protected]']; ['444-555-6666']"</span><span class="o">,</span> - <span class="s">"james; []; ['222-333-4444']"</span><span class="o">,</span> - <span class="s">"julia; ['[email protected]']; []"</span><span class="o">);</span> +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">formattedResults</span> <span class="o">=</span> + <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> + <span class="s">"amy; ['[email protected]']; ['111-222-3333', '333-444-5555']"</span><span class="o">,</span> + <span class="s">"carl; ['[email protected]', '[email protected]']; ['444-555-6666']"</span><span class="o">,</span> + <span class="s">"james; 
[]; ['222-333-4444']"</span><span class="o">,</span> + <span class="s">"julia; ['[email protected]']; []"</span><span class="o">);</span> </code></pre> </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">formatted_results</span> <span class="o">=</span> <span class="p">[</span> diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html index bde29e1..314599c 100644 --- a/content/get-started/mobile-gaming-example/index.html +++ b/content/get-started/mobile-gaming-example/index.html @@ -359,8 +359,7 @@ looks more like what is depicted by the red squiggly line above the ideal line.< <span class="o">}</span> <span class="nd">@Override</span> - <span class="kd">public</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="nf">expand</span><span class="o">(</span> - <span class="n">PCollection</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">></span> <span class="n">gameInfo</span><span class="o">)</span> <span class="o">{</span> + <span class="kd">public</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="nf">expand</span><span class="o">(</span><span class="n">PCollection</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">></span> <span class="n">gameInfo</span><span class="o">)</ [...] 
<span class="k">return</span> <span class="n">gameInfo</span> <span class="o">.</span><span class="na">apply</span><span class="o">(</span> @@ -792,8 +791,8 @@ ten minutes after data is received.</em></p> <p>The following code example shows how <code class="highlighter-rouge">LeaderBoard</code> sets the processing time trigger to output the data for user scores:</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="cm">/** - * Extract user/score pairs from the event stream using processing time, via global windowing. - * Get periodic updates on all users' running scores. + * Extract user/score pairs from the event stream using processing time, via global windowing. Get + * periodic updates on all users' running scores. */</span> <span class="nd">@VisibleForTesting</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CalculateUserScores</span> @@ -806,13 +805,16 @@ ten minutes after data is received.</em></p> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="nf">expand</span><span class="o">(</span><span class="n">PCollection</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">></span> <span class="n">input</span><span class="o">)</spa [...] 
- <span class="k">return</span> <span class="n">input</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"LeaderboardUserGlobalWindow"</span><span class="o">,</span> - <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span><span class="k">new</span> <span class="n">GlobalWindows</span><span class="o">())</span> - <span class="c1">// Get periodic results every ten minutes.</span> - <span class="o">.</span><span class="na">triggering</span><span class="o">(</span><span class="n">Repeatedly</span><span class="o">.</span><span class="na">forever</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> - <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">TEN_MINUTES</span><span class="o">)))</span> - <span class="o">.</span><span class="na">accumulatingFiredPanes</span><span class="o">()</span> - <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">allowedLateness</span><span class="o">))</span> + <span class="k">return</span> <span class="n">input</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"LeaderboardUserGlobalWindow"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span><span class="k">new</span> <span class="n">GlobalWindows</span><span class="o">())</span> + <span class="c1">// Get periodic results every ten minutes.</span> + <span class="o">.</span><span class="na">triggering</span><span class="o">(</span> + <span class="n">Repeatedly</span><span class="o">.</span><span 
class="na">forever</span><span class="o">(</span> + <span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">().</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">TEN_MINUTES</span><span class="o">)))</span> + <span class="o">.</span><span class="na">accumulatingFiredPanes</span><span class="o">()</span> + <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">allowedLateness</span><span class="o">))</span> <span class="c1">// Extract and sum username/score pairs from the event data.</span> <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ExtractUserScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"user"</span><span class="o">));</span> <span class="o">}</span> @@ -883,17 +885,22 @@ late results.</em></p> <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="nf">expand</span><span class="o">(</span><span class="n">PCollection</span><span class="o"><</span><span class="n">GameActionInfo</span><span class="o">></span> <span class="n">infos</span><span class="o">)</spa [...] 
- <span class="k">return</span> <span class="n">infos</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"LeaderboardTeamFixedWindows"</span><span class="o">,</span> - <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">teamWindowDuration</span><span class="o">))</span> - <span class="c1">// We will get early (speculative) results as well as cumulative</span> - <span class="c1">// processing of late data.</span> - <span class="o">.</span><span class="na">triggering</span><span class="o">(</span><span class="n">AfterWatermark</span><span class="o">.</span><span class="na">pastEndOfWindow</span><span class="o">()</span> - <span class="o">.</span><span class="na">withEarlyFirings</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> - <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">FIVE_MINUTES</span><span class="o">))</span> - <span class="o">.</span><span class="na">withLateFirings</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> - <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">TEN_MINUTES</span><span class="o">)))</span> - <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">allowedLateness</span><span class="o">)</span> - <span class="o">.</span><span class="na">accumulatingFiredPanes</span><span class="o">())</span> + <span class="k">return</span> <span class="n">infos</span> + <span 
class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"LeaderboardTeamFixedWindows"</span><span class="o">,</span> + <span class="n">Window</span><span class="o">.<</span><span class="n">GameActionInfo</span><span class="o">></span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">teamWindowDuration</span><span class="o">))</span> + <span class="c1">// We will get early (speculative) results as well as cumulative</span> + <span class="c1">// processing of late data.</span> + <span class="o">.</span><span class="na">triggering</span><span class="o">(</span> + <span class="n">AfterWatermark</span><span class="o">.</span><span class="na">pastEndOfWindow</span><span class="o">()</span> + <span class="o">.</span><span class="na">withEarlyFirings</span><span class="o">(</span> + <span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">FIVE_MINUTES</span><span class="o">))</span> + <span class="o">.</span><span class="na">withLateFirings</span><span class="o">(</span> + <span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span> + <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">TEN_MINUTES</span><span class="o">)))</span> + <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">allowedLateness</span><span class="o">)</span> + <span class="o">.</span><span class="na">accumulatingFiredPanes</span><span class="o">())</span> <span class="c1">// Extract and sum teamname/score pairs from the event data.</span> <span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span class="s">"ExtractTeamScore"</span><span class="o">,</span> <span class="k">new</span> <span class="n">ExtractAndSumScore</span><span class="o">(</span><span class="s">"team"</span><span class="o">));</span> <span class="o">}</span> @@ -983,23 +990,34 @@ late results.</em></p> <span class="n">sumScores</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Values</span><span class="o">.</span><span class="na">create</span><span class="o">()).</span><span class="na">apply</span><span class="o">(</span><span class="n">Mean</span><span class="o">.<</span><span class="n">Integer</span><span class="o">></span><span class="n">globally</span><span class="o">().</span><span class="na">asSingletonView</span><spa [...] <span class="c1">// Filter the user sums using the global mean.</span> - <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">filtered</span> <span class="o">=</span> <span class="n">sumScores</span> - <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"ProcessAndFilter"</span><span class="o">,</span> <span class="n">ParDo</span> - <span class="c1">// use the derived mean total score as a side input</span> - <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>> [...] 
- <span class="kd">private</span> <span class="kd">final</span> <span class="n">Counter</span> <span class="n">numSpammerUsers</span> <span class="o">=</span> <span class="n">Metrics</span><span class="o">.</span><span class="na">counter</span><span class="o">(</span><span class="s">"main"</span><span class="o">,</span> <span class="s">"SpammerUsers"</span><span class="o">);</span> - <span class="nd">@ProcessElement</span> - <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> - <span class="n">Integer</span> <span class="n">score</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getValue</span><span class="o">();</span> - <span class="n">Double</span> <span class="n">gmc</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">globalMeanScore</span><span class="o">);</span> - <span class="k">if</span> <span class="o">(</span><span class="n">score</span> <span class="o">></span> <span class="o">(</span><span class="n">gmc</span> <span class="o">*</span> <span class="n">SCORE_WEIGHT</span><span class="o">))</span> <span class="o">{</span> - <span class="n">LOG</span><span class="o">.</span><span class="na">info</span><span class="o">(</span><span class="s">"user "</span> <span class="o">+</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getKey</span><span class="o">()</span> <span class="o">+</span> <span class="s">" spammer score "</span> <span class="o">+</span> <span class="n">score</span> - <span class="o">+</span> <span class="s">" with mean "</span> <span class="o">+</span> <span class="n">gmc</span><span 
class="o">);</span> - <span class="n">numSpammerUsers</span><span class="o">.</span><span class="na">inc</span><span class="o">();</span> - <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">());</span> - <span class="o">}</span> - <span class="o">}</span> - <span class="o">}).</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">globalMeanScore</span><span class="o">));</span> + <span class="n">PCollection</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>></span> <span class="n">filtered</span> <span class="o">=</span> + <span class="n">sumScores</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span> + <span class="s">"ProcessAndFilter"</span><span class="o">,</span> + <span class="n">ParDo</span> + <span class="c1">// use the derived mean total score as a side input</span> + <span class="o">.</span><span class="na">of</span><span class="o">(</span> + <span class="k">new</span> <span class="n">DoFn</span><span class="o"><</span><span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">KV</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> + <span class="kd">private</span> <span class="kd">final</span> <span class="n">Counter</span> <span class="n">numSpammerUsers</span> <span class="o">=</span> + <span class="n">Metrics</span><span class="o">.</span><span class="na">counter</span><span class="o">(</span><span class="s">"main"</span><span class="o">,</span> <span class="s">"SpammerUsers"</span><span 
class="o">);</span> + + <span class="nd">@ProcessElement</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span> + <span class="n">Integer</span> <span class="n">score</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getValue</span><span class="o">();</span> + <span class="n">Double</span> <span class="n">gmc</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">globalMeanScore</span><span class="o">);</span> + <span class="k">if</span> <span class="o">(</span><span class="n">score</span> <span class="o">></span> <span class="o">(</span><span class="n">gmc</span> <span class="o">*</span> <span class="n">SCORE_WEIGHT</span><span class="o">))</span> <span class="o">{</span> + <span class="n">LOG</span><span class="o">.</span><span class="na">info</span><span class="o">(</span> + <span class="s">"user "</span> + <span class="o">+</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getKey</span><span class="o">()</span> + <span class="o">+</span> <span class="s">" spammer score "</span> + <span class="o">+</span> <span class="n">score</span> + <span class="o">+</span> <span class="s">" with mean "</span> + <span class="o">+</span> <span class="n">gmc</span><span class="o">);</span> + <span class="n">numSpammerUsers</span><span class="o">.</span><span class="na">inc</span><span class="o">();</span> + <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">());</span> + 
<span class="o">}</span> + <span class="o">}</span> + <span class="o">})</span> + <span class="o">.</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">globalMeanScore</span><span class="o">));</span> <span class="k">return</span> <span class="n">filtered</span><span class="o">;</span> <span class="o">}</span> <span class="o">}</span>
