[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311429#comment-16311429 ]
ASF GitHub Bot commented on FLINK-8240:
---------------------------------------
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/5240
[FLINK-8240] [table] Create unified interfaces to configure and instantiate
TableSources
## What is the purpose of the change
This PR presents the initial version of the new unified TableSource API. The
API is based on a descriptor approach: a descriptor describes parameters and
behavior. Descriptors contain no logic; they only store information and
translate it into normalized string-based properties.
The following example shows how a CSV table source could be specified in
the future:
```
tableEnv
  .createTable(
    Schema()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT))
  .withConnector(
    FileSystem()
      .path("/path/to/csv"))
  .withEncoding(
    CSV()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .quoteCharacter(';')
      .fieldDelimiter("#")
      .lineDelimiter("\r\n")
      .commentPrefix("%%")
      .ignoreFirstLine()
      .ignoreParseErrors())
  .withRowtime(
    Rowtime()
      .field("rowtime")
      .timestampFromDataStream()
      .watermarkFromDataStream())
  .withProctime(
    Proctime()
      .field("myproctime"))
```
These descriptors get translated into the following properties:
```
"schema.0.name" -> "myfield",
"schema.0.type" -> "VARCHAR",
"schema.1.name" -> "myfield2",
"schema.1.type" -> "INT",
"connector.type" -> "filesystem",
"connector.path" -> "/path/to/csv",
"encoding.type" -> "csv",
"encoding.fields.0.name" -> "myfield",
"encoding.fields.0.type" -> "VARCHAR",
"encoding.fields.1.name" -> "myfield2",
"encoding.fields.1.type" -> "INT",
"encoding.quote-character" -> ";",
"encoding.field-delimiter" -> "#",
"encoding.line-delimiter" -> "\r\n",
"encoding.comment-prefix" -> "%%",
"encoding.ignore-first-line" -> "true",
"encoding.ignore-parse-errors" -> "true",
"rowtime.0.name" -> "rowtime",
"rowtime.0.timestamp.type" -> "stream-record",
"rowtime.0.watermark.type" -> "preserving",
"proctime" -> "myproctime"
```
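To make the descriptor idea more concrete, here is a minimal sketch of how a descriptor could store its parameters and emit them as normalized properties. The `Descriptor` trait and `addProperties` method below are simplified assumptions for illustration, not necessarily the interfaces introduced by this PR:
```scala
import scala.collection.mutable

// Assumed minimal contract: a descriptor only stores information and writes it
// out as normalized string key/value pairs (names are illustrative).
trait Descriptor {
  def addProperties(properties: mutable.Map[String, String]): Unit
}

// Sketch of a processing-time descriptor that remembers a field name and
// translates it into the "proctime" property shown above.
class Proctime extends Descriptor {
  private var fieldName: Option[String] = None

  def field(name: String): Proctime = {
    fieldName = Some(name)
    this
  }

  override def addProperties(properties: mutable.Map[String, String]): Unit =
    fieldName.foreach(name => properties += ("proctime" -> name))
}

object Proctime {
  def apply(): Proctime = new Proctime
}
```
With this sketch, `Proctime().field("myproctime").addProperties(props)` would add `"proctime" -> "myproctime"` to the property map.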
This PR also reworks the discovery of table sources: the `@TableType`
annotation and its reflection-based discovery are deprecated in favor of
`TableSourceFactory` interfaces registered via the standard Java Service
Provider Interface (SPI). Table factories can now create table sources from
the properties shown above. The `ExternalCatalogTable` class has been reworked
to use the new descriptor-based approach as well; however, it should remain
fully source-code backwards compatible.
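To sketch the discovery mechanism, the following illustrates how factories could be looked up via the standard Java `ServiceLoader`. The `TableSource` and `TableSourceFactory` types shown here are simplified placeholders; the exact method names and signatures in the PR may differ:
```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical interfaces for illustration only.
trait TableSource
trait TableSourceFactory {
  // true if this factory can handle the given normalized properties,
  // e.g. "connector.type" -> "filesystem" together with "encoding.type" -> "csv"
  def matches(properties: Map[String, String]): Boolean
  def create(properties: Map[String, String]): TableSource
}

object TableSourceDiscovery {
  // Discovery via the standard Java Service Provider Interface: implementations
  // are listed in a META-INF/services/<factory interface FQN> file on the
  // classpath and looked up with ServiceLoader.
  def findFactory(properties: Map[String, String]): TableSourceFactory =
    ServiceLoader.load(classOf[TableSourceFactory])
      .iterator().asScala
      .find(_.matches(properties))
      .getOrElse(throw new IllegalArgumentException(
        "No table source factory matches the given properties"))
}
```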
I agree that more tests are still missing, and we should also decide where
and how the validation should happen. I think it should happen mostly in the
table source builders. We could also introduce a global dictionary class that
provides constants for the property keys, instead of repeating plain strings
in different places. What do you think?
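As a rough sketch of such a dictionary class (all names below are placeholders, not constants used in this PR):
```scala
// Sketch of a global "dictionary" of property keys so that descriptors,
// factories, and validation refer to shared constants instead of repeating
// raw strings.
object TableProperties {
  val ConnectorType = "connector.type"
  val ConnectorPath = "connector.path"
  val EncodingType = "encoding.type"
  val EncodingFieldDelimiter = "encoding.field-delimiter"

  // Validation helper that could live close to the table source builders:
  // fail fast if a required key is missing from the normalized properties.
  def requireKey(properties: Map[String, String], key: String): String =
    properties.getOrElse(
      key,
      throw new IllegalArgumentException(s"Missing required property '$key'"))
}
```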
## Brief change log
- Adds descriptors for schema, connectors, encoding, statistics,
metadata, proctime, and rowtime
- Adds table factory discovery based on unified properties
## Verifying this change
- Added `DescriptorsTest`
- ExternalCatalog tests are still working
- More tests will follow...
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? ScalaDocs
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-8240
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5240.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5240
----
commit a8ccac6895bde97b154c1cbb442a0ac6e901b4c3
Author: twalthr <twalthr@...>
Date: 2017-12-15T09:18:20Z
[FLINK-8240] [table] Create unified interfaces to configure and instantiate
TableSources
----
> Create unified interfaces to configure and instantiate TableSources
> -------------------------------------------------------------------
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table sources are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide
> similar functionality but use an external catalog. We might generalize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
> - Type (e.g. Kafka, Custom)
> - Properties (e.g. topic, connection info)
> - Encoding
> - Type (e.g. Avro, JSON, CSV)
> - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
> - Watermark strategy and Watermark properties
> - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is
> very welcome.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)