[
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534329#comment-16534329
]
ASF GitHub Bot commented on FLINK-8866:
---------------------------------------
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r200521222
--- Diff:
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
---
@@ -56,23 +58,44 @@ public Environment() {
return tables;
}
+ private static TableDescriptor create(String name, Map<String, Object>
config) {
+ if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE()))
{
+ throw new SqlClientException("The 'type' attribute of a
table is missing.");
+ }
+ final String tableType = (String)
config.get(TableDescriptorValidator.TABLE_TYPE());
+ if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+ return new Source(name,
ConfigUtil.normalizeYaml(config));
+ } else if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+ return new Sink(name, ConfigUtil.normalizeYaml(config));
+ } else if
(tableType.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+ return new SourceSink(name,
ConfigUtil.normalizeYaml(config));
+ }
+ return null;
+ }
+
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
- if
(!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
- throw new SqlClientException("The 'type'
attribute of a table is missing.");
+ if (!config.containsKey(NAME)) {
+ throw new SqlClientException("The 'name'
attribute of a table is missing.");
}
- if
(config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE()))
{
-
config.remove(TableDescriptorValidator.TABLE_TYPE());
- final Source s = Source.create(config);
- if (this.tables.containsKey(s.getName())) {
- throw new SqlClientException("Duplicate
source name '" + s + "'.");
- }
- this.tables.put(s.getName(), s);
- } else {
+ final Object name = config.get(NAME);
+ if (name == null || !(name instanceof String) ||
((String) name).length() <= 0) {
+ throw new SqlClientException("Invalid table
name '" + name + "'.");
+ }
+ final String tableName = (String) name;
+ final Map<String, Object> properties = new
HashMap<>(config);
+ properties.remove(NAME);
+
+ TableDescriptor tableDescriptor = create(tableName,
properties);
+ if (null == tableDescriptor) {
throw new SqlClientException(
- "Invalid table 'type' attribute
value, only 'source' is supported");
+ "Invalid table 'type' attribute
value, only 'source' or 'sink' is supported");
+ }
+ if (this.tables.containsKey(tableName)) {
+ throw new SqlClientException("Duplicate table
name '" + tableName + "'.");
--- End diff --
if only `"source"` and `"sink"` is allowed, should we allow the same name
but different type. e.g. `{"name": "t1", "type": "source"}` and `{"name": "t1",
"type": "sink"}` co-exist? this is actually following up with the previous
comment. I think we just need one, either should work.
> Create unified interfaces to configure and instatiate TableSinks
> ----------------------------------------------------------------
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Shuyi Chen
> Priority: Major
> Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure
> and instantiate TableSinks. Among other applications, this is necessary in
> order to declare table sinks in an environment file of the SQL client. Such
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind.
> 1) Add TableSinkFactory/TableSinkFactoryService similar to
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both)
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to
> identify whether it's source or sink.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)