alxp1982 commented on code in PR #24488:
URL: https://github.com/apache/beam/pull/24488#discussion_r1059200409


##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.
+
+When used without a combiner, this transforms simply acts as a `GroupByKey` 
but without the need for the user to explicitly extract the keys. For example, 
consider the following input type:
+
+```
+public class UserPurchase {
+   public String userId;
+   public String country;
+   public long cost;
+   public double transactionDuration;
+ }
+```
+
+### Group by fields
+
+You can group all purchases by user and country as follows:
+
+```
+PCollection<Row> byUser = purchases.apply(Group.byFieldNames("userId', 
"country"));
+```
+
+### Group with aggregation
+
+However often an aggregation of some form is desired. The builder methods 
inside the `Group` class allows building up separate aggregations for every 
field (or set of fields) on the input schema, and generating an output schema 
based on these aggregations. For example:

Review Comment:
   You will likely be using grouping to aggregate input data. The builder 
methods inside the `Group` class allow the creation of separate aggregations 
for every field (or set of fields) on the input schema and generate an output 
schema based on these aggregations. For example:



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.
+
+When used without a combiner, this transforms simply acts as a `GroupByKey` 
but without the need for the user to explicitly extract the keys. For example, 
consider the following input type:
+
+```
+public class UserPurchase {
+   public String userId;
+   public String country;
+   public long cost;
+   public double transactionDuration;
+ }
+```
+
+### Group by fields
+
+You can group all purchases by user and country as follows:
+
+```
+PCollection<Row> byUser = purchases.apply(Group.byFieldNames("userId', 
"country"));
+```
+
+### Group with aggregation
+
+However often an aggregation of some form is desired. The builder methods 
inside the `Group` class allows building up separate aggregations for every 
field (or set of fields) on the input schema, and generating an output schema 
based on these aggregations. For example:
+
+```
+PCollection<Row> aggregated = purchases
+     .apply(Group.byFieldNames("userId', "country")
+          .aggregateField("cost", Sum.ofLongs(), "total_cost")
+          .aggregateField("cost", Top.<Long>largestLongsFn(10), 
"top_purchases")
+          .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+              Field.of("transactionDurations", 
FieldType.array(FieldType.INT64)));
+```
+
+The result will be a new row schema containing the fields `total_cost`, 
``top_purchases``, and `transactionDurations`, containing the sum of all 
purchases costs (for that user and country), the top ten purchases, and a 
histogram of transaction durations. The schema will also contain a key field, 
which will be a row containing userId and country.
+
+Note that usually the field type can be automatically inferred from the 
`Combine.CombineFn` passed in. However sometimes it cannot be inferred, due to 
Java type erasure, in which case a `Schema.Field` object containing the field 
type must be passed in. This is currently the case for 
`ApproximateQuantilesCombineFn` in the above example.

Review Comment:
   > Note that usually, the field type can be automatically inferred from the 
`Combine.CombineFn` passed in. However, sometimes it cannot be inferred due to 
Java type erasure.  In such case, you need to specify the field type using 
`Schema.Field`. In the above example, the type is explicitly specified for the 
'transactionDurations` field. 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/example/Task.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+/*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: group
+//   description: Schema group example.
+//   multifile: false
+//   context_line: 46
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    // UserPurchase schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class UserPurchase {
+        public Long userId;
+        public String country;
+        public long cost;
+        public double transactionDuration;
+
+        @SchemaCreate
+        public UserPurchase(Long userId, String country, long cost, double 
transactionDuration) {
+            this.userId = userId;
+            this.country = country;
+            this.cost = cost;
+            this.transactionDuration = transactionDuration;
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        UserPurchase user1 = new UserPurchase(1L, "America", 123, 22);
+        UserPurchase user2 = new UserPurchase(1L, "Brazilian", 645, 86);

Review Comment:
    UserPurchase user2 = new UserPurchase(1L, "Brazil", 645, 86);



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/example/Task.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+/*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: group
+//   description: Schema group example.
+//   multifile: false
+//   context_line: 46
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    // UserPurchase schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class UserPurchase {
+        public Long userId;
+        public String country;
+        public long cost;
+        public double transactionDuration;
+
+        @SchemaCreate
+        public UserPurchase(Long userId, String country, long cost, double 
transactionDuration) {
+            this.userId = userId;
+            this.country = country;
+            this.cost = cost;
+            this.transactionDuration = transactionDuration;
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        UserPurchase user1 = new UserPurchase(1L, "America", 123, 22);

Review Comment:
   UserPurchase user1 = new UserPurchase(1L, "USA", 123, 22);



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.
+
+When used without a combiner, this transforms simply acts as a `GroupByKey` 
but without the need for the user to explicitly extract the keys. For example, 
consider the following input type:
+
+```
+public class UserPurchase {
+   public String userId;
+   public String country;
+   public long cost;
+   public double transactionDuration;
+ }
+```
+
+### Group by fields
+
+You can group all purchases by user and country as follows:
+
+```
+PCollection<Row> byUser = purchases.apply(Group.byFieldNames("userId', 
"country"));
+```
+
+### Group with aggregation
+
+However often an aggregation of some form is desired. The builder methods 
inside the `Group` class allows building up separate aggregations for every 
field (or set of fields) on the input schema, and generating an output schema 
based on these aggregations. For example:
+
+```
+PCollection<Row> aggregated = purchases
+     .apply(Group.byFieldNames("userId', "country")
+          .aggregateField("cost", Sum.ofLongs(), "total_cost")
+          .aggregateField("cost", Top.<Long>largestLongsFn(10), 
"top_purchases")
+          .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+              Field.of("transactionDurations", 
FieldType.array(FieldType.INT64)));
+```
+
+The result will be a new row schema containing the fields `total_cost`, 
``top_purchases``, and `transactionDurations`, containing the sum of all 
purchases costs (for that user and country), the top ten purchases, and a 
histogram of transaction durations. The schema will also contain a key field, 
which will be a row containing userId and country.

Review Comment:
   The result will be a new row schema containing the fields `total_cost`, 
`top_purchases`, and `transactionDurations`, containing the sum of all 
purchases costs (for that user and country), the top ten purchases, and a 
histogram of transaction durations. The schema will also contain a key field, a 
row containing userId and country.



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.
+
+When used without a combiner, this transforms simply acts as a `GroupByKey` 
but without the need for the user to explicitly extract the keys. For example, 
consider the following input type:

Review Comment:
   When used without a combiner, this transforms simply acts as a `GroupByKey` 
except that you don't have to explicitly extract keys. 
   
   For example, consider the following input schema:



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.
+
+When used without a combiner, this transforms simply acts as a `GroupByKey` 
but without the need for the user to explicitly extract the keys. For example, 
consider the following input type:
+
+```
+public class UserPurchase {
+   public String userId;
+   public String country;
+   public long cost;
+   public double transactionDuration;
+ }
+```
+
+### Group by fields
+
+You can group all purchases by user and country as follows:
+
+```
+PCollection<Row> byUser = purchases.apply(Group.byFieldNames("userId', 
"country"));
+```
+
+### Group with aggregation
+
+However often an aggregation of some form is desired. The builder methods 
inside the `Group` class allows building up separate aggregations for every 
field (or set of fields) on the input schema, and generating an output schema 
based on these aggregations. For example:
+
+```
+PCollection<Row> aggregated = purchases
+     .apply(Group.byFieldNames("userId', "country")
+          .aggregateField("cost", Sum.ofLongs(), "total_cost")
+          .aggregateField("cost", Top.<Long>largestLongsFn(10), 
"top_purchases")
+          .aggregateField("cost", ApproximateQuantilesCombineFn.create(21),
+              Field.of("transactionDurations", 
FieldType.array(FieldType.INT64)));
+```
+
+The result will be a new row schema containing the fields `total_cost`, 
``top_purchases``, and `transactionDurations`, containing the sum of all 
purchases costs (for that user and country), the top ten purchases, and a 
histogram of transaction durations. The schema will also contain a key field, 
which will be a row containing userId and country.
+
+Note that usually the field type can be automatically inferred from the 
`Combine.CombineFn` passed in. However sometimes it cannot be inferred, due to 
Java type erasure, in which case a `Schema.Field` object containing the field 
type must be passed in. This is currently the case for 
`ApproximateQuantilesCombineFn` in the above example.
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you 
can run and experiment with.

Review Comment:
   In the playground window, you can find an example you can run and experiment 
with.  This example illustrates `Group` transform usage to group purchases by 
the user together with aggregation to calculate the total value of purchases 
made. 
   
   Can you modify it to get total value of all purchases made by user in 
different countries? 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/group/description.md:
##########
@@ -0,0 +1,57 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+A generic grouping transform for schema `PCollections`.

Review Comment:
   `Group` transform can be used to group records in `PCollection` by one or 
several fields in the input schema. You can also apply aggregations to those 
groupings, which is the most common use of the `Group` transform. 
   
   The output of the Group transform has a schema with one field corresponding 
to each aggregation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to