Repository: beam
Updated Branches:
  refs/heads/master 3360b1f68 -> 0f096b12e


Created Java snippets file


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f1db3f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f1db3f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f1db3f1

Branch: refs/heads/master
Commit: 9f1db3f11680c08407bc8dd0101f8b047c090620
Parents: 3360b1f
Author: David Cavazos <[email protected]>
Authored: Tue Sep 26 10:38:38 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Thu Oct 19 12:10:45 2017 -0700

----------------------------------------------------------------------
 .../examples/website_snippets/Snippets.java     |  87 ++++++++++++++
 .../examples/website_snippets/SnippetsTest.java | 114 +++++++++++++++++++
 .../apache_beam/examples/snippets/snippets.py   |  45 +++-----
 .../examples/snippets/snippets_test.py          |  38 ++++---
 4 files changed, 239 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java
new file mode 100644
index 0000000..f17171e
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Code snippets used in webdocs.
+ */
+public class Snippets {
+
+  /* Helper function to format results in coGroupByKeyTuple */
+  public static String formatCoGbkResults(
+      String name, Iterable<String> emails, Iterable<String> phones) {
+
+    List<String> emailsList = new ArrayList<>();
+    for (String elem : emails) {
+      emailsList.add("'" + elem + "'");
+    }
+    Collections.<String>sort(emailsList);
+    String emailsStr = "[" + String.join(", ", emailsList) + "]";
+
+    List<String> phonesList = new ArrayList<>();
+    for (String elem : phones) {
+      phonesList.add("'" + elem + "'");
+    }
+    Collections.<String>sort(phonesList);
+    String phonesStr = "[" + String.join(", ", phonesList) + "]";
+
+    return name + "; " + emailsStr + "; " + phonesStr;
+  }
+
+  public static PCollection<String> coGroupByKeyTuple(
+      TupleTag<String> emailsTag,
+      TupleTag<String> phonesTag,
+      PCollection<KV<String, String>> emails,
+      PCollection<KV<String, String>> phones) {
+
+    // [START CoGroupByKeyTuple]
+    PCollection<KV<String, CoGbkResult>> results =
+        KeyedPCollectionTuple
+        .of(emailsTag, emails)
+        .and(phonesTag, phones)
+        .apply(CoGroupByKey.<String>create());
+
+    PCollection<String> contactLines = results.apply(ParDo.of(
+      new DoFn<KV<String, CoGbkResult>, String>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          KV<String, CoGbkResult> e = c.element();
+          String name = e.getKey();
+          Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
+          Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
+          String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
+          c.output(formattedResult);
+        }
+      }
+    ));
+    // [END CoGroupByKeyTuple]
+    return contactLines;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java
new file mode 100644
index 0000000..3ca6c9a
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/**
+ * Tests for Snippets.
+ */
+@RunWith(JUnit4.class)
+public class SnippetsTest implements Serializable {
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.create();
+
+  /* Tests CoGroupByKeyTuple */
+  @Test
+  public void testCoGroupByKeyTuple() throws IOException {
+    // [START CoGroupByKeyTupleInputs]
+    final List<KV<String, String>> emailsList = Arrays.asList(
+        KV.of("amy", "[email protected]"),
+        KV.of("carl", "[email protected]"),
+        KV.of("julia", "[email protected]"),
+        KV.of("carl", "[email protected]"));
+
+    final List<KV<String, String>> phonesList = Arrays.asList(
+        KV.of("amy", "111-222-3333"),
+        KV.of("james", "222-333-4444"),
+        KV.of("amy", "333-444-5555"),
+        KV.of("carl", "444-555-6666"));
+
+    PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList));
+    PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList));
+    // [END CoGroupByKeyTupleInputs]
+
+    // [START CoGroupByKeyTupleOutputs]
+    final TupleTag<String> emailsTag = new TupleTag();
+    final TupleTag<String> phonesTag = new TupleTag();
+
+    final List<KV<String, CoGbkResult>> expectedResults = Arrays.asList(
+        KV.of("amy", CoGbkResult
+          .of(emailsTag, Arrays.asList("[email protected]"))
+          .and(phonesTag, Arrays.asList("111-222-3333", "333-444-5555"))),
+        KV.of("carl", CoGbkResult
+          .of(emailsTag, Arrays.asList("[email protected]", "[email protected]"))
+          .and(phonesTag, Arrays.asList("444-555-6666"))),
+        KV.of("james", CoGbkResult
+          .of(emailsTag, Arrays.asList())
+          .and(phonesTag, Arrays.asList("222-333-4444"))),
+        KV.of("julia", CoGbkResult
+          .of(emailsTag, Arrays.asList("[email protected]"))
+          .and(phonesTag, Arrays.asList())));
+    // [END CoGroupByKeyTupleOutputs]
+
+    PCollection<String> actualFormattedResults =
+        Snippets.coGroupByKeyTuple(emailsTag, phonesTag, emails, phones);
+
+    // [START CoGroupByKeyTupleFormattedOutputs]
+    final List<String> formattedResults = Arrays.asList(
+        "amy; ['[email protected]']; ['111-222-3333', '333-444-5555']",
+        "carl; ['[email protected]', '[email protected]']; ['444-555-6666']",
+        "james; []; ['222-333-4444']",
+        "julia; ['[email protected]']; []");
+    // [END CoGroupByKeyTupleFormattedOutputs]
+
+    // Make sure that both 'expectedResults' and 'actualFormattedResults' match with the
+    // 'formattedResults'. 'expectedResults' will have to be formatted before comparing
+    List<String> expectedFormattedResultsList = new ArrayList<String>(expectedResults.size());
+    for (KV<String, CoGbkResult> e : expectedResults) {
+      String name = e.getKey();
+      Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
+      Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
+      String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
+      expectedFormattedResultsList.add(formattedResult);
+    }
+    PCollection<String> expectedFormattedResultsPColl =
+        p.apply(Create.of(expectedFormattedResultsList));
+    PAssert.that(expectedFormattedResultsPColl).containsInAnyOrder(formattedResults);
+    PAssert.that(actualFormattedResults).containsInAnyOrder(formattedResults);
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index a7751a7..6cc96ef 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1149,39 +1149,26 @@ def model_group_by_key(contents, output_path):
      | beam.io.WriteToText(output_path))
 
 
-def model_co_group_by_key_tuple(email_list, phone_list, output_path):
+def model_co_group_by_key_tuple(emails, phones, output_path):
   """Applying a CoGroupByKey Transform to a tuple."""
   import apache_beam as beam
-  with TestPipeline() as p:  # Use TestPipeline for testing.
-    # [START model_group_by_key_cogroupbykey_tuple]
-    # Each data set is represented by key-value pairs in separate PCollections.
-    # Both data sets share a common key type (in this example str).
-    # The email_list contains values such as: ('joe', '[email protected]') with
-    # multiple possible values for each key.
-    # The phone_list contains values such as: ('mary': '111-222-3333') with
-    # multiple possible values for each key.
-    emails_pcoll = p | 'create emails' >> beam.Create(email_list)
-    phones_pcoll = p | 'create phones' >> beam.Create(phone_list)
-
-    # The result PCollection contains one key-value element for each key in the
-    # input PCollections. The key of the pair will be the key from the input and
-    # the value will be a dictionary with two entries: 'emails' - an iterable of
-    # all values for the current key in the emails PCollection and 'phones': an
-    # iterable of all values for the current key in the phones PCollection.
-    # For instance, if 'emails' contained ('joe', '[email protected]') and
-    # ('joe', '[email protected]'), then 'result' will contain the element:
-    # ('joe', {'emails': ['[email protected]', '[email protected]'], 'phones': ...})
-    results = ({'emails': emails_pcoll, 'phones': phones_pcoll}
-               | beam.CoGroupByKey())
-
-    def join_info(name_info):
-      (name, info) = name_info
-      return '%s; %s; %s' %\
+  # [START model_group_by_key_cogroupbykey_tuple]
+  # The result PCollection contains one key-value element for each key in the
+  # input PCollections. The key of the pair will be the key from the input and
+  # the value will be a dictionary with two entries: 'emails' - an iterable of
+  # all values for the current key in the emails PCollection and 'phones': an
+  # iterable of all values for the current key in the phones PCollection.
+  results = ({'emails': emails, 'phones': phones}
+             | beam.CoGroupByKey())
+
+  def join_info(name_info):
+    (name, info) = name_info
+    return '%s; %s; %s' %\
         (name, sorted(info['emails']), sorted(info['phones']))
 
-    contact_lines = results | beam.Map(join_info)
-    # [END model_group_by_key_cogroupbykey_tuple]
-    contact_lines | beam.io.WriteToText(output_path)
+  contact_lines = results | beam.Map(join_info)
+  # [END model_group_by_key_cogroupbykey_tuple]
+  contact_lines | beam.io.WriteToText(output_path)
 
 
 def model_join_using_side_inputs(

http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 8f88ab9..505858a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -694,22 +694,28 @@ class SnippetsTest(unittest.TestCase):
     self.assertEqual([str(s) for s in expected], self.get_output(result_path))
 
   def test_model_co_group_by_key_tuple(self):
-    # [START model_group_by_key_cogroupbykey_tuple_inputs]
-    email_list = [
-        ('amy', '[email protected]'),
-        ('carl', '[email protected]'),
-        ('julia', '[email protected]'),
-        ('carl', '[email protected]'),
-    ]
-    phone_list = [
-        ('amy', '111-222-3333'),
-        ('james', '222-333-4444'),
-        ('amy', '333-444-5555'),
-        ('carl', '444-555-6666'),
-    ]
-    # [END model_group_by_key_cogroupbykey_tuple_inputs]
-    result_path = self.create_temp_file()
-    snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
+    with TestPipeline() as p:
+      # [START model_group_by_key_cogroupbykey_tuple_inputs]
+      emails_list = [
+          ('amy', '[email protected]'),
+          ('carl', '[email protected]'),
+          ('julia', '[email protected]'),
+          ('carl', '[email protected]'),
+      ]
+      phones_list = [
+          ('amy', '111-222-3333'),
+          ('james', '222-333-4444'),
+          ('amy', '333-444-5555'),
+          ('carl', '444-555-6666'),
+      ]
+
+      emails = p | 'CreateEmails' >> beam.Create(emails_list)
+      phones = p | 'CreatePhones' >> beam.Create(phones_list)
+      # [END model_group_by_key_cogroupbykey_tuple_inputs]
+
+      result_path = self.create_temp_file()
+      snippets.model_co_group_by_key_tuple(emails, phones, result_path)
+
     # [START model_group_by_key_cogroupbykey_tuple_outputs]
     results = [
         ('amy', {

Reply via email to