[
https://issues.apache.org/jira/browse/BEAM-3157?focusedWorklogId=94848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94848
]
ASF GitHub Bot logged work on BEAM-3157:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/18 23:03
Start Date: 24/Apr/18 23:03
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5215: [BEAM-3157][SQL]
Add primitive java types support to Row generation logic, add example
URL: https://github.com/apache/beam/pull/5215
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
index b6f1fb4cae9..6932312c90d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
@@ -50,27 +50,26 @@
private static final Map<Class, TypeName> SUPPORTED_TYPES =
ImmutableMap.<Class, TypeName>builder()
.put(Boolean.class, TypeName.BOOLEAN)
+ .put(boolean.class, TypeName.BOOLEAN)
.put(Byte.class, TypeName.BYTE)
+ .put(byte.class, TypeName.BYTE)
.put(Character.class, TypeName.BYTE)
+ .put(char.class, TypeName.BYTE)
.put(String.class, TypeName.STRING)
.put(Short.class, TypeName.INT16)
+ .put(short.class, TypeName.INT16)
.put(Integer.class, TypeName.INT32)
+ .put(int.class, TypeName.INT32)
.put(Long.class, TypeName.INT64)
+ .put(long.class, TypeName.INT64)
.put(Float.class, TypeName.FLOAT)
+ .put(float.class, TypeName.FLOAT)
.put(Double.class, TypeName.DOUBLE)
+ .put(double.class, TypeName.DOUBLE)
.put(BigDecimal.class, TypeName.DECIMAL)
.put(DateTime.class, TypeName.DATETIME)
.build();
- // Does not support neested types.
- private FieldType getTypeDescriptor(Class clazz) {
- TypeName typeName = SUPPORTED_TYPES.get(clazz);
- if (typeName == null) {
- throw new UnsupportedOperationException("Unsupported type");
- }
- return FieldType.of(typeName);
- }
-
/**
* Uses {@link FieldValueGetter#name()} as field names.
* Uses {@link CoderRegistry#createDefault()} to get coders for {@link FieldValueGetter#type()}.
@@ -84,4 +83,12 @@ public Schema createRowType(Iterable<FieldValueGetter> fieldValueGetters) {
return Schema.builder().addFields(fields).build();
}
+ // Does not support nested types.
+ private FieldType getTypeDescriptor(Class clazz) {
+ TypeName typeName = SUPPORTED_TYPES.get(clazz);
+ if (typeName == null) {
+ throw new UnsupportedOperationException("Unsupported type: " + clazz);
+ }
+ return FieldType.of(typeName);
+ }
}
diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle
index 2d04f6d6f64..bdd4f3e2a63 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -141,3 +141,19 @@ idea {
generatedSourceDirs += file(generatedJavaccSourceDir)
}
}
+
+// Run basic SQL example
+task runBasicExample(type: JavaExec) {
+ description = "Run basic SQL example"
+ main = "org.apache.beam.sdk.extensions.sql.example.BeamSqlExample"
+ classpath = sourceSets.main.runtimeClasspath
+ args = ["--runner=DirectRunner"]
+}
+
+// Run SQL example on POJO inputs
+task runPojoExample(type: JavaExec) {
+ description = "Run SQL example for PCollections of POJOs"
+ main = "org.apache.beam.sdk.extensions.sql.example.BeamSqlPojoExample"
+ classpath = sourceSets.main.runtimeClasspath
+ args = ["--runner=DirectRunner"]
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index aed08424fb9..3078305e450 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -36,12 +36,14 @@
/**
* This is a quick example, which uses Beam SQL DSL to create a data pipeline.
*
- * <p>Run the example with
+ * <p>Run the example from the Beam source root with
* <pre>
- * mvn -pl sdks/java/extensions/sql \
- * compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
- * -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * ./gradlew :beam-sdks-java-extensions-sql:runBasicExample
* </pre>
+ *
+ * <p>The above command executes the example locally using the direct runner.
+ * Running the pipeline on other runners requires additional setup and is out of scope
+ * of the SQL examples. Please consult the Beam documentation on how to run pipelines.
*/
class BeamSqlExample {
public static void main(String[] args) throws Exception {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java
new file mode 100644
index 00000000000..f7a0ac68402
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.example.model.Customer;
+import org.apache.beam.sdk.extensions.sql.example.model.Order;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.reflect.InferredRowCoder;
+
+/**
+ * This example uses Beam SQL DSL to query a data pipeline with Java objects in it.
+ *
+ * <p>Run the example from the Beam source root with
+ * <pre>
+ * ./gradlew :beam-sdks-java-extensions-sql:runPojoExample
+ * </pre>
+ *
+ * <p>The above command executes the example locally using the direct runner.
+ * Running the pipeline on other runners requires additional setup and is out of scope
+ * of the SQL examples. Please consult the Beam documentation on how to run pipelines.
+ *
+ * <p>This example models a scenario of customers buying goods.
+ * <ul>
+ * <li>{@link Customer} represents a customer</li>
+ * <li>{@link Order} represents an order by a customer</li>
+ * </ul>
+ *
+ * <p>{@link InferredRowCoder} is used to adapt Java objects to {@link Row Rows}
+ * that are understood by Beam SQL.
+ */
+class BeamSqlPojoExample {
+ public static void main(String[] args) {
+ Pipeline pipeline = createPipeline(args);
+
+ // First step is to get PCollections of source objects.
+ // In this example we create them directly in memory using Create.of().
+ //
+ // In the real world such PCollections will likely be obtained from some other source,
+ // e.g. a database or a text file. This process is not specific to Beam SQL;
+ // please consult the Beam programming guide for details.
+
+ PCollection<Customer> customers = loadCustomers(pipeline);
+ PCollection<Order> orders = loadOrders(pipeline);
+
+ // Example 1. Run a simple query over Java objects:
+ PCollection<Row> customersFromWonderland = customers.apply(
+ BeamSql.query(
+ "SELECT id, name "
+ + " FROM PCOLLECTION "
+ + " WHERE countryOfResidence = 'Wonderland'"));
+
+ // Output the results of the query:
+ customersFromWonderland.apply(logRecords(": is from Wonderland"));
+
+ // Example 2. Query the results of the first query:
+ PCollection<Row> totalInWonderland = customersFromWonderland.apply(
+ BeamSql.query("SELECT COUNT(id) FROM PCOLLECTION"));
+
+ // Output the results of the query:
+ totalInWonderland.apply(logRecords(": total customers in Wonderland"));
+
+ // Example 3. Query multiple PCollections of Java objects:
+ PCollection<Row> ordersByGrault = PCollectionTuple
+ .of(new TupleTag<>("customers"), customers)
+ .and(new TupleTag<>("orders"), orders)
+ .apply(
+ BeamSql.query(
+ "SELECT customers.name, ('order id:' || CAST(orders.id AS VARCHAR))"
+ + " FROM orders "
+ + " JOIN customers ON orders.customerId = customers.id"
+ + " WHERE customers.name = 'Grault'"));
+
+ // Output the results of the query:
+ ordersByGrault.apply(logRecords(": ordered by 'Grault'"));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static MapElements<Row, Void> logRecords(String suffix) {
+ return MapElements.via(
+ new SimpleFunction<Row, Void>() {
+ public @Nullable
+ Void apply(Row input) {
+ System.out.println(input.getValues() + suffix);
+ return null;
+ }
+ });
+ }
+
+ private static Pipeline createPipeline(String[] args) {
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+ return Pipeline.create(options);
+ }
+
+ private static PCollection<Customer> loadCustomers(Pipeline pipeline) {
+ return
+ PBegin
+ .in(pipeline)
+ .apply(
+ Create.of(
+ new Customer(1, "Foo", "Wonderland"),
+ new Customer(2, "Bar", "Super Kingdom"),
+ new Customer(3, "Baz", "Wonderland"),
+ new Customer(4, "Grault", "Wonderland"),
+ new Customer(5, "Qux", "Super Kingdom"))
+ .withCoder(InferredRowCoder.ofSerializable(Customer.class)));
+ }
+
+ private static PCollection<Order> loadOrders(Pipeline pipeline) {
+ return
+ PBegin
+ .in(pipeline)
+ .apply(
+ Create.of(
+ new Order(1, 5),
+ new Order(2, 2),
+ new Order(3, 1),
+ new Order(4, 3),
+ new Order(5, 1),
+ new Order(6, 5),
+ new Order(7, 4),
+ new Order(8, 4),
+ new Order(9, 1))
+ .withCoder(InferredRowCoder.ofSerializable(Order.class)));
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
new file mode 100644
index 00000000000..345e3dd5e69
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example.model;
+
+import java.io.Serializable;
+
+/**
+ * Describes a customer.
+ */
+public class Customer implements Serializable {
+ private String name;
+ private int id;
+ private String countryOfResidence;
+
+ public Customer(int id, String name, String countryOfResidence) {
+ this.id = id;
+ this.name = name;
+ this.countryOfResidence = countryOfResidence;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getCountryOfResidence() {
+ return countryOfResidence;
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
new file mode 100644
index 00000000000..bb0c867fcbf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example.model;
+
+import java.io.Serializable;
+
+/**
+ * Describes an order.
+ */
+public class Order implements Serializable {
+ private int id;
+ private int customerId;
+
+ public Order(int id, int customerId) {
+ this.id = id;
+ this.customerId = customerId;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getCustomerId() {
+ return customerId;
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/package-info.java
new file mode 100644
index 00000000000..225b5594329
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java classes used for modeling the examples.
+ */
+package org.apache.beam.sdk.extensions.sql.example.model;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
index f1569178906..f2690833f6f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
@@ -17,7 +17,6 @@
*/
/**
- * examples on how to use BeamSQL.
- *
+ * Examples on how to use Beam SQL.
*/
package org.apache.beam.sdk.extensions.sql.example;
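Note on the DefaultRowTypeFactory change in the diff above: the new entries
are needed because a primitive class token such as int.class is a different
Class object from its boxed counterpart Integer.class, so a map keyed only by
boxed classes does not match a getter that returns a primitive. The following
is a minimal standalone sketch of that behavior, not the Beam code itself. The
class name PrimitiveLookupSketch, the two maps, and the use of plain strings
in place of Beam TypeName constants are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PrimitiveLookupSketch {
  // Hypothetical stand-ins for SUPPORTED_TYPES before and after the change.
  // Values are plain strings here, not Beam TypeName constants.
  static final Map<Class<?>, String> BOXED_ONLY = new HashMap<>();
  static final Map<Class<?>, String> WITH_PRIMITIVES = new HashMap<>();

  static {
    BOXED_ONLY.put(Integer.class, "INT32");
    WITH_PRIMITIVES.put(Integer.class, "INT32");
    WITH_PRIMITIVES.put(int.class, "INT32");
  }

  // Mirrors the shape of getTypeDescriptor in the diff: look up, fail loudly if absent.
  static String typeNameFor(Map<Class<?>, String> supported, Class<?> clazz) {
    String typeName = supported.get(clazz);
    if (typeName == null) {
      throw new UnsupportedOperationException("Unsupported type: " + clazz);
    }
    return typeName;
  }

  public static void main(String[] args) {
    // The primitive and boxed class tokens are distinct objects.
    System.out.println(int.class == Integer.class); // prints "false"

    // With the primitive entry present, the lookup succeeds.
    System.out.println(typeNameFor(WITH_PRIMITIVES, int.class)); // prints "INT32"

    // Without it, the lookup fails, and the message now names the class.
    try {
      typeNameFor(BOXED_ONLY, int.class);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // prints "Unsupported type: int"
    }
  }
}
```

Including the class in the exception message, as the diff does, makes this
kind of mismatch much easier to diagnose than a bare "Unsupported type".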
Issue Time Tracking
-------------------
Worklog Id: (was: 94848)
Time Spent: 6h 20m (was: 6h 10m)
> BeamSql transform should support other PCollection types
> --------------------------------------------------------
>
> Key: BEAM-3157
> URL: https://issues.apache.org/jira/browse/BEAM-3157
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Ismaël Mejía
> Assignee: Anton Kedin
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 6h 20m
> Remaining Estimate: 0h
>
> Currently the Beam SQL transform only supports input and output data
> represented as a BeamRecord. This seems to me like a usability limitation
> (even if we can do a ParDo to prepare objects before and after the transform).
> I suppose this constraint comes from the fact that we need to map
> name/type/value from an object field into Calcite, so it is convenient to have
> a specific data type (BeamRecord) for this. However, we can accomplish the
> same by using a PCollection of JavaBeans (where we know the same information
> via the field names/types/values) or by using Avro records, where we also have
> the Schema information. For the output PCollection we can map the object via
> a Reference (e.g. a JavaBean to be filled with the names of an Avro object).
> Note: I am assuming simple mappings for now, since the SQL does not
> support composite types for the moment.
> A simple API idea would be something like this:
> A simple filter:
> PCollection<MyPojo> col = BeamSql.query("SELECT * FROM .... WHERE ...").from(MyPojo.class);
> A projection:
> PCollection<MyNewPojo> newCol = BeamSql.query("SELECT id, name").from(MyPojo.class).as(MyNewPojo.class);
> A first approach could be to just add the extra ParDos + transform DoFns;
> however, I suppose that for memory use reasons mapping directly into
> Calcite might make sense.
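The issue description above relies on the observation that a JavaBean already
exposes field names and types through its getters. As a rough illustration of
that idea only, the sketch below derives a name-to-type map from public getXxx()
methods using plain Java reflection. It is not how Beam or InferredRowCoder is
implemented; GetterSchemaSketch, describe, and the Person bean are hypothetical
names introduced for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

class GetterSchemaSketch {
  // Hypothetical bean, similar in shape to the Customer class in the diff.
  static class Person {
    private final int id;
    private final String name;

    Person(int id, String name) {
      this.id = id;
      this.name = name;
    }

    public int getId() {
      return id;
    }

    public String getName() {
      return name;
    }
  }

  // Collects (field name -> declared Java type) from public, non-static,
  // zero-argument getXxx() methods declared directly on the given class.
  static Map<String, Class<?>> describe(Class<?> beanClass) {
    Map<String, Class<?>> fields = new TreeMap<>();
    for (Method m : beanClass.getDeclaredMethods()) {
      String n = m.getName();
      boolean isGetter =
          Modifier.isPublic(m.getModifiers())
              && !Modifier.isStatic(m.getModifiers())
              && !m.isSynthetic()
              && m.getParameterCount() == 0
              && m.getReturnType() != void.class
              && n.startsWith("get")
              && n.length() > 3;
      if (isGetter) {
        // getCountryOfResidence -> countryOfResidence
        String field = Character.toLowerCase(n.charAt(3)) + n.substring(4);
        fields.put(field, m.getReturnType());
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    // prints "{id=int, name=class java.lang.String}"
    System.out.println(describe(Person.class));
  }
}
```

Note that getId() reports the primitive type int rather than Integer, which is
why a type lookup keyed by Class needs entries for primitive class tokens as
well as boxed ones.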