TheNeuralBit commented on a change in pull request #14507:
URL: https://github.com/apache/beam/pull/14507#discussion_r612620237



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
##########
@@ -45,23 +48,57 @@
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+ * TYPE kafka
+ * // One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ *   "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ *   "topics": ["topic2", "topic3"]
+ * }'
  * }</pre>
  */
 @AutoService(TableProvider.class)
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
+  private static class ParsedLocation {
+    String brokerLocation = "";
+    String topic = "";
+  }
+
+  private static ParsedLocation parseLocation(String location) {
+    ParsedLocation parsed = new ParsedLocation();
+    List<String> split = Splitter.on('/').splitToList(location);
+    checkArgument(
+        split.size() >= 2,
+        "Location string `%s` invalid: must be <broker bootstrap location>/<topic>.",
+        location);
+    parsed.topic = Iterables.getLast(split);
+    parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 1));
+    return parsed;
+  }
+
+  private static List<String> mergeParam(String initial, @Nullable List<Object> toMerge) {
+    if (toMerge == null) {
+      return ImmutableList.of(initial);
+    }
+    ImmutableList.Builder<String> merged = ImmutableList.builder();
+    merged.add(initial);
+    toMerge.forEach(o -> merged.add(o.toString()));
+    return merged.build();
+  }
+
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = table.getSchema();
-
     JSONObject properties = table.getProperties();
-    String bootstrapServers = properties.getString("bootstrap.servers");
-    JSONArray topicsArr = properties.getJSONArray("topics");
-    List<String> topics = new ArrayList<>(topicsArr.size());
-    for (Object topic : topicsArr) {
-      topics.add(topic.toString());
-    }
+
+    ParsedLocation parsedLocation = parseLocation(checkArgumentNotNull(table.getLocation()));
+    List<String> topics = mergeParam(parsedLocation.topic, properties.getJSONArray("topics"));
+    List<String> allBootstrapServers =
+        mergeParam(parsedLocation.brokerLocation, properties.getJSONArray("bootstrap_servers"));
+    String bootstrapServers = String.join(",", allBootstrapServers);
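For reference, the parse-and-merge behaviour in this hunk can be reproduced outside Beam. The following is a minimal, hypothetical sketch, not the PR's code: the class name `KafkaLocationSketch` is invented, and Guava's `Splitter`/`ImmutableList` and the JSON array type are swapped for plain JDK collections. Splitting at the last `/` is assumed to be equivalent to splitting on every `/`, taking the last piece as the topic, and re-joining the rest. The sample values come from the Javadoc example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical JDK-only sketch of parseLocation/mergeParam from the diff; not Beam code. */
public class KafkaLocationSketch {

  /**
   * Splits "<broker bootstrap location>/<topic>" at the last '/'. This matches splitting on every
   * '/', taking the last piece as the topic and re-joining the rest with "/".
   * Returns {brokerLocation, topic}.
   */
  static String[] parseLocation(String location) {
    int slash = location.lastIndexOf('/');
    if (slash < 0) {
      throw new IllegalArgumentException(
          "Location string `" + location
              + "` invalid: must be <broker bootstrap location>/<topic>.");
    }
    return new String[] {location.substring(0, slash), location.substring(slash + 1)};
  }

  /** The LOCATION-derived value comes first; extra TBLPROPERTIES values (if any) follow it. */
  static List<String> mergeParam(String initial, List<?> toMerge) {
    List<String> merged = new ArrayList<>();
    merged.add(initial);
    if (toMerge != null) {
      for (Object o : toMerge) {
        merged.add(o.toString());
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Values taken from the DDL example in the Javadoc above.
    String[] parsed = parseLocation("my.company.url.com:2181/topic1");
    List<String> topics = mergeParam(parsed[1], Arrays.asList("topic2", "topic3"));
    List<String> servers =
        mergeParam(parsed[0], Arrays.asList("104.126.7.88:7743", "104.111.9.22:7743"));
    System.out.println(topics); // [topic1, topic2, topic3]
    System.out.println(String.join(",", servers));
    // my.company.url.com:2181,104.126.7.88:7743,104.111.9.22:7743
  }
}
```

Because only the last `/` separates broker from topic, an old-style value such as `kafka://localhost:2181/brokers?topic=test` would not be rejected. It would parse as broker `kafka://localhost:2181` and topic `brokers?topic=test`, which bears on the backwards-compatibility question raised in the comment.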

Review comment:
   I think LOCATION is optional in the DDL. The grammar is defined here, but it's a little opaque to me: https://github.com/apache/beam/blob/8e6a7988765c24169c3872c4cdebb03c3def1d50/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl#L163-L192
   
   Maybe @apilloud can confirm.
   
   But if that's the case, I'm not sure why we have examples where `LOCATION` is specified and then ignored. Why didn't we just create examples that don't specify `LOCATION`?
   
   I think the value in allowing LOCATION to be unspecified would be:
   1. backwards compatibility
   1. could be more convenient for the multiple bootstrap server and topic case
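If `LOCATION` does turn out to be optional (the comment asks for confirmation, so this is an assumption), the provider would need to handle a missing location, which would presumably surface as `table.getLocation()` returning `null`, instead of failing the `checkArgumentNotNull` call. The following is a hypothetical sketch of that resolution logic, not a proposed patch. `OptionalLocationSketch`, `KafkaConfig` and `resolve` are invented names, and plain JDK lists stand in for the JSON arrays read from TBLPROPERTIES.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of resolving Kafka settings when LOCATION may be absent; not Beam code. */
public class OptionalLocationSketch {

  /** Resolved settings: comma-joined bootstrap servers plus the topic list. */
  static final class KafkaConfig {
    final String bootstrapServers;
    final List<String> topics;

    KafkaConfig(String bootstrapServers, List<String> topics) {
      this.bootstrapServers = bootstrapServers;
      this.topics = topics;
    }
  }

  /**
   * location may be null (assumed: LOCATION omitted in the DDL). When present, it contributes one
   * broker location and one topic; TBLPROPERTIES values are appended after it.
   */
  static KafkaConfig resolve(String location, List<?> extraServers, List<?> extraTopics) {
    List<String> servers = new ArrayList<>();
    List<String> topics = new ArrayList<>();
    if (location != null) {
      int slash = location.lastIndexOf('/');
      if (slash < 0) {
        throw new IllegalArgumentException(
            "Location string `" + location
                + "` invalid: must be <broker bootstrap location>/<topic>.");
      }
      servers.add(location.substring(0, slash));
      topics.add(location.substring(slash + 1));
    }
    addAll(servers, extraServers);
    addAll(topics, extraTopics);
    // With LOCATION optional, the "at least one of each" check moves here from the parser.
    if (servers.isEmpty() || topics.isEmpty()) {
      throw new IllegalArgumentException(
          "At least one bootstrap server and one topic are required,"
              + " from LOCATION or TBLPROPERTIES.");
    }
    return new KafkaConfig(String.join(",", servers), topics);
  }

  private static void addAll(List<String> target, List<?> values) {
    if (values != null) {
      for (Object v : values) {
        target.add(v.toString());
      }
    }
  }

  public static void main(String[] args) {
    // Multiple servers and topics supplied only through TBLPROPERTIES, no LOCATION.
    KafkaConfig c =
        resolve(null, Arrays.asList("host1:9092", "host2:9092"), Arrays.asList("topic1", "topic2"));
    System.out.println(c.bootstrapServers); // host1:9092,host2:9092
    System.out.println(c.topics); // [topic1, topic2]
  }
}
```

This covers the second point (multiple servers and topics without a `LOCATION`). Full backwards compatibility would additionally require reading the old single-string `"bootstrap.servers"` key that the diff removes, which this sketch does not do.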




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
