    [FLINK-8538] [table] Add a Kafka table source factory with JSON format

    ## What is the purpose of the change
    This PR is a continuation of #5505. Since this is the first connector with 
format, time attributes, and table source factory, I encountered many 
inconsistencies that I try to fix in this PR.
    ## Brief change log
    - Change property `connector.version` to `` in 
order to use `version` for things like Kafka
    - Add more utility functions for better Java<->Scala interoperability
    - Add full rowtime support
    - Derive JSON mapping from schema
    - Derive schema from source if required
    ## Verifying this change
    Various unit tests implemented.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): yes
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs/ScalaDocs

