alxp1982 commented on code in PR #24488: URL: https://github.com/apache/beam/pull/24488#discussion_r1080670036
########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ##########
@@ -0,0 +1,126 @@

<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Coder

A `Coder<T>` defines how to encode and decode values of type `T` into byte streams.

> Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. You need to do such parsing or formatting explicitly, using transforms such as `ParDo` or `MapElements`.

The Beam SDK requires a coder for every `PCollection` in your pipeline. In many cases, Beam can automatically infer the `Coder` for the type in a `PCollection` and use predefined coders to perform encoding and decoding. However, in some cases you will need to specify the `Coder` explicitly or create a `Coder` for a custom type.

To set the `Coder` for a `PCollection`, call `PCollection.setCoder`. You can also get the `Coder` associated with a `PCollection` using the `PCollection.getCoder` method.

### CoderRegistry

When Beam tries to infer the `Coder` for a `PCollection`, it uses the mappings stored in the `CoderRegistry` object associated with the pipeline. You can access the `CoderRegistry` for a given pipeline using the `Pipeline.getCoderRegistry` method, or get the coder for a particular type using `CoderRegistry.getCoder`.

Please note that because each `PCollection` carries its own coder, the same type can be encoded and decoded differently in different `PCollection`s.

The following example demonstrates how to register a coder for a type using `CoderRegistry`:

```
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
```

### Specifying the default coder for a type

You can specify the default coder for your custom type by annotating it with the `@DefaultCoder` annotation. For example:

```
@DefaultCoder(AvroCoder.class)
public class MyCustomDataType {
  ...
}
```

`Coder` classes for compound types are often composed of coder classes for the types contained therein. Composing `Coder` instances into a coder for the compound type is the job of the `CoderProvider` type, which enables automatic generic composition of `Coder` classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.

When you create custom objects and schemas, you need to create a subclass of `Coder` for your object and implement the following methods:
* `encode` - converts objects to bytes
* `decode` - converts bytes to objects
* `getCoderArguments` - if this is a `Coder` for a parameterized type, returns a list of `Coder`s used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type
* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic
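
As mentioned above, you can also assign a coder to a `PCollection` explicitly. Below is a minimal sketch, assuming an existing `PCollection<String>` named `lines` somewhere in your pipeline, of setting a built-in coder with `setCoder` and reading it back with `getCoder`:

```
// Explicitly assign a built-in coder to an existing PCollection<String>.
lines.setCoder(StringUtf8Coder.of());

// Retrieve the coder currently associated with the PCollection.
Coder<String> coder = lines.getCoder();
```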

When you read data and map it into a structured object, you need a DTO class. In this case, `VendorToPassengerDTO`:

```
@DefaultSchema(JavaFieldSchema.class)
class VendorToPassengerDTO {

    @JsonProperty(value = "PassengerCount")
    Integer PassengerCount;

    @JsonProperty(value = "VendorID")
    Integer VendorID;

    @SchemaCreate
    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
        this.PassengerCount = passengerCount;
        this.VendorID = vendorID;
    }

    // Function for TypeDescription
    public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) {
        return new VendorToPassengerDTO(passengerCount, vendorID);
    }

    // Setter
    // Getter
    // ToString
}
```

The pipeline can't select, group, and so on over these elements, because it doesn't understand their structure, so we need to write our own `Coder`:

```
class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
    final ObjectMapper objectMapper = new ObjectMapper();

    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();

    public static CustomCoderSecond of() {
        return INSTANCE;
    }

    @Override
    public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException {
        final String result = dto.toString();
        outStream.write(result.getBytes());
    }

    @Override
    public VendorToPassengerDTO decode(InputStream inStream) throws IOException {
        final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
        return objectMapper.readValue(serializedDTOs, VendorToPassengerDTO.class);
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
        return Collections.emptyList();
    }

    @Override
    public void verifyDeterministic() {
    }
}
```

### Playground exercise

In the playground window you can find examples of using `Coder`. By running this example, you will see user statistics for certain games. Can you change the structure so that an **additional field** is set to `true` whenever the score is greater than **10**?

Review Comment:
   Could you please re-phrase the user challenge? I'm not sure what we ask the user to do

########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ##########
@@ -0,0 +1,88 @@

<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# CoGroup

A transform that performs equijoins across multiple schema PCollections.

This transform is similar to `CoGroupByKey`; however, it works on PCollections that have schemas, which allows users of the transform to simply specify the schema fields to join on. The output type of the transform is a `Row` that contains one row field for the key and an ITERABLE field for each input, holding the rows that joined on that key; by default the cross product is not expanded, but it can optionally be expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0.
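
For illustration, here is a minimal sketch of overriding the default key field name. It assumes two schema PCollections `input1` and `input2` that share a `user` field, and that `withKeyField` can be chained onto `join` as described above:

```
PCollection<Row> joined =
    PCollectionTuple.of("input1", input1, "input2", input2)
        .apply(CoGroup.join(By.fieldNames("user")).withKeyField("groupKey"));
```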

The tags in the `PCollectionTuple` control the names of the value fields in the `Row`.

For example, the following demonstrates joining three PCollections on the "user" and "country" fields:

```
PCollection<Row> joined =
    PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
        .apply(CoGroup.join(By.fieldNames("user", "country")));
```

### JOIN DIFFERENT FIELDS

It is also possible to join on different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection.

For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`

```
PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
    .apply(CoGroup
        .join("input1Tag", By.fieldNames("referringUser"))
        .join("input2Tag", By.fieldNames("user")));
```

### INNER JOIN

For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`

```
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
```

### LEFT OUTER JOIN

For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`

```
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
        .join("input2", By.fieldNames("user"))
        .crossProductJoin());
```

### RIGHT OUTER JOIN

For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`

```
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
    .apply(CoGroup.join("input1", By.fieldNames("user"))
        .join("input2", By.fieldNames("user").withOptionalParticipation())
        .crossProductJoin());
```

### FULL OUTER JOIN

For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`

```
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
        .join("input2", By.fieldNames("user").withOptionalParticipation())
        .crossProductJoin());
```

### Playground exercise

In the playground window you can find examples of using `CoGroup`. By running this example, you will see user statistics for certain games. Can you add your **own class** that carries a **real-time** timestamp field and join it with the existing data using `CoGroup`?

Review Comment:
   Not sure what user is expected to do here?

########## learning/tour-of-beam/learning-content/java/schema-based-transforms/schema-concept/creating-schema/description.md: ##########
@@ -0,0 +1,153 @@

<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Overview

Most structured records share some common characteristics:

→ They can be subdivided into separate named fields.
Fields usually have string names, but sometimes - as in the case of indexed tuples - have numerical indices instead.

→ There is a limited list of primitive types that a field can have. These often match primitive types in most programming languages: int, long, string, etc.

→ Often a field type can be marked as optional (sometimes referred to as nullable) or required.

Often records have a nested structure. A nested structure occurs when a field itself has subfields, so the type of the field itself has a schema. Fields that are array or map types are also a common feature of these structured records.

For example, consider the following schema, representing actions in a fictitious e-commerce company:

**Purchase**

```
Field Name          Field Type
userId              STRING
itemId              INT64
shippingAddress     ROW(ShippingAddress)
cost                INT64
transactions        ARRAY[ROW(Transaction)]
```

**ShippingAddress**

```
Field Name          Field Type
streetAddress       STRING
city                STRING
state               nullable STRING
country             STRING
postCode            STRING
```

**Transaction**

```
Field Name          Field Type
bank                STRING
purchaseAmount      DOUBLE
```

Schemas provide us with a type system for Beam records that is independent of any specific programming-language type. There might be multiple Java classes that all have the same schema (for example, a Protocol Buffer class or a POJO class), and Beam will allow us to seamlessly convert between these types. Schemas also provide a simple way to reason about types across different programming-language APIs.

A `PCollection` with a schema does not need to have a `Coder` specified, as Beam knows how to encode and decode schema rows; Beam uses a special coder to encode schema types.

### Creating Schemas

While schemas themselves are language independent, they are designed to embed naturally into the programming languages of the Beam SDK being used. This allows Beam users to continue using native types while reaping the advantage of having Beam understand their element schemas.

In Java you could use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class.

#### Java POJOs

A POJO (Plain Old Java Object) is a Java object that is not bound by any restriction other than the Java Language Specification. A POJO can contain member variables that are primitives, other POJOs, or collections, maps, or arrays thereof. POJOs do not have to extend prespecified classes or implement any specific interfaces.

If a POJO class is annotated with `@DefaultSchema(JavaFieldSchema.class)`, Beam will automatically infer a schema for this class. Nested classes are supported, as are classes with List, array, and Map fields.

For example, annotating the following class tells Beam to infer a schema from this POJO class and apply it to any `PCollection<TransactionPojo>`.

```
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public final String bank;
  public final double purchaseAmount;

  @SchemaCreate
  public TransactionPojo(String bank, double purchaseAmount) {
    this.bank = bank;
    this.purchaseAmount = purchaseAmount;
  }
}

// Beam will automatically infer the correct schema for this PCollection. No coder is needed as a result.
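// Note: readPojos() below is just a placeholder for whatever earlier pipeline step produces
// TransactionPojo elements (for example a ParDo or MapElements transform); it is not a Beam API.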
PCollection<TransactionPojo> pojos = readPojos();
```

The `@SchemaCreate` annotation tells Beam that this constructor can be used to create instances of `TransactionPojo`, assuming that the constructor parameters have the same names as the field names. `@SchemaCreate` can also be used to annotate static factory methods on the class, allowing the constructor to remain private. If there is no `@SchemaCreate` annotation, then all the fields must be non-final and the class must have a zero-argument constructor.

Review Comment:
   The original language uses passive voice too often, suggest applying the above comment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
