[
https://issues.apache.org/jira/browse/BEAM-10327?focusedWorklogId=452163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-452163
]
ASF GitHub Bot logged work on BEAM-10327:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Jun/20 04:27
Start Date: 29/Jun/20 04:27
Worklog Time Spent: 10m
Work Description: rezarokni commented on a change in pull request #12097:
URL: https://github.com/apache/beam/pull/12097#discussion_r446763798
##########
File path: website/www/site/content/en/documentation/patterns/schema.md
##########
@@ -0,0 +1,56 @@
+---
+title: "Schema Patterns"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Schema Patterns
+
+The samples on this page describe common patterns using Schemas.
+A Schema is a way to represent records with a fixed structure. Schemas are useful because Beam sources commonly produce JSON, Avro or database row objects all of which have a well-defined structure.
Review comment:
Maybe just borrow the text from:
https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
Schemas provide us a type-system for Beam records that is independent of any
specific programming-language type. There might be multiple Java classes that
all have the same schema (for example a Protocol-Buffer class or a POJO class),
and Beam will allow us to seamlessly convert between these types. Schemas also
provide a simple way to reason about types across different
programming-language APIs.
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
##########
@@ -154,6 +156,73 @@ public void testCoGroupByKeyTuple() throws IOException {
p.run();
}
+ /* Tests SchemaJoinPattern */
+ @Test
+ public void testSchemaJoinPattern() {
+ // [START SchemaJoinPatternCreate]
+ // Define Schemas
+ Schema emailSchema =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("email", Schema.FieldType.STRING));
+
+ Schema phoneSchema =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("phone", Schema.FieldType.STRING));
+
+ // Create User Data Collections
+ final List<Row> emailUsers =
+ Arrays.asList(
+ Row.withSchema(emailSchema).addValue("person1").addValue("[email protected]").build(),
+ Row.withSchema(emailSchema).addValue("person2").addValue("[email protected]").build(),
+ Row.withSchema(emailSchema).addValue("person3").addValue("[email protected]").build(),
+ Row.withSchema(emailSchema)
+ .addValue("person4")
+ .addValue("[email protected]")
+ .build());
+
+ final List<Row> phoneUsers =
+ Arrays.asList(
+ Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
+ Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
+ Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
+ Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build());
+
+ // [END SchemaJoinPatternCreate]
+
+ PCollection<String> actualFormattedResult =
+ Snippets.SchemaJoinPattern.main(p, emailUsers, phoneUsers, emailSchema, phoneSchema);
Review comment:
As this is example snippet code, consider whether it would be easier for the
reader if the code were inlined here rather than abstracted into a class.
##########
File path:
examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
##########
@@ -154,6 +156,73 @@ public void testCoGroupByKeyTuple() throws IOException {
p.run();
}
+ /* Tests SchemaJoinPattern */
+ @Test
+ public void testSchemaJoinPattern() {
+ // [START SchemaJoinPatternCreate]
+ // Define Schemas
+ Schema emailSchema =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("email", Schema.FieldType.STRING));
+
+ Schema phoneSchema =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("phone", Schema.FieldType.STRING));
+
+ // Create User Data Collections
+ final List<Row> emailUsers =
Review comment:
Consider whether adding a non-matching record would help the sample, for
example a name that is present in emailUsers but missing from phoneUsers (or
the other way around), so that the reader can see the output when no match
occurs.
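To make the no-match case concrete, here is a minimal plain-Java sketch of
inner-join semantics on "name". It is not Beam code and does not use the
Join transform from the PR; the class name, the method name, the email
addresses, and the "person5" entry are all hypothetical and chosen only for
illustration. It assumes an inner join simply drops any name that appears
on one side only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an inner join on "name"; NOT Beam's Join transform.
final class NonMatchJoinSketch {

  // Emits "name,email,phone" only for names present in both maps.
  static List<String> innerJoinOnName(
      Map<String, String> emailByName, Map<String, String> phoneByName) {
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, String> entry : emailByName.entrySet()) {
      String phone = phoneByName.get(entry.getKey());
      if (phone != null) {
        joined.add(entry.getKey() + "," + entry.getValue() + "," + phone);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    // Email addresses below are placeholders, not values from the PR.
    Map<String, String> emails = new LinkedHashMap<>();
    emails.put("person1", "person1@example.com");
    emails.put("person2", "person2@example.com");
    emails.put("person5", "person5@example.com"); // no phone record

    Map<String, String> phones = new LinkedHashMap<>();
    phones.put("person1", "111-222-3333");
    phones.put("person2", "222-333-4444");
    phones.put("person4", "555-333-4444"); // no email record

    // Prints two lines, for person1 and person2; person4 and person5 are dropped.
    innerJoinOnName(emails, phones).forEach(System.out::println);
  }
}
```

With data shaped like this in the sample, a reader could see directly that
unmatched names do not appear in the joined output.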
##########
File path:
examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
##########
@@ -914,4 +917,71 @@ public void process(ProcessContext c) {
return result;
}
}
+
+ public static class SchemaJoinPattern {
+ public static PCollection<String> main(
+ Pipeline p,
+ final List<Row> emailUsers,
+ final List<Row> phoneUsers,
+ Schema emailSchema,
+ Schema phoneSchema) {
+ // [START SchemaJoinPatternJoin]
+ // Create/Read Schema PCollections
+ PCollection<Row> emailList =
+ p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
+
+ PCollection<Row> phoneList =
+ p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
+
+ // Perform Join
+ PCollection<Row> resultRow =
+ emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
+
+ // Preview Result
+ resultRow.apply(
+ "Preview Result",
+ MapElements.into(TypeDescriptors.strings())
+ .via(
+ x -> {
+ System.out.println(x);
+ return "";
+ }));
+
+ /* Sample Output From the pipeline:
Review comment:
Because this code is abstracted into a class, the sample output can be hard
to tie back to the input, as the data is not in this snippet but appears later
on. Consider whether inlining would make the example easier to follow.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 452163)
Time Spent: 1h (was: 50m)
> Patterns: Create a pattern showing use of Schema
> ------------------------------------------------
>
> Key: BEAM-10327
> URL: https://issues.apache.org/jira/browse/BEAM-10327
> Project: Beam
> Issue Type: New Feature
> Components: website
> Reporter: Abhishek Yadav
> Assignee: Abhishek Yadav
> Priority: P3
> Labels: pipeline-patterns
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Currently the documentation shows how to perform a join of PCollections
> using
> [CoGroupByKey|https://beam.apache.org/documentation/programming-guide/#cogroupbykey],
> but that approach requires a lot of boilerplate code. It would be nice to
> have a pattern that shows how to perform joins with the help of
> [Schemas|https://beam.apache.org/documentation/programming-guide/#what-is-a-schema].
--
This message was sent by Atlassian Jira
(v8.3.4#803005)