[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311429#comment-16311429 ]

ASF GitHub Bot commented on FLINK-8240:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5240

    [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

    ## What is the purpose of the change
    
    This PR presents the initial version of the new unified TableSource API. The 
API is based on a descriptor approach: descriptors describe parameters and 
behavior, but contain no logic. They only store information and translate it 
into normalized string-based properties.
    
    The following example shows how a CSV table source could be specified in 
the future:
    
    ```
    tableEnv
          .createTable(
            Schema()
              .field("myfield", Types.STRING)
              .field("myfield2", Types.INT))
          .withConnector(
            FileSystem()
              .path("/path/to/csv"))
          .withEncoding(
            CSV()
              .field("myfield", Types.STRING)
              .field("myfield2", Types.INT)
              .quoteCharacter(';')
              .fieldDelimiter("#")
              .lineDelimiter("\r\n")
              .commentPrefix("%%")
              .ignoreFirstLine()
              .ignoreParseErrors())
          .withRowtime(
            Rowtime()
              .field("rowtime")
              .timestampFromDataStream()
              .watermarkFromDataStream())
          .withProctime(
            Proctime()
              .field("myproctime"))
    ```
    
    These descriptors are translated into the following properties:
    
    ```
    "schema.0.name" -> "myfield",
    "schema.0.type" -> "VARCHAR",
    "schema.1.name" -> "myfield2",
    "schema.1.type" -> "INT",
    "connector.type" -> "filesystem",
    "connector.path" -> "/path/to/csv",
    "encoding.type" -> "csv",
    "encoding.fields.0.name" -> "myfield",
    "encoding.fields.0.type" -> "VARCHAR",
    "encoding.fields.1.name" -> "myfield2",
    "encoding.fields.1.type" -> "INT",
    "encoding.quote-character" -> ";",
    "encoding.field-delimiter" -> "#",
    "encoding.line-delimiter" -> "\r\n",
    "encoding.comment-prefix" -> "%%",
    "encoding.ignore-first-line" -> "true",
    "encoding.ignore-parse-errors" -> "true",
    "rowtime.0.name" -> "rowtime",
    "rowtime.0.timestamp.type" -> "stream-record",
    "rowtime.0.watermark.type" -> "preserving",
    "proctime" -> "myproctime"
    ```
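    
    To give an idea of how a descriptor could emit these keys, here is a 
minimal sketch; the `Descriptor` trait and the `addProperties` method are 
assumptions for illustration and not necessarily the classes added by this PR:
    
    ```
    import scala.collection.mutable
    
    // Hypothetical base trait: a descriptor stores information and translates
    // it into normalized string-based properties.
    trait Descriptor {
      def addProperties(properties: mutable.Map[String, String]): Unit
    }
    
    // Hypothetical connector descriptor mirroring the FileSystem() example above.
    class FileSystem extends Descriptor {
      private var connectorPath: Option[String] = None
    
      def path(p: String): FileSystem = {
        connectorPath = Some(p)
        this
      }
    
      override def addProperties(properties: mutable.Map[String, String]): Unit = {
        properties += ("connector.type" -> "filesystem")
        connectorPath.foreach(p => properties += ("connector.path" -> p))
      }
    }
    ```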
    
    This PR also reworks the discovery of table sources: the `@TableType` 
annotation and the reflection-based discovery are deprecated in favor of 
`TableSourceFactory` interfaces and the standard Java Service Provider 
Interface (SPI). The table factories can now create table sources from the 
above properties. The `ExternalCatalogTable` class has been reworked to use 
the new descriptor-based approach as well; however, the change should remain 
fully source-code backwards compatible.
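    
    As a rough sketch of how the SPI-based discovery could work (the exact 
`TableSourceFactory` signature in this PR may differ; the method names and the 
matching logic below are assumptions):
    
    ```
    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    
    // Hypothetical factory contract; the actual interface may look different.
    trait TableSourceFactory {
      // Properties that must be present for this factory to match,
      // e.g. "connector.type" -> "filesystem".
      def requiredContext: java.util.Map[String, String]
      // Creates a table source from the normalized string properties.
      def create(properties: java.util.Map[String, String]): AnyRef
    }
    
    class CsvTableSourceFactory extends TableSourceFactory {
      override def requiredContext: java.util.Map[String, String] =
        Map("connector.type" -> "filesystem", "encoding.type" -> "csv").asJava
    
      override def create(properties: java.util.Map[String, String]): AnyRef = {
        // A real factory would build e.g. a CsvTableSource from
        // "connector.path", "encoding.field-delimiter", etc.
        new Object
      }
    }
    
    object FactoryDiscovery {
      // Implementations are listed under META-INF/services/ in a file named
      // after the factory interface and looked up via the Java ServiceLoader.
      def findFactory(properties: Map[String, String]): Option[TableSourceFactory] =
        ServiceLoader.load(classOf[TableSourceFactory]).asScala.find { factory =>
          factory.requiredContext.asScala.forall {
            case (key, value) => properties.get(key).contains(value)
          }
        }
    }
    ```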
    
    I am aware that more tests are missing, and we should also decide where 
and how the validation should happen. I think it should happen mostly in the 
table source builders. We could also introduce a global dictionary class that 
provides constants for the property keys instead of repeating plain strings in 
different places (see the sketch below).
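    
    A minimal sketch of such a dictionary class, assuming hypothetical names 
(neither the object nor the constants below are part of this PR):
    
    ```
    // Hypothetical dictionary of property keys so that descriptors and
    // factories share constants instead of repeating plain strings.
    object TableDescriptorKeys {
      val CONNECTOR_TYPE = "connector.type"
      val CONNECTOR_PATH = "connector.path"
      val ENCODING_TYPE = "encoding.type"
      val ENCODING_FIELD_DELIMITER = "encoding.field-delimiter"
      val ROWTIME_PREFIX = "rowtime"   // e.g. "rowtime.0.name"
      val PROCTIME = "proctime"
    }
    ```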
    
    What do you think?
    
    ## Brief change log
    
      - Adds descriptors for schema, connectors, encoding, statistics, 
metadata, proctime, and rowtime
      - Adds table factory discovery based on unified properties
    
    ## Verifying this change
    
     - Added `DescriptorsTest`
     - ExternalCatalog tests are still working
     - More tests will follow...
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? ScalaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8240

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5240
    
----
commit a8ccac6895bde97b154c1cbb442a0ac6e901b4c3
Author: twalthr <twalthr@...>
Date:   2017-12-15T09:18:20Z

    [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

----


> Create unified interfaces to configure and instantiate TableSources
> -------------------------------------------------------------------
>
>                 Key: FLINK-8240
>                 URL: https://issues.apache.org/jira/browse/FLINK-8240
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> At the moment, every table source has a different way of being configured 
> and instantiated. Some table sources are tailored to a specific encoding 
> (e.g., {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support 
> one encoding for reading (e.g., {{CsvTableSource}}). Each of them might 
> implement a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general, a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.


