TheNeuralBit commented on a change in pull request #14507:
URL: https://github.com/apache/beam/pull/14507#discussion_r612597865
##########
File path:
website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##########
@@ -473,18 +542,8 @@ Write Mode supports writing to a topic.
* CSV (default)
* Beam parses the messages, attempting to parse fields according to the
types specified in the schema.
Review comment:
Is there a Jira for making CSV a `PayloadSerializer`?
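Here is some context on what such a serializer would need to do. The docs above describe CSV handling as splitting a message into fields and converting each field to the type declared in the schema. The Java sketch below shows that contract under two assumptions: each message holds one record, and fields contain no quoted or escaped commas. `CsvFieldType`, `CsvRowSerializer`, and `SimpleCsvSerializer` are hypothetical names. Rows are plain `List<Object>` rather than Beam `Row`s, and this is neither Beam's existing CSV handling nor its `PayloadSerializer` interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for schema field types.
enum CsvFieldType { INT64, DOUBLE, BOOLEAN, STRING }

// Local interface mirroring a serialize/deserialize payload contract.
// Rows are plain List<Object> here; this is not Beam's Row or PayloadSerializer.
interface CsvRowSerializer {
  byte[] serialize(List<Object> row);

  List<Object> deserialize(byte[] bytes);
}

// Minimal CSV sketch: one record per message, no quoting or escaped commas.
class SimpleCsvSerializer implements CsvRowSerializer {
  private final List<CsvFieldType> schema;

  SimpleCsvSerializer(List<CsvFieldType> schema) {
    this.schema = schema;
  }

  @Override
  public byte[] serialize(List<Object> row) {
    List<String> parts = new ArrayList<>();
    for (Object value : row) {
      parts.add(String.valueOf(value));
    }
    return String.join(",", parts).getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public List<Object> deserialize(byte[] bytes) {
    String line = new String(bytes, StandardCharsets.UTF_8);
    // A limit of -1 keeps trailing empty fields, so the field count is exact.
    String[] parts = line.split(",", -1);
    if (parts.length != schema.size()) {
      throw new IllegalArgumentException(
          "Expected " + schema.size() + " fields but found " + parts.length + ": " + line);
    }
    List<Object> row = new ArrayList<>();
    for (int i = 0; i < parts.length; i++) {
      row.add(parseField(schema.get(i), parts[i]));
    }
    return row;
  }

  // Converts one raw CSV field according to the declared schema type.
  private static Object parseField(CsvFieldType type, String raw) {
    switch (type) {
      case INT64:
        return Long.parseLong(raw.trim());
      case DOUBLE:
        return Double.parseDouble(raw.trim());
      case BOOLEAN:
        // Boolean.parseBoolean maps anything other than "true" (any case) to false.
        return Boolean.parseBoolean(raw.trim());
      default:
        return raw;
    }
  }
}
```

The strict field-count check reports malformed messages as errors rather than silently padding or truncating rows.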
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
##########
@@ -45,23 +48,57 @@
* NAME VARCHAR(127) COMMENT 'this is the name'
* )
* COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ * "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ * "topics": ["topic2", "topic3"]
+ * }'
* }</pre>
*/
@AutoService(TableProvider.class)
public class KafkaTableProvider extends InMemoryMetaTableProvider {
+ private static class ParsedLocation {
+ String brokerLocation = "";
+ String topic = "";
+ }
+
+ private static ParsedLocation parseLocation(String location) {
+ ParsedLocation parsed = new ParsedLocation();
+ List<String> split = Splitter.on('/').splitToList(location);
+ checkArgument(
+ split.size() >= 2,
+ "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+ location);
+ parsed.topic = Iterables.getLast(split);
+ parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+ return parsed;
+ }
+
+ private static List<String> mergeParam(String initial, @Nullable List<Object> toMerge) {
+ if (toMerge == null) {
+ return ImmutableList.of(initial);
+ }
+ ImmutableList.Builder<String> merged = ImmutableList.builder();
+ merged.add(initial);
+ toMerge.forEach(o -> merged.add(o.toString()));
+ return merged.build();
+ }
+
@Override
public BeamSqlTable buildBeamSqlTable(Table table) {
Schema schema = table.getSchema();
-
JSONObject properties = table.getProperties();
- String bootstrapServers = properties.getString("bootstrap.servers");
- JSONArray topicsArr = properties.getJSONArray("topics");
- List<String> topics = new ArrayList<>(topicsArr.size());
- for (Object topic : topicsArr) {
- topics.add(topic.toString());
- }
+
+ ParsedLocation parsedLocation = parseLocation(checkArgumentNotNull(table.getLocation()));
+ List<String> topics = mergeParam(parsedLocation.topic, properties.getJSONArray("topics"));
+ List<String> allBootstrapServers =
+ mergeParam(parsedLocation.brokerLocation, properties.getJSONArray("bootstrap_servers"));
+ String bootstrapServers = String.join(",", allBootstrapServers);
Review comment:
What about making `LOCATION` optional, and only merging it with `topics`/`bootstrap_servers` if it is defined?
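Here is a minimal sketch of that suggestion. It uses plain Java rather than the `Table`/`JSONObject` types from the patch. When `LOCATION` is set, its broker and topic are prepended to the explicit lists. Otherwise only the `TBLPROPERTIES` values are used. `KafkaLocationSketch`, `Resolved`, and `resolve` are hypothetical names. A null argument stands in for an absent `LOCATION` or for absent `topics`/`bootstrap_servers` properties.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: LOCATION is optional, and when present its broker and
// topic are merged with the TBLPROPERTIES lists.
final class KafkaLocationSketch {
  private KafkaLocationSketch() {}

  // Resolved settings: bootstrap servers joined with commas, plus all topics.
  static final class Resolved {
    final String bootstrapServers;
    final List<String> topics;

    Resolved(String bootstrapServers, List<String> topics) {
      this.bootstrapServers = bootstrapServers;
      this.topics = topics;
    }
  }

  // Splits "<broker bootstrap location>/<topic>" at the last '/', as the patch's
  // parseLocation does, but additionally rejects an empty broker or topic.
  static String[] parseLocation(String location) {
    int slash = location.lastIndexOf('/');
    if (slash <= 0 || slash == location.length() - 1) {
      throw new IllegalArgumentException(
          "Location string `" + location + "` invalid: must be <broker bootstrap location>/<topic>.");
    }
    return new String[] {location.substring(0, slash), location.substring(slash + 1)};
  }

  // location, extraServers and extraTopics may each be null.
  static Resolved resolve(String location, List<String> extraServers, List<String> extraTopics) {
    List<String> servers = new ArrayList<>();
    List<String> topics = new ArrayList<>();
    if (location != null && !location.isEmpty()) {
      String[] parsed = parseLocation(location);
      servers.add(parsed[0]);
      topics.add(parsed[1]);
    }
    if (extraServers != null) {
      servers.addAll(extraServers);
    }
    if (extraTopics != null) {
      topics.addAll(extraTopics);
    }
    if (servers.isEmpty() || topics.isEmpty()) {
      throw new IllegalArgumentException(
          "At least one bootstrap server and one topic are required, via LOCATION or TBLPROPERTIES.");
    }
    return new Resolved(String.join(",", servers), topics);
  }
}
```

Failing in `resolve` when neither source supplies a server or topic keeps the error at table-creation time. The sketch does not enforce the single-topic limit for writes mentioned in the Javadoc. That check would still be needed on the write path.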
##########
File path:
website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##########
@@ -427,38 +428,106 @@ TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'
```
+## Pub/Sub Lite
+
+### Syntax
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
+ publish_timestamp DATETIME,
+ event_timestamp DATETIME,
+ message_key BYTES,
+ attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
+ payload [BYTES, ROW<tableElement [, tableElement ]*>]
+)
+TYPE pubsublite
+// For writing
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
+// For reading
+LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
+```
+
+* `LOCATION`:
+ * `PROJECT`: ID of the Google Cloud project.
+ * `TOPIC`: The Pub/Sub Lite topic name.
+ * `SUBSCRIPTION`: The Pub/Sub Lite subscription name.
+ * `GCP-LOCATION`: The location for this Pub/Sub Lite topic or subscription.
+* `TBLPROPERTIES`:
+ * `timestampAttributeKey`: Optional. The key which contains the event
+ timestamp associated with the Pub/Sub message. If not specified, the
+ message publish timestamp is used as an event timestamp for
+ windowing/watermarking.
+ * `deadLetterQueue`: Optional. Supports
+ [Generic DLQ Handling](#generic-dlq-handling).
+ * `format`: Optional. Allows you to specify the payload format.
+
+### Read Mode
+
+PubsubLiteIO supports reading from subscriptions.
+
+### Write Mode
+
+PubsubLiteIO supports writing to topics.
+
+### Supported Payload
+
+* Pub/Sub Lite supports [Generic Payload Handling](#generic-payload-handling).
+
+### Example
+
+```
+CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
+TYPE pubsublite
+LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
+```
+
Review comment:
Thanks for updating these docs, looks great!
It looks like there's some trailing whitespace that needs to be removed, though (see the whitespace precommit).
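The Pub/Sub Lite `LOCATION` strings documented above encode the access mode in the path: `topics/...` for writing and `subscriptions/...` for reading. Below is a minimal sketch of validating and classifying such a string with plain string splitting. `PubsubLiteLocation` and its methods are hypothetical, and the actual table provider may parse these paths differently.

```java
import java.util.Arrays;

// Hypothetical helper: classifies a Pub/Sub Lite LOCATION string as a topic path
// (used for writing) or a subscription path (used for reading).
final class PubsubLiteLocation {
  enum Kind {
    TOPIC,
    SUBSCRIPTION
  }

  final String project;
  final String gcpLocation;
  final Kind kind;
  final String name;

  private PubsubLiteLocation(String project, String gcpLocation, Kind kind, String name) {
    this.project = project;
    this.gcpLocation = gcpLocation;
    this.kind = kind;
    this.name = name;
  }

  // Expects projects/[PROJECT]/locations/[GCP-LOCATION]/{topics|subscriptions}/[NAME].
  static PubsubLiteLocation parse(String location) {
    String[] parts = location.split("/", -1);
    boolean shapeOk =
        parts.length == 6
            && parts[0].equals("projects")
            && parts[2].equals("locations")
            && (parts[4].equals("topics") || parts[4].equals("subscriptions"));
    // Also reject empty segments such as "projects//locations/...".
    if (!shapeOk || Arrays.stream(parts).anyMatch(String::isEmpty)) {
      throw new IllegalArgumentException(
          "Expected projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC] or "
              + ".../subscriptions/[SUBSCRIPTION], got: " + location);
    }
    Kind kind = parts[4].equals("topics") ? Kind.TOPIC : Kind.SUBSCRIPTION;
    return new PubsubLiteLocation(parts[1], parts[3], kind, parts[5]);
  }

  // Per the docs: reads use subscriptions, writes use topics.
  boolean supportsRead() {
    return kind == Kind.SUBSCRIPTION;
  }

  boolean supportsWrite() {
    return kind == Kind.TOPIC;
  }
}
```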
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]