Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5240#discussion_r164150350
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
*/
def field(fieldName: String, fieldType: String): Schema = {
if (tableSchema.contains(fieldName)) {
- throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
+ throw new ValidationException(s"Duplicate field name $fieldName.")
+ }
+
+ val fieldProperties = mutable.LinkedHashMap[String, String]()
+ fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+ tableSchema += (fieldName -> fieldProperties)
+
+ lastField = Some(fieldName)
+ this
+ }
+
+ /**
+ * Specifies the origin of the previously defined field. The origin field is defined by a
+ * connector or format.
+ *
+ * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+ */
+ def from(originFieldName: String): Schema = {
+ lastField match {
+ case None => throw new ValidationException("No field defined previously. Use field() before.")
--- End diff --
"previously defined"
---