[
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341347#comment-16341347
]
ASF GitHub Bot commented on FLINK-8240:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5240#discussion_r164151340
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
*/
def field(fieldName: String, fieldType: String): Schema = {
if (tableSchema.contains(fieldName)) {
- throw new IllegalArgumentException(s"Duplicate field name
$fieldName.")
+ throw new ValidationException(s"Duplicate field name $fieldName.")
+ }
+
+ val fieldProperties = mutable.LinkedHashMap[String, String]()
+ fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+ tableSchema += (fieldName -> fieldProperties)
+
+ lastField = Some(fieldName)
+ this
+ }
+
+ /**
+ * Specifies the origin of the previously defined field. The origin
field is defined by a
+ * connector or format.
+ *
+ * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+ */
+ def from(originFieldName: String): Schema = {
+ lastField match {
+ case None => throw new ValidationException("No field defined
previously. Use field() before.")
+ case Some(f) =>
+ tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+ lastField = None
+ }
+ this
+ }
+
+ /**
+ * Specifies the previously defined field as a processing-time
attribute.
+ *
+ * E.g. field("myString", Types.STRING).proctime()
--- End diff --
`field("procTime", Types.SQL_TIMESTAMP).proctime()`
> Create unified interfaces to configure and instatiate TableSources
> ------------------------------------------------------------------
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> At the moment every table source has different ways for configuration and
> instantiation. Some table source are tailored to a specific encoding (e.g.,
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining
> common properties, and instantiation. The {{TableSourceConverters}} provide a
> similar functionality but use an external catalog. We might generialize this
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
> - Type (e.g. Kafka, Custom)
> - Properties (e.g. topic, connection info)
> - Encoding
> - Type (e.g. Avro, JSON, CSV)
> - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
> - Watermark strategy and Watermark properties
> - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is
> very welcome.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)