This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e6104a2  Adding a Sequence Generator for SQL, and posting about SQL 
data sources (#8542)
e6104a2 is described below

commit e6104a2fcb2bd8e25e06d06428cb7e6223d30083
Author: Pablo <[email protected]>
AuthorDate: Wed Jun 5 11:09:20 2019 -0700

    Adding a Sequence Generator for SQL, and posting about SQL data sources 
(#8542)
    
    * Adding a Sequence Generator for SQL, and posting about SQL data sources
    
    * Fixing issues and improving post.
    
    * Addressing comments
    
    * Addressing comments
    
    * updating dating of posting
---
 .../provider/seqgen/GenerateSequenceTable.java     |  69 +++++++
 .../seqgen/GenerateSequenceTableProvider.java      |  51 ++++++
 .../sql/meta/provider/seqgen/package-info.java     |  20 ++
 .../2019-05-01-adding-data-sources-to-sql.md       | 202 +++++++++++++++++++++
 4 files changed, 342 insertions(+)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java
new file mode 100644
index 0000000..0d65b63
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.seqgen;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class GenerateSequenceTable extends BaseBeamTable implements Serializable {
+  public static final Schema TABLE_SCHEMA =
+      Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", 
FieldType.DATETIME));
+
+  Integer elementsPerSecond = 5;
+
+  GenerateSequenceTable(Table table) {
+    super(TABLE_SCHEMA);
+    if (table.getProperties().containsKey("elementsPerSecond")) {
+      elementsPerSecond = 
table.getProperties().getInteger("elementsPerSecond");
+    }
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
+        .apply(GenerateSequence.from(0).withRate(elementsPerSecond, 
Duration.standardSeconds(1)))
+        .apply(
+            MapElements.into(TypeDescriptor.of(Row.class))
+                .via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, 
Instant.now()).build()))
+        .setRowSchema(getSchema());
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("buildIOWriter unsupported!");
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTableProvider.java
new file mode 100644
index 0000000..3030bc7
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/GenerateSequenceTableProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.seqgen;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * Sequence generator table provider.
+ *
+ * <p>A sample of text table is:
+ *
+ * <pre>{@code
+ * CREATE EXTERNAL TABLE MY_SEQUENCE(
+ *   sequence BIGINT COMMENT 'this is the primary key',
+ *   event_time TIMESTAMP COMMENT 'this is the element timestamp'
+ * )
+ * TYPE 'sequence';
+ * }</pre>
+ */
+@AutoService(TableProvider.class)
+public class GenerateSequenceTableProvider extends InMemoryMetaTableProvider {
+
+  @Override
+  public String getTableType() {
+    return "sequence";
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new GenerateSequenceTable(table);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/package-info.java
new file mode 100644
index 0000000..d7841db
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/seqgen/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table schema for streaming sequence generator. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.seqgen;
diff --git a/website/src/_posts/2019-05-01-adding-data-sources-to-sql.md 
b/website/src/_posts/2019-05-01-adding-data-sources-to-sql.md
new file mode 100644
index 0000000..f8d0f4c
--- /dev/null
+++ b/website/src/_posts/2019-05-01-adding-data-sources-to-sql.md
@@ -0,0 +1,202 @@
+---
+layout: post
+title:  "Adding new Data Sources to Beam SQL CLI"
+date:   2019-06-04 00:00:01 -0800
+excerpt_separator: <!--more-->
+categories: blog
+authors:
+        - pabloem
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+A new, exciting feature that came to Apache Beam is the ability to use
+SQL in your pipelines. This is done using Beam's
+[`SqlTransform`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/sql/SqlTransform.html)
+in Java pipelines.
+
+Beam also has a fancy new SQL command line that you can use to query your
+data interactively, be it Batch or Streaming. If you haven't tried it, check 
out
+[http://bit.ly/ExploreBeamSQL](http://bit.ly/ExploreBeamSQL).
+
+A nice feature of the SQL CLI is that you can use `CREATE EXTERNAL TABLE`
+commands to *add* data sources to be accessed in the CLI. Currently, the CLI
+supports creating tables from BigQuery, PubSub, Kafka, and text files. In this
+post, we explore how to add new data sources, so that you will be able to
+consume data from other Beam sources.
+
+<!--more-->
+
+The table provider we will be implementing in this post will be generating a
+continuous unbounded stream of integers. It will be based on the
+[`GenerateSequence` 
PTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/GenerateSequence.html)
+from the Beam SDK. In the end will be able to define and use the sequence 
generator
+in SQL like this:
+
+```
+CREATE EXTERNAL TABLE                      -- all tables in Beam are external, 
they are not persisted
+  sequenceTable                              -- table alias that will be used 
in queries
+  (
+         sequence BIGINT,                  -- sequence number
+         event_timestamp TIMESTAMP         -- timestamp of the generated event
+  )
+TYPE sequence                              -- type identifies the table 
provider
+TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events 
are generated
+```
+
+And we'll be able to use it in queries like so:
+
+```
+SELECT sequence FROM sequenceTable;
+```
+
+Let's dive in!
+
+### Implementing a `TableProvider`
+
+Beam's `SqlTransform` works by relying on `TableProvider`s, which it uses when
+one uses a `CREATE EXTERNAL TABLE` statement. If you are looking to add a new
+data source to the Beam SQL CLI, then you will want to add a `TableProvider` to
+do it. In this post, I will show what steps are necessary to create a new table
+provider for the
+[`GenerateSequence` 
transform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/GenerateSequence.html)
 available in the Java SDK.
+
+The `TableProvider` classes are under
+[`sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/`](https://github.com/apache/beam/tree/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider).
 If you look in there, you can find providers, and their implementations, for 
all available data sources. So, you just need to add the one you want, along 
with an implementation of `BaseBeamTable`.
+
+### The GenerateSequenceTableProvider
+
+Our table provider looks like this:
+
+```java
+@AutoService(TableProvider.class)
+public class GenerateSequenceTableProvider extends InMemoryMetaTableProvider {
+
+  @Override
+  public String getTableType() {
+    return "sequence";
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new GenerateSequenceTable(table);
+  }
+}
+```
+
+All it does is give a type to the table - and it implements the
+`buildBeamSqlTable` method, which simply returns a `BeamSqlTable` defined by
+our `GenerateSequenceTable` implementation.
+
+### The GenerateSequenceTable
+
+We want a table implementation that supports streaming properly, so we will
+allow users to define the number of elements to be emitted per second. We will
+define a simple table that emits sequential integers in a streaming fashion.
+This looks like so:
+
+```java
+class GenerateSequenceTable extends BaseBeamTable implements Serializable {
+  public static final Schema TABLE_SCHEMA =
+      Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", 
FieldType.DATETIME));
+
+  Integer elementsPerSecond = 5;
+
+  GenerateSequenceTable(Table table) {
+    super(TABLE_SCHEMA);
+    if (table.getProperties().containsKey("elementsPerSecond")) {
+      elementsPerSecond = 
table.getProperties().getInteger("elementsPerSecond");
+    }
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    return begin
+        .apply(GenerateSequence.from(0).withRate(elementsPerSecond, 
Duration.standardSeconds(1)))
+        .apply(
+            MapElements.into(TypeDescriptor.of(Row.class))
+                .via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, 
Instant.now()).build()))
+        .setRowSchema(getSchema());
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("buildIOWriter unsupported!");
+  }
+}
+```
+
+
+
+## The real fun
+
+Now that we have implemented the two basic classes (a `BaseBeamTable`, and a
+`TableProvider`), we can start playing with them. After building the
+[SQL CLI](https://beam.apache.org/documentation/dsls/sql/shell/), we
+can now perform selections on the table:
+
+```
+0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
+. . . . . >   sequence BIGINT COMMENT 'this is the primary key',
+. . . . . >   event_time TIMESTAMP COMMENT 'this is the element timestamp'
+. . . . . > )
+. . . . . > TYPE 'sequence';
+No rows affected (0.005 seconds)
+```
+
+And let's select a few rows:
+
+```
+0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
++---------------------+------------+
+|      sequence       | event_time |
++---------------------+------------+
+| 0                   | 2019-05-21 00:36:33 |
+| 1                   | 2019-05-21 00:36:33 |
+| 2                   | 2019-05-21 00:36:33 |
+| 3                   | 2019-05-21 00:36:33 |
+| 4                   | 2019-05-21 00:36:33 |
++---------------------+------------+
+5 rows selected (1.138 seconds)
+```
+
+Now let's try something more interesting. Such as grouping. This will also let
+us make sure that we're providing the timestamp for each row properly:
+
+```
+0: BeamSQL> SELECT
+. . . . . >   COUNT(sequence) as elements,
+. . . . . >   TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
+. . . . . > FROM input_seq
+. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
++---------------------+--------------+
+|      elements       | window_start |
++---------------------+--------------+
+| 6                   | 2019-06-05 00:39:24 |
+| 10                  | 2019-06-05 00:39:26 |
+| 10                  | 2019-06-05 00:39:28 |
+| 10                  | 2019-06-05 00:39:30 |
+| 10                  | 2019-06-05 00:39:32 |
++---------------------+--------------+
+5 rows selected (10.142 seconds)
+```
+
+And voilĂ ! We can start playing with some interesting streaming queries to our
+sequence generator.

Reply via email to