Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r200521222
--- Diff:
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
+ private static TableDescriptor create(String name, Map<String, Object>
config) {
+ if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE()))
{
+ throw new SqlClientException("The 'type' attribute of a
table is missing.");
+ }
+ final String tableType = (String)
config.get(TableDescriptorValidator.TABLE_TYPE());
+ if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+ return new Source(name,
ConfigUtil.normalizeYaml(config));
+ } else if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+ return new Sink(name, ConfigUtil.normalizeYaml(config));
+ } else if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+ return new SourceSink(name,
ConfigUtil.normalizeYaml(config));
+ }
+ return null;
+ }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
- if
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
- throw new SqlClientException("The 'type'
attribute of a table is missing.");
+ if (!config.containsKey(NAME)) {
+ throw new SqlClientException("The 'name'
attribute of a table is missing.");
}
- if
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
{
-
config.remove(TableDescriptorValidator.TABLE_TYPE());
- final Source s = Source.create(config);
- if (this.tables.containsKey(s.getName())) {
- throw new SqlClientException("Duplicate
source name '" + s + "'.");
- }
- this.tables.put(s.getName(), s);
- } else {
+ final Object name = config.get(NAME);
+ if (name == null || !(name instanceof String) ||
((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid table
name '" + name + "'.");
+ }
+ final String tableName = (String) name;
+ final Map<String, Object> properties = new
HashMap<>(config);
+ properties.remove(NAME);
+
+ TableDescriptor tableDescriptor = create(tableName,
properties);
+ if (null == tableDescriptor) {
throw new SqlClientException(
- "Invalid table 'type' attribute
value, only 'source' is supported");
+ "Invalid table 'type' attribute
value, only 'source' or 'sink' is supported");
+ }
+ if (this.tables.containsKey(tableName)) {
+ throw new SqlClientException("Duplicate table
name '" + tableName + "'.");
--- End diff --
If only `"source"` and `"sink"` are allowed, should we allow the same name
with different types, e.g. should `{"name": "t1", "type": "source"}` and
`{"name": "t1", "type": "sink"}` be able to co-exist? This is actually a
follow-up to the previous comment. I think we just need one; either should work.
---