dpcollins-google commented on a change in pull request #14507:
URL: https://github.com/apache/beam/pull/14507#discussion_r612634894



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
##########
@@ -45,23 +48,57 @@
  *   NAME VARCHAR(127) COMMENT 'this is the name'
  * )
  * COMMENT 'this is the table orders'
- * LOCATION 'kafka://localhost:2181/brokers?topic=test'
- * TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", 
"topic2"]}'
+ * TYPE kafka
+ * // One broker host:port pair to bootstrap with and a topic.
+ * // Only one topic overall may be provided for writing.
+ * LOCATION 'my.company.url.com:2181/topic1'
+ * // Extra bootstrap_servers and topics can be provided explicitly. These 
will be merged
+ * // with the server and topic in LOCATION.
+ * TBLPROPERTIES '{
+ *   "bootstrap_servers": ["104.126.7.88:7743", "104.111.9.22:7743"],
+ *   "topics": ["topic2", "topic3"]
+ * }'
  * }</pre>
  */
 @AutoService(TableProvider.class)
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
+  private static class ParsedLocation {
+    String brokerLocation = "";
+    String topic = "";
+  }
+
+  private static ParsedLocation parseLocation(String location) {
+    ParsedLocation parsed = new ParsedLocation();
+    List<String> split = Splitter.on('/').splitToList(location);
+    checkArgument(
+        split.size() >= 2,
+        "Location string `%s` invalid: must be <broker bootstrap 
location>/<topic>.",
+        location);
+    parsed.topic = Iterables.getLast(split);
+    parsed.brokerLocation = String.join("/", split.subList(0, split.size() - 
1));
+    return parsed;
+  }
+
+  private static List<String> mergeParam(String initial, @Nullable 
List<Object> toMerge) {
+    if (toMerge == null) {
+      return ImmutableList.of(initial);
+    }
+    ImmutableList.Builder<String> merged = ImmutableList.builder();
+    merged.add(initial);
+    toMerge.forEach(o -> merged.add(o.toString()));
+    return merged.build();
+  }
+
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
     Schema schema = table.getSchema();
-
     JSONObject properties = table.getProperties();
-    String bootstrapServers = properties.getString("bootstrap.servers");
-    JSONArray topicsArr = properties.getJSONArray("topics");
-    List<String> topics = new ArrayList<>(topicsArr.size());
-    for (Object topic : topicsArr) {
-      topics.add(topic.toString());
-    }
+
+    ParsedLocation parsedLocation = 
parseLocation(checkArgumentNotNull(table.getLocation()));
+    List<String> topics = mergeParam(parsedLocation.topic, 
properties.getJSONArray("topics"));
+    List<String> allBootstrapServers =
+        mergeParam(parsedLocation.brokerLocation, 
properties.getJSONArray("bootstrap_servers"));
+    String bootstrapServers = String.join(",", allBootstrapServers);

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to