reuvenlax commented on a change in pull request #10767: Document Beam Schemas
URL: https://github.com/apache/beam/pull/10767#discussion_r386038150
 
 

 ##########
 File path: website/src/documentation/programming-guide.md
 ##########
 @@ -1970,7 +1976,1076 @@ records.apply("WriteToText",
 See the [Beam-provided I/O Transforms]({{site.baseurl 
}}/documentation/io/built-in/)
 page for a list of the currently available I/O transforms.
 
-## 6. Data encoding and type safety {#data-encoding-and-type-safety}
+## 6. Schemas {#schemas}
+Often, the types of the records being processed have an obvious structure. Common Beam sources produce
+JSON, Avro, Protocol Buffer, or database row objects; all of these types have well-defined structures,
+structures that can often be determined by examining the type. Even within a pipeline, simple Java POJOs
+(or equivalent structures in other languages) are often used as intermediate types, and these also have a
+clear structure that can be inferred by inspecting the class. By understanding the structure of a pipeline’s
+records, we can provide much more concise APIs for data processing.
+ 
+### 6.1. What is a schema {#what-is-a-schema}
+Most structured records share some common characteristics: 
+* They can be subdivided into separate named fields. Fields usually have 
string names, but sometimes - as in the case of indexed
+ tuples - have numerical indices instead.
+* There is a limited set of primitive types that a field can have. These often match primitive types in most programming
+ languages: int, long, string, etc.
+* Often a field type can be marked as optional (sometimes referred to as 
nullable) or required.
+
+In addition, records often have a nested structure. A nested structure occurs when a field itself has subfields, so the
+type of the field itself has a schema. Array- and map-valued fields are also a common feature of these structured
+records.
+
+For example, consider the following schema, representing actions in a 
fictitious e-commerce company:
+
+**Purchase**
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>userId</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>itemId</td>
+      <td>INT64</td>      
+    </tr>
+    <tr>
+      <td>shippingAddress</td>
+      <td>ROW(ShippingAddress)</td>      
+    </tr>
+    <tr>
+      <td>costCents</td>
+      <td>INT64</td>      
+    </tr>
+    <tr>
+      <td>transactions</td>
+      <td>ARRAY[ROW(Transaction)]</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+**ShippingAddress**
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>streetAddress</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>city</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>state</td>
+      <td>nullable STRING</td>      
+    </tr>
+    <tr>
+      <td>country</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>postCode</td>
+      <td>STRING</td>      
+    </tr>                  
+  </tbody>
+</table> 
+<br/>
+
+**Transaction**
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>bank</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>purchaseAmount</td>
+      <td>DOUBLE</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+Purchase event records are represented by the above purchase schema. Each purchase event contains a shipping address, which
+is a nested row containing its own schema. Each purchase also contains a list 
of credit-card transactions 
+(a list, because a purchase might be split across multiple credit cards); each 
item in the transaction list is a row 
+with its own schema.
+
+This provides an abstract description of the types involved, one that is 
abstracted away from any specific programming 
+language.
+
+Schemas provide us with a type system for Beam records that is independent of any specific programming-language type. There
+might be multiple Java classes that all have the same schema (for example a 
Protocol-Buffer class or a POJO class),
+and Beam will allow us to seamlessly convert between these types. Schemas also 
provide a simple way to reason about 
+types across different programming-language APIs.
+
+A `PCollection` with a schema does not need to have a `Coder` specified, as 
Beam knows how to encode and decode 
+Schema rows.
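+
+For example, here is a minimal sketch of attaching a schema directly to a `PCollection` of `Row`s; the schema and
+values are illustrative only:
+
+```java
+// Build a simple schema with two fields.
+Schema appSchema = Schema.builder()
+    .addStringField("userId")
+    .addInt64Field("itemId")
+    .build();
+
+// Create rows conforming to the schema and attach the schema to the PCollection.
+// No Coder is specified; Beam encodes the rows using the schema itself.
+PCollection<Row> rows = pipeline
+    .apply(Create.of(Row.withSchema(appSchema).addValues("user1", 42L).build()))
+    .setRowSchema(appSchema);
+```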
+
+### 6.2. Schemas for programming language types {#schemas-for-pl-types}
+While schemas themselves are language independent, they are designed to embed 
naturally into the programming languages
+of the Beam SDK being used. This allows Beam users to continue using native 
types while reaping the advantage of 
+having Beam understand their element schemas.
+ 
+{:.language-java}
+In Java you could use the following set of classes to represent the purchase schema. Beam will automatically
+infer the correct schema based on the members of the class.
+
+```java
+@DefaultSchema(JavaBeanSchema.class)
+public class Purchase {
+  public String getUserId();  // Returns the id of the user who made the 
purchase.
+  public long getItemId();  // Returns the identifier of the item that was 
purchased.
+  public ShippingAddress getShippingAddress();  // Returns the shipping 
address, a nested type.
+  public long getCostCents();  // Returns the cost of the item.
+  public List<Transaction> getTransactions();  // Returns the transactions 
that paid for this purchase (returns a list, since the purchase might be spread 
out over multiple credit cards).
+  
+  @SchemaCreate
+  public Purchase(String userId, long itemId, ShippingAddress shippingAddress, 
long costCents, 
+                  List<Transaction> transactions) {
+      ...
+  }
+}
+
+@DefaultSchema(JavaBeanSchema.class)
+public class ShippingAddress {
+  public String getStreetAddress();
+  public String getCity();
+  @Nullable public String getState();
+  public String getCountry();
+  public String getPostCode();
+  
+  @SchemaCreate
+  public ShippingAddress(String streetAddress, String city, @Nullable String 
state, String country,
+                         String postCode) {
+     ...
+  }
+}
+
+@DefaultSchema(JavaBeanSchema.class)
+public class Transaction {
+  public String getBank();
+  public double getPurchaseAmount();
+ 
+  @SchemaCreate
+  public Transaction(String bank, double purchaseAmount) {
+     ...
+  }
+}
+```
+
+Using JavaBean classes as above is one way to map a schema to Java classes. 
However multiple Java classes might have
+the same schema, in which case the different Java types can often be used 
interchangeably. For example, the above
+`Transaction` class has the same schema as the following class:
+
+```java
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  public String bank;
+  public double purchaseAmount;
+}
+```
+
+So if we had two `PCollection`s as follows
+
+```java
+PCollection<Transaction> transactionBeans = readTransactionsAsJavaBean();
+PCollection<TransactionPojo> transactionPojos = readTransactionsAsPojo();
+```
+
+Then these two `PCollection`s would have the same schema, even though their Java types would be different. This means,
+for example, that the following two code snippets are valid:
+
+```java
+transactionBeans.apply(ParDo.of(new DoFn<...>() {
+   @ProcessElement public void process(@Element TransactionPojo pojo) {
+      ...
+   }
+}));
+```
+
+and
+```java
+transactionPojos.apply(ParDo.of(new DoFn<...>() {
+   @ProcessElement public void process(@Element Transaction row) {
+      ...
+   }
+}));
+```
+
+Even though in both cases the `@Element` parameter differs from the `PCollection`'s Java type, since the
+schemas are the same Beam will automatically make the conversion. The built-in `Convert` transform can also be used
+to translate between Java types of equivalent schemas, as detailed below.
+
+### 6.3. Schema definition {#schema-definition}
+The schema for a `PCollection` defines elements of that `PCollection` as an 
ordered list of named fields. Each field
+has a name, a type, and possibly a set of user options. The type of a field 
can be primitive or composite. The following
+are the primitive types currently supported by Beam:
+
+<table>
+  <thead>
+    <tr class="header">
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>BYTE</td>
+      <td>An 8-bit signed value</td>
+    </tr>
+     <tr>
+       <td>INT16</td>
+       <td>A 16-bit signed value</td>
+     </tr>
+     <tr>
+       <td>INT32</td>
+       <td>A 32-bit signed value</td>
+     </tr>
+    <tr>
+      <td>INT64</td>
+       <td>A 64-bit signed value</td>
+     </tr>
+     <tr>
+       <td>DECIMAL</td>
+       <td>An arbitrary-precision decimal type</td>
+     </tr>
+     <tr>
+       <td>FLOAT</td>
+       <td>A 32-bit IEEE 754 floating point number</td>
+     </tr>
+     <tr>
+       <td>DOUBLE</td>
+       <td>A 64-bit IEEE 754 floating point number</td>
+     </tr>
+     <tr>
+       <td>STRING</td>
+       <td>A string</td>
+     </tr>
+     <tr>
+       <td>DATETIME</td>
+       <td>A timestamp represented as milliseconds since the epoch</td>
+     </tr>  
+     <tr>
+       <td>BOOLEAN</td>
+       <td>A boolean value</td>
+     </tr>
+     <tr>
+       <td>BYTES</td>
+       <td>A raw byte array</td>
+     </tr>             
+  </tbody>
+</table>
+<br/>
+
+A field can also reference a nested schema. In this case, the field will have 
type ROW, and the nested schema will 
+be an attribute of this field type.
+
+Three collection types are supported as field types: ARRAY, ITERABLE and MAP:
+* **ARRAY** This represents a repeated value type, where the repeated elements 
can have any supported type. Arrays of 
+nested rows are supported, as are arrays of arrays.
+* **ITERABLE** This is very similar to the array type in that it represents a repeated value, but one where the full list
+of items is not known until iterated over. This is intended for the case where an iterable might be larger than the
+available memory, and backed by external storage (for example, this can happen with the iterable returned by a
+`GroupByKey`). The repeated elements can have any supported type.
+* **MAP** This represents an associative map from keys to values. All schema 
types are supported for both keys and values.
+ Values that contain map types cannot be used as keys in any grouping 
operation.
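+
+For example, the Purchase schema described earlier could be constructed programmatically with `Schema.builder()`. The
+following sketch shows row, nullable, and array fields together:
+
+```java
+Schema transactionSchema = Schema.builder()
+    .addStringField("bank")
+    .addDoubleField("purchaseAmount")
+    .build();
+
+Schema shippingAddressSchema = Schema.builder()
+    .addStringField("streetAddress")
+    .addStringField("city")
+    .addNullableField("state", FieldType.STRING)  // optional (nullable) field
+    .addStringField("country")
+    .addStringField("postCode")
+    .build();
+
+Schema purchaseSchema = Schema.builder()
+    .addStringField("userId")
+    .addInt64Field("itemId")
+    .addRowField("shippingAddress", shippingAddressSchema)  // nested row
+    .addInt64Field("costCents")
+    .addArrayField("transactions", FieldType.row(transactionSchema))  // array of rows
+    .build();
+```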
+
+### 6.4. Logical types {#logical-types}
+Users can extend the schema type system to add custom logical types that can be used as a field. A logical type is
+identified by a unique identifier and an argument. A logical type also specifies an underlying schema type to be used
+for storage, along with conversions to and from that type. As an example, a logical union can always be represented as
+a row with nullable fields, where the user ensures that only one of those fields is ever set at a time. However, this can
+be tedious and complex to manage. The OneOf logical type provides a value class that makes it easier to manage the type
+as a union, while still using a row with nullable fields as its underlying storage. Because each logical type has a
+unique identifier, it can be interpreted by other languages as well. More examples of logical types are listed
+below.
+
+#### 6.4.1. Defining a logical type {#defining-a-logical-type}
+To define a logical type you must specify a Schema type to be used to 
represent the underlying type as well as a unique
+identifier for that type. A logical type imposes additional semantics on top of a schema type. For example, a logical
+type to represent nanosecond timestamps is represented as a schema containing 
an INT64 and an INT32 field. This schema
+alone does not say anything about how to interpret this type, however the 
logical type tells you that this represents
+a nanosecond timestamp, with the INT64 field representing seconds and the 
INT32 field representing nanoseconds.
+
+Logical types are also specified by an argument, which allows creating a class of related types. For example, a
+limited-precision decimal type would have an integer argument indicating how 
many digits of precision are represented.
+The argument is represented by a schema type, so can itself be a complex type.
+
+{:.language-java}
+In Java, a logical type is specified as a subclass of the `LogicalType` class. 
A custom Java class can be specified to 
+represent the logical type and conversion functions must be supplied to 
convert back and forth between this Java class
+and the underlying Schema type representation. For example, the logical type 
representing nanosecond timestamp might
+be implemented as follows
+
+```java
+// A logical type using java.time.Instant as the input type.
+public class TimestampNanos implements LogicalType<Instant, Row> {
+  // The underlying schema used to represent rows.
+  private final Schema schema =
+      Schema.builder().addInt64Field("seconds").addInt32Field("nanos").build();
+  @Override public String getIdentifier() { return "timestampNanos"; }
+  @Override public FieldType getBaseType() { return FieldType.row(schema); }
+
+  // Convert the representation type to the underlying Row type. Called by Beam when necessary.
+  @Override public Row toBaseType(Instant instant) {
+    return Row.withSchema(schema).addValues(instant.getEpochSecond(), instant.getNano()).build();
+  }
+
+  // Convert the underlying Row type to an Instant. Called by Beam when necessary.
+  @Override public Instant toInputType(Row row) {
+    return Instant.ofEpochSecond(row.getInt64("seconds"), row.getInt32("nanos"));
+  }
+
+     ...
+}
+```
+
+#### 6.4.2. Useful logical types {#built-in-logical-types}
+##### **EnumerationType**
+EnumerationType allows creating an enumeration type consisting of a set of 
named constants.
+
+```java
+Schema schema = Schema.builder()
+               …
+     .addLogicalTypeField("color", EnumerationType.create("RED", "GREEN", "BLUE"))
+     .build();
+```
+
+The value of this field is stored in the row as an INT32 type, however the 
logical type defines a value type that lets 
+you access the enumeration either as a string or a value. For example:
+
+```java
+EnumerationType.Value enumValue = enumType.valueOf("RED");
+enumValue.getValue();  // Returns 0, the integer value of the constant.
+enumValue.toString();  // Returns "RED", the string value of the constant.
+```
+
+Given a row object with an enumeration field, you can also extract the field 
as the enumeration value.
+
+```java
+EnumerationType.Value enumValue = row.getLogicalTypeValue("color", EnumerationType.Value.class);
+```
+
+Automatic schema inference from Java POJOs and JavaBeans automatically 
converts Java enums to EnumerationType logical 
+types.
+
+##### **OneOfType**
+OneOfType allows creating a disjoint union type over a set of schema fields. 
For example:
+
+```java
+Schema schema = Schema.builder()
+               …
+     .addLogicalTypeField("oneOfField",
+        OneOfType.create(Field.of("intField", FieldType.INT32),
+                         Field.of("stringField", FieldType.STRING),
+                         Field.of("bytesField", FieldType.BYTES)))
+      .build();
+```
+
+The value of this field is stored in the row as another Row type, where all 
the fields are marked as nullable. The 
+logical type however defines a Value object that contains an enumeration value 
indicating which field was set and allows
+ getting just that field:
+
+```java
+// Returns an enumeration indicating all possible case values for the enum.
+// For the above example, this will be
+// EnumerationType.create("intField", "stringField", "bytesField");
+EnumerationType oneOfEnum = oneOfType.getCaseEnumType();
+
+// Creates an instance of the union with the string field set.
+OneOfType.Value oneOfValue = oneOfType.createValue("stringField", "foobar");
+
+// Handle the oneof
+switch (oneOfValue.getCaseType().toString()) {
+  case "intField":
+    return processInt(oneOfValue.getValue(Integer.class));
+  case "stringField":
+    return processString(oneOfValue.getValue(String.class));
+  case "bytesField":
+    return processBytes(oneOfValue.getValue(byte[].class));
+}
+```
+
+In the above example we used the field names in the switch statement for clarity; however, the enum integer values could
+also be used.
+
+### 6.5. Creating Schemas {#creating-schemas}
+
+In order to take advantage of schemas, your `PCollection`s must have a schema attached. Often, the source
+itself will attach a schema to the `PCollection`. For example, when using `AvroIO` to read Avro files, the source can
+automatically infer a Beam schema from the Avro schema and attach it to the Beam `PCollection`. However, not all sources
+produce schemas. In addition, Beam pipelines often have intermediate stages and types, and those can also benefit from
+the expressiveness of schemas.
+ 
+#### 6.5.1. Inferring schemas {#inferring-schemas}
+{:.language-java}
+Beam is able to infer schemas from a variety of common Java types. The 
`@DefaultSchema` annotation can be used to tell
+Beam to infer schemas from a specific type. The annotation takes a 
`SchemaProvider` as an argument, and `SchemaProvider` 
+classes are already built in for common Java types. The `SchemaRegistry` can 
also be invoked programmatically for cases 
+where it is not practical to annotate the Java type itself.
+
+##### **Java POJOs**
+A POJO (Plain Old Java Object) is a Java object that is not bound by any 
restriction other than the Java Language 
+Specification. A POJO can contain member variables that are primitives, other POJOs, or collections, maps, or arrays
+thereof. POJOs do not have to extend prespecified classes or implement any specific interfaces.
+
+If a POJO class is annotated with `@DefaultSchema(JavaFieldSchema.class)`, 
Beam will automatically infer a schema for 
+this class. Nested classes are supported as are classes with `List`, array, 
and `Map` fields.
+
+For example, annotating the following class tells Beam to infer a schema from 
this POJO class and apply it to any 
+`PCollection<TransactionPojo>`.
+
+```java
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  public final String bank;
+  public final double purchaseAmount;
+  @SchemaCreate
+  public TransactionPojo(String bank, double purchaseAmount) {
+    this.bank = bank;
+    this.purchaseAmount = purchaseAmount;
+  }
+}
+// Beam will automatically infer the correct schema for this PCollection. No coder is needed as a result.
+PCollection<TransactionPojo> pojos = readPojos();
+```
+
+The `@SchemaCreate` annotation tells Beam that this constructor can be used to 
create instances of TransactionPojo, 
+assuming that constructor parameters have the same names as the field names. 
`@SchemaCreate` can also be used to annotate
+static factory methods on the class, allowing the constructor to remain 
private. If there is no `@SchemaCreate`
+ annotation then all the fields must be non-final and the class must have a 
zero-argument constructor.
+
+There are a couple of other useful annotations that affect how Beam infers schemas. By default the inferred schema field
+names will match the class field names. However `@SchemaFieldName` can be used to specify a different name to
+be used for the schema field. `@SchemaIgnore` can be used to mark specific 
class fields as excluded from the inferred
+schema. For example, it’s common to have ephemeral fields in a class that 
should not be included in a schema 
+(e.g. caching the hash value to prevent expensive recomputation of the hash), 
and `@SchemaIgnore` can be used to
+exclude these fields.
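+
+For example, a sketch of both annotations on a POJO; the field names here are illustrative:
+
+```java
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  @SchemaFieldName("bankName")  // Appears in the schema as "bankName" instead of "bank".
+  public String bank;
+  public double purchaseAmount;
+
+  @SchemaIgnore  // Ephemeral cached value, excluded from the inferred schema.
+  public int cachedHashCode;
+}
+```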
+
+In some cases it is not convenient to annotate the POJO class, for example if 
the POJO is in a different package that is
+not owned by the Beam pipeline author. In these cases the schema inference can 
be triggered programmatically in 
the pipeline’s main function as follows:
+
+```java
+pipeline.getSchemaRegistry().registerPOJO(TransactionPojo.class);
+```
+
+##### **Java Beans**
+Java Beans are a de-facto standard for creating reusable property classes in Java. While the full
+standard has many characteristics, the key ones are that all properties are accessed via getter and setter methods, and
+that the name format for these getters and setters is standardized. A Java Bean class can be annotated with
+`@DefaultSchema(JavaBeanSchema.class)` and Beam will automatically infer a schema for this class. For example:
+
+```java
+@DefaultSchema(JavaBeanSchema.class)
+public class TransactionBean {
+  public TransactionBean() { … } 
+  public String getBank() { … }
+  public void setBank(String bank) { … }
+  public double getPurchaseAmount() { … }
+  public void setPurchaseAmount(double purchaseAmount) { … }
+}
+// Beam will automatically infer the correct schema for this PCollection. No 
coder is needed as a result.
+PCollection<TransactionBean> beans = readBeans();
+```
+
+The `@SchemaCreate` annotation can be used to specify a constructor or a 
static factory method, in which case the 
+setters and zero-argument constructor can be omitted.
+
+```java
+@DefaultSchema(JavaBeanSchema.class)
+public class TransactionBean {
+  @SchemaCreate
+  public TransactionBean(String bank, double purchaseAmount) { … }
+  public String getBank() { … }
+  public double getPurchaseAmount() { … }
+}
+```
+
+`@SchemaFieldName` and `@SchemaIgnore` can be used to alter the schema 
inferred, just like with POJO classes.
+
+##### **AutoValue**
+Java value classes are notoriously difficult to generate correctly. There is a lot of boilerplate you must create in
+order to properly implement a value class. AutoValue is a popular framework for easily generating such classes by
+implementing a simple abstract base class.
+
+Beam can infer a schema from an AutoValue class. For example:
+
+```java
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class TransactionValue {
+  public abstract String getBank(); 
+  public abstract double getPurchaseAmount();
+}
+```
+
+This is all that’s needed to generate a simple AutoValue class, and the above 
`@DefaultSchema` annotation tells Beam to
+infer a schema from it. This also allows AutoValue elements to be used inside 
of `PCollection`s.
+
+`@SchemaFieldName` and `@SchemaIgnore` can be used to alter the schema 
inferred.
+
+### 6.6. Using Schemas {#using-schemas}
+A schema on a `PCollection` enables a rich variety of relational transforms. 
The fact that each record is composed of
+named fields allows for simple and readable aggregations that reference fields 
by name, similar to the aggregations in
+a SQL expression. 
+
+#### 6.6.1. Field selection syntax
+The advantage of schemas is that they allow referencing of element fields by 
name. Beam provides a selection syntax for
+referencing fields, including nested and repeated fields. This syntax is used 
by all of the schema transforms when 
+referencing the fields they operate on. The syntax can also be used inside of 
a DoFn to specify which schema fields to
+process.
+
+Addressing fields by name still retains type safety as Beam will check that 
schemas match at the time the pipeline graph
+is constructed. If a field is specified that does not exist in the schema, the 
pipeline will fail to launch. In addition,
+if a field is specified with a type that does not match the type of that field 
in the schema, the pipeline will fail to
+launch.
+
+##### **Top-level fields**
+In order to select a field at the top level of a schema, the name of the field 
is specified. For example, to select just
+the user ids from a `PCollection` of purchases one would write (using the 
`Select` transform)
+
+```java
+purchases.apply(Select.fieldNames("userId"));
+```
+
+##### **Nested fields**
+Individual nested fields can be specified using the dot operator. For example, 
to select just the postal code from the
+ shipping address one would write
+
+```java
+purchases.apply(Select.fieldNames("shippingAddress.postCode"));
+```
+
+##### **Wildcards**
+The * operator can be specified at any nesting level to represent all fields 
at that level. For example, to select all
+shipping-address fields one would write
+
+```java
+purchases.apply(Select.fieldNames("shippingAddress.*"));
+```
+
+##### **Arrays**
+An array field, where the array element type is a row, can also have subfields 
of the element type addressed. When 
+selected, the result is an array of the selected subfield type. For example
+
+```java
+purchases.apply(Select.fieldNames("transactions[].bank"));
+```
+
+Will result in a row containing an array field with element-type string, 
containing the list of banks for each 
+transaction. 
+
+While the use of [] brackets in the selector is recommended, to make it clear that array elements are being selected,
+they can be omitted for brevity. In the future, array slicing will be supported, allowing selection of portions of the
+array.
+
+##### **Maps**
+A map field, where the value type is a row, can also have subfields of the value type addressed. When selected, the
+result is a map where the keys are the same as in the original map but the value is the specified type. Similar to
+arrays, the use of {} curly brackets in the selector is recommended to make it clear that map value elements are being
+selected; they can be omitted for brevity. In the future, map key selectors will be supported, allowing selection of
+specific keys from the map.
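+
+For example, assuming a hypothetical `PCollection` whose schema contains a map field **purchasesByType** with
+Purchase rows as values, the following sketch selects the userId of each map value, producing a map from the original
+keys to strings:
+
+```java
+userPurchases.apply(Select.fieldNames("purchasesByType{}.userId"));
+```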
+
+#### 6.6.2. Schema transforms
+Beam provides a collection of transforms that operate natively on schemas. 
These transforms are very expressive,
+allowing selections and aggregations in terms of named schema fields. 
Following are some examples of useful
+schema transforms. 
+
+##### **Selecting input**
+Often a computation is only interested in a subset of the fields in an input `PCollection`. The `Select` transform allows
+one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected
+field as a top-level field. Both top-level and nested fields can be selected. For example, in the Purchase schema, one
+could select only the userId and streetAddress fields as follows
+
+```java
+purchases.apply(Select.fieldNames("userId", "shippingAddress.streetAddress"));
+```
+
+The resulting `PCollection` will have the following schema
+
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>userId</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>streetAddress</td>
+      <td>STRING</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+The same is true for wildcard selections. The following
+
+```java
+purchases.apply(Select.fieldNames("userId", "shippingAddress.*"));
+```
+
+Will result in the following schema
+
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>userId</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>streetAddress</td>
+      <td>STRING</td>      
+    </tr>  
+    <tr>
+      <td>city</td>
+      <td>STRING</td>      
+    </tr> 
+    <tr>
+      <td>state</td>
+      <td>nullable STRING</td>      
+    </tr> 
+    <tr>
+      <td>country</td>
+      <td>STRING</td>      
+    </tr> 
+    <tr>
+      <td>postCode</td>
+      <td>STRING</td>      
+    </tr>                                 
+  </tbody>
+</table>
+<br/>
+
+When selecting fields nested inside of an array, the same rule applies that 
each selected field appears separately as a 
+top-level field in the resulting row. This means that if multiple fields are 
selected from the same nested row, each 
+selected field will appear as its own array field. For example
+
+```java
+purchases.apply(Select.fieldNames("transactions.bank", "transactions.purchaseAmount"));
+```
+
+Will result in the following schema
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>bank</td>
+      <td>ARRAY[STRING]</td>      
+    </tr>
+    <tr>
+      <td>purchaseAmount</td>
+      <td>ARRAY[DOUBLE]</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+Wildcard selections are equivalent to separately selecting each field.
+
+Selecting fields nested inside of maps has the same semantics as arrays. If you select multiple fields from a map,
+then each selected field will be expanded to its own map at the top level. This means that the set of map keys will
+be copied, once for each selected field.
+
+Sometimes different nested rows will have fields with the same name. Selecting 
multiple of these fields would result in 
+a name conflict, as all selected fields are put in the same row schema. When 
this situation arises, the 
+`Select.withFieldNameAs` builder method can be used to provide an alternate 
name for the selected field.
+
+Another use of the Select transform is to flatten a nested schema into a 
single flat schema. For example
+
+```java
+purchases.apply(Select.flattenedSchema());
+```
+
+Will result in the following schema
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>userId</td>
+      <td>STRING</td>      
+    </tr>
+    <tr>
+      <td>itemId</td>
+      <td>INT64</td>      
+    </tr>  
+    <tr>
+      <td>shippingAddress_streetAddress</td>
+      <td>STRING</td>      
+    </tr> 
+    <tr>
+      <td>shippingAddress_city</td>
+      <td>STRING</td>      
+    </tr> 
+    <tr>
+      <td>shippingAddress_state</td>
+      <td>nullable STRING</td>      
+    </tr> 
+    <tr>
+      <td>shippingAddress_country</td>
+      <td>STRING</td>      
+    </tr>         
+    <tr>
+      <td>shippingAddress_postCode</td>
+      <td>STRING</td>      
+    </tr>    
+     <tr>
+       <td>costCents</td>
+       <td>INT64</td>      
+     </tr>    
+     <tr>
+       <td>transactions_bank</td>
+       <td>ARRAY[STRING]</td>      
+     </tr>    
+    <tr>
+      <td>transactions_purchaseAmount</td>
+      <td>ARRAY[DOUBLE]</td>      
+    </tr>                                            
+  </tbody>
+</table>
+<br/>
+
+##### **Grouping aggregations**
+The `Group` transform allows simply grouping data by any number of fields in 
the input schema, applying aggregations to
+those groupings, and storing the result of those aggregations in a new schema 
field. The output of the `Group` transform
+has a schema with one field corresponding to each aggregation performed. 
+
+The simplest usage of `Group` specifies no aggregations, in which case all 
inputs matching the provided set of fields
+are grouped together into an `ITERABLE` field. For example
+
+```java
+purchases.apply(Group.byFieldNames("userId", "shippingAddress.streetAddress"));
+```
+
+The output schema of this is:
+
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>key</td>
+      <td>ROW{userId:STRING, streetAddress:STRING}</td>      
+    </tr>
+    <tr>
+      <td>values</td>
+      <td>ITERABLE[ROW[Purchase]]</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+The key field contains the grouping key and the values field contains a list 
of all the values that matched that key.
+
+The names of the key and values fields in the output schema can be controlled using the `withKeyField` and `withValueField`
+builders, as follows:
+
+```java
+purchases.apply(Group.byFieldNames("userId", "shippingAddress.streetAddress")
+    .withKeyField("userAndStreet")
+    .withValueField("matchingPurchases"));
+```
+
+It is quite common to apply one or more aggregations to the grouped result. Each aggregation can specify one or more fields
+to aggregate, an aggregation function, and the name of the resulting field in the output schema. For example, the
+following application computes three aggregations grouped by userId, with all aggregations represented in a single
+output schema:
+
+```java
+purchases.apply(Group.byFieldNames("userId")
+    .aggregateField("itemId", Count.combineFn(), "numPurchases")
+    .aggregateField("costCents", Sum.ofLongs(), "totalSpendCents")
+    .aggregateField("costCents", Top.<Long>largestLongsFn(10), "topPurchases"));
+```
+
+The result of this aggregation will have the following schema:
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>key</td>
+      <td>ROW{userId:STRING}</td>      
+    </tr>
+    <tr>
+      <td>value</td>
+      <td>ROW{numPurchases: INT64, totalSpendCents: INT64, topPurchases: 
ARRAY[INT64]}</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+Often `Select.flattenedSchema` will be used to flatten the result into a non-nested, flat schema.
+
+##### **Joins**
+Beam supports equijoins on schema `PCollections` - namely joins where the join 
condition depends on the equality of a 
+subset of fields. For example, the following example uses the Purchase schema to join transactions with the reviews
+that are likely associated with that transaction (both the user and product 
match that in the transaction). This is a
+"natural join" - one in which the same field names are used on both the 
left-hand and right-hand sides of the join - 
+and is specified with the `using` keyword:
+
+```java
+PCollection<Transaction> transactions = readTransactions();
+PCollection<Review> reviews = readReviews();
+PCollection<Row> joined = transactions.apply(
+    Join.innerJoin(reviews).using("userId", "productId"));
+```
+
+The resulting schema is the following:
+<table>
+  <thead>
+    <tr class="header">
+      <th><b>Field Name</b></th>
+      <th><b>Field Type</b></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>lhs</td>
+      <td>ROW{Transaction}</td>      
+    </tr>
+    <tr>
+      <td>rhs</td>
+      <td>ROW{Review}</td>      
+    </tr>                  
+  </tbody>
+</table>
+<br/>
+
+Each resulting row contains one Transaction and one Review that matched the join condition.
+
+If the fields to match in the two schemas have different names, then the `on` function can be used. For example, if the
+Review schema named those fields differently than the Transaction schema, then 
we could write the following:
+
+```java
+PCollection<Row> joined = transactions.apply(
+    Join.innerJoin(reviews).on(
+      FieldsEqual
+         .left("userId", "productId")
+         .right("reviewUserId", "reviewProductId")));
+```
+
+In addition to inner joins, the Join transform supports full outer joins, left 
outer joins, and right outer joins.
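+
+For example, the following sketch performs the same natural join as above, but retains unmatched records from both
+sides (full outer join) or from the left side only (left outer join):
+
+```java
+PCollection<Row> fullOuterJoined = transactions.apply(
+    Join.fullOuterJoin(reviews).using("userId", "productId"));
+
+PCollection<Row> leftOuterJoined = transactions.apply(
+    Join.leftOuterJoin(reviews).using("userId", "productId"));
+```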
+
+##### **Complex joins**
+While most joins tend to be binary joins - joining two inputs together - 
sometimes you have more than two input
+streams that all need to be joined on a common key. The `CoGroup` transform 
allows joining multiple `PCollections`
+together based on equality of schema fields. Each `PCollection` can be marked 
as required or optional in the final 
+join record, providing a generalization of outer joins to joins with greater 
than two input `PCollection`s. The output
+can optionally be expanded - providing individual joined records, as in the 
`Join` transform. The output can also be
+processed in unexpanded format - providing the join key along with Iterables 
of all records from each input that matched
+that key.
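+
+For example, the following sketch joins three inputs on a common userId field; the third input, **users**, is
+hypothetical, and `By` refers to `CoGroup.By`:
+
+```java
+PCollection<Row> joined = PCollectionTuple
+    .of("transactions", transactions)
+    .and("reviews", reviews)
+    .and("users", users)
+    .apply(CoGroup.join(By.fieldNames("userId")));
+```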
+
+##### **Filtering events**
+The `Filter` transform can be configured with a set of predicates, each one based on specified fields. Only records for
+which all predicates return true will pass the filter. For example the following
+
+```java
+purchases.apply(Filter.<PurchasePojo>create()
+    .whereFieldName("costCents", c -> c > 100 * 20)
+    .whereFieldName("shippingAddress.country", c -> c.equals("de")));
+```
+
+Will produce all purchases made from Germany with a purchase price of greater than twenty dollars (2000 cents).
+
+
+##### **Adding fields to a schema**
+The AddFields transform can be used to extend a schema with new fields. Input 
rows will be extended to the new schema by
+inserting null values for the new fields, though alternate default values can 
be specified; if the default null value 
+is used then the new field type will be marked as nullable. Nested subfields 
can be added using the field selection 
+syntax, including nested fields inside arrays or map values.
+
+For example, the following application
+
+```java
+purchases.apply(AddFields.<PurchasePojo>create()
+    .field("timeOfDaySeconds", FieldType.INT32)
+    .field("shippingAddress.deliveryNotes", FieldType.STRING)
+    .field("transactions.isFlagged", FieldType.BOOLEAN, false));
+```
+
+Results in a `PCollection` with an expanded schema, containing all of the rows and fields of the input but also with
+the specified fields added to the schema. All resulting rows will have null values filled in for the **timeOfDaySeconds**
+and **shippingAddress.deliveryNotes** fields, and a false value filled in for the **transactions.isFlagged** field.
+
+##### **Removing fields from a schema**
+`DropFields` allows specific fields to be dropped from a schema. Input rows 
will have their schemas truncated, and any 
+values for dropped fields will be removed from the output. Nested fields can 
also be dropped using the field selection 
+syntax.
+
+For example, the following snippet
+
+```java
+purchases.apply(DropFields.fields("userId", "shippingAddress.streetAddress"));
+```
+
+Results in a copy of the input with those two fields and their corresponding 
values removed.
+
+##### **Renaming schema fields**
+`RenameFields` allows specific fields in a schema to be renamed. The field values in input rows are left unchanged; only
+the schema is modified. This transform is often used to prepare records for 
output to a schema-aware sink, such as an 
+RDBMS, to make sure that the `PCollection` schema field names match those of the output. It can also be used to rename
+fields generated by other transforms to make them more usable (similar to 
SELECT AS in SQL). Nested fields can also be
+renamed using the field-selection syntax.
+
+For example, the following snippet
+
+```java
+purchases.apply(RenameFields.<PurchasePojo>create()
+  .rename("userId", "userIdentifier")
+  .rename("shippingAddress.streetAddress", "shippingAddress.street"));
+```
+
+Results in the same set of unmodified input elements; however, the schema on the `PCollection` has been changed to rename
+**userId** to **userIdentifier** and **shippingAddress.streetAddress** to **shippingAddress.street**.
+
+##### **Converting between types**
+As mentioned, Beam can automatically convert between different Java types, as 
long as those types have equivalent
+schemas. One way to do this is by using the `Convert` transform, as follows.
+
+```java
+PCollection<PurchaseBean> purchaseBeans = readPurchasesAsBeans();
+PCollection<PurchasePojo> pojoPurchases = 
+    purchaseBeans.apply(Convert.to(PurchasePojo.class));
+```
+
+Beam will validate that the inferred schema for `PurchasePojo` matches that of 
the input `PCollection`, and will
+then cast to a `PCollection<PurchasePojo>`.
+
+Since the `Row` class can support any schema, any `PCollection` with a schema can be cast to a `PCollection` of rows, as
+follows.
+
+```java
+PCollection<Row> purchaseRows = purchaseBeans.apply(Convert.toRows());
+```
+
+If the source type is a single-field schema, Convert will also convert to the 
type of the field if asked, effectively
+unboxing the row. For example, given a schema with a single INT64 field, the
following will convert it to a
+`PCollection<Long>`
+
+```java
+PCollection<Long> longs = rows.apply(Convert.to(TypeDescriptors.longs()));
+```
+
+In all cases, type checking is done at pipeline graph construction, and if the 
types do not match the schema then the
+pipeline will fail to launch.
 
 Review comment:
   I think it's a fine example, but let's hold off. We will need a follow-up PR that adds a bunch of examples anyway.
