Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5564#discussion_r171292456
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
object SchemaValidator {
val SCHEMA = "schema"
- val SCHEMA_VERSION = "schema.version"
+ val SCHEMA_NAME = "name"
+ val SCHEMA_TYPE = "type"
+ val SCHEMA_PROCTIME = "proctime"
+ val SCHEMA_FROM = "from"
+
+ // utilities
+
+ /**
+ * Finds the proctime attribute if defined.
+ */
+ def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+ val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+ for (i <- 0 until names.size) {
+ val isProctime = toScala(
+ properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+ isProctime.foreach { isSet =>
+ if (isSet) {
+ return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+ }
+ }
+ }
+ toJava(None)
+ }
+
+ /**
+ * Finds the rowtime attributes if defined.
+ */
+ def deriveRowtimeAttributes(properties: DescriptorProperties)
+ : util.List[RowtimeAttributeDescriptor] = {
+
+ val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+ var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+ // check for rowtime in every field
+ for (i <- 0 until names.size) {
+ RowtimeValidator
+ .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+ .foreach { case (extractor, strategy) =>
+ // create descriptor
+ attributes += new RowtimeAttributeDescriptor(
+ properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+ extractor,
+ strategy)
+ }
+ }
+
+ attributes.asJava
+ }
+
+ /**
+ * Finds a table source field mapping.
+ */
+ def deriveFieldMapping(
+ properties: DescriptorProperties,
+ sourceSchema: Optional[TableSchema])
+ : util.Map[String, String] = {
+
+ val mapping = mutable.Map[String, String]()
+
+ val schema = properties.getTableSchema(SCHEMA)
+
+ // add all schema fields first for implicit mappings
+ schema.getColumnNames.foreach { name =>
+ mapping.put(name, name)
+ }
+
+ val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+ for (i <- 0 until names.size) {
+ val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+ toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
- // per column properties
+ // add explicit mapping
+ case Some(source) =>
+ mapping.put(name, source)
- val NAME = "name"
- val TYPE = "type"
- val PROCTIME = "proctime"
- val PROCTIME_VALUE_TRUE = "true"
- val FROM = "from"
+ // implicit mapping or time
+ case None =>
+ val isProctime = properties
+ .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+ .orElse(false)
+ val isRowtime = properties
+ .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+ // remove proctime/rowtime from mapping
+ if (isProctime || isRowtime) {
+ mapping.remove(name)
+ }
+ // check for invalid fields
+ else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+ throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+ s"from source. Please specify the source field from which it can be derived.")
+ }
+ }
+ }
+ mapping.toMap.asJava
+ }
+
+ /**
+ * Finds the fields that can be used for a format schema (without time attributes).
+ */
+ def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
--- End diff ---
No problem @xccui. My goal was to allow users to specify all fields only once, because users often have tables with 30+ columns. When I opened the PR, I added the possibility to derive a `schema` from a `format` schema. But according to a SQL DDL statement `CREATE TABLE (..) [FORMAT] ...`, the `schema` must always be complete and the `format` schema might be derived, so I changed my initial implementation.
For simplicity, `deriveFormatFields` removes the time attributes and takes the resulting schema as the format's schema, because `rowtime` must not be an existing field. If the rowtime should be an existing field, the full `format` schema is mandatory (because the `schema` and the `format` schema might differ). I agree that we need good documentation for all of this.
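To make the last point concrete, here is a small standalone sketch (not the actual Flink code; the flattened property keys and example values are simplified assumptions) of the "schema minus time attributes" derivation that `deriveFormatFields` performs:

```scala
// Illustrative only: a simplified model of deriving format fields from
// flattened "schema.N.*" properties by dropping proctime/rowtime attributes.
// The property keys below are examples, not the exact descriptor constants.
object DeriveFormatFieldsSketch {

  def deriveFormatFields(props: Map[String, String]): Seq[(String, String)] = {
    // count indexed schema entries: schema.0.name, schema.1.name, ...
    val fieldCount =
      Iterator.from(0).takeWhile(i => props.contains(s"schema.$i.name")).size

    (0 until fieldCount).flatMap { i =>
      val name = props(s"schema.$i.name")
      val fieldType = props(s"schema.$i.type")
      val isProctime = props.get(s"schema.$i.proctime").contains("true")
      val isRowtime = props.keys.exists(_.startsWith(s"schema.$i.rowtime."))
      // time attributes are not forwarded to the format schema
      if (isProctime || isRowtime) None else Some(name -> fieldType)
    }
  }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "schema.0.name" -> "user_id", "schema.0.type" -> "BIGINT",
      "schema.1.name" -> "event_time", "schema.1.type" -> "TIMESTAMP",
      "schema.1.rowtime.timestamps.type" -> "from-source",
      "schema.2.name" -> "proc_time", "schema.2.type" -> "TIMESTAMP",
      "schema.2.proctime" -> "true")

    // prints: Vector((user_id,BIGINT))
    println(deriveFormatFields(props))
  }
}
```

If the rowtime were defined from an existing field, this simple derivation would obviously not be enough, which is why the full `format` schema becomes mandatory in that case.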
---