[
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538426#comment-16538426
]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r201307506
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
---
@@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
}
}
-// TODO expose this once we have enough table source factories that can
deal with it
-// /**
-// * Creates a table from a descriptor that describes the source
connector, source encoding,
-// * the resulting table schema, and other properties.
-// *
-// * @param connectorDescriptor connector descriptor describing the
source of the table
-// */
-// def from(connectorDescriptor: ConnectorDescriptor):
BatchTableSourceDescriptor = {
-// new BatchTableSourceDescriptor(this, connectorDescriptor)
-// }
+ /**
+ * Creates a table from a descriptor that describes the source
connector, the source format,
+ * the resulting table schema, and other properties.
+ *
+ * Descriptors allow for declaring communication to external systems in
an
+ * implementation-agnostic way. The classpath is scanned for connectors
and matching connectors
+ * are configured accordingly.
+ *
+ * The following example shows how to read from a Kafka connector using
a JSON format and
+ * creating a table:
+ *
+ * {{{
+ *
+ * tableEnv
+ * .from(
+ * new Kafka()
+ * .version("0.11")
+ * .topic("clicks")
+ * .property("zookeeper.connect", "localhost")
+ * .property("group.id", "click-group")
+ * .startFromEarliest())
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .field("proc-time", "TIMESTAMP").proctime())
+ * .toTable()
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the
source of the table
+ */
+ def from(connectorDescriptor: ConnectorDescriptor):
BatchTableSourceDescriptor = {
--- End diff --
Once we reimplement the environments in Java and introduce proper
interfaces for hiding the implementation, this problem will be gone anyway.
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there, but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns, we want to decouple connectors from formats, e.g., by removing
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
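The discovery mechanism described in the issue could look roughly like the
sketch below. It is not the Flink implementation: only the names FormatFactory
and Format come from the issue. The method names formatType(), createFormat(),
and deserialize(), the "utf8-string" identifier, and the Utf8StringFormatFactory
class are assumptions made for illustration. Discovery relies on
java.util.ServiceLoader, which only finds implementations that are registered
in a META-INF/services file on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.ServiceLoader;

// Sketch only: FormatFactory and Format are named in the issue, but every
// method signature in this file is an assumption made for illustration.
final class FormatDiscovery {

    // Returns the first factory whose identifier equals the requested format type.
    static Optional<FormatFactory> find(Iterable<FormatFactory> factories, String formatType) {
        for (FormatFactory factory : factories) {
            if (factory.formatType().equals(formatType)) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    // Scans the classpath with java.util.ServiceLoader. A factory is only found if it
    // is listed in a META-INF/services/<fully qualified FormatFactory name> file.
    static Optional<FormatFactory> findOnClasspath(String formatType) {
        return find(ServiceLoader.load(FormatFactory.class), formatType);
    }

    public static void main(String[] args) {
        // Without a META-INF/services registration, the lookup yields no factory.
        Optional<FormatFactory> discovered = findOnClasspath("utf8-string");
        System.out.println(discovered.isPresent() ? "factory discovered" : "no factory registered");

        // Using a factory directly shows the byte[] to record conversion.
        Format<?> format = new Utf8StringFormatFactory().createFormat();
        System.out.println(format.deserialize("click".getBytes(StandardCharsets.UTF_8)));
    }
}

// Hypothetical shape of a format: converts raw bytes into the target record type.
interface Format<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical shape of the factory that the service loader would discover.
interface FormatFactory {
    String formatType();      // identifier used to match a requested format
    Format<?> createFormat(); // creates the format that decodes byte[] records
}

// Toy implementation: treats every record as a UTF-8 encoded string.
final class Utf8StringFormatFactory implements FormatFactory {
    @Override
    public String formatType() {
        return "utf8-string";
    }

    @Override
    public Format<String> createFormat() {
        return bytes -> new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Keeping find() independent of ServiceLoader lets the matching logic be
exercised with a plain list of factories, while findOnClasspath() stays the
only place that depends on classpath registration.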