GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5564#discussion_r170272030
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
---
@@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean =
true) extends DescriptorVal
object SchemaValidator {
val SCHEMA = "schema"
- val SCHEMA_VERSION = "schema.version"
+ val SCHEMA_PROPERTY_VERSION = "schema.property-version"
+ val SCHEMA_FIELDS = "schema.fields"
+ val SCHEMA_FIELDS_NAME = "name"
+ val SCHEMA_FIELDS_TYPE = "type"
+ val SCHEMA_FIELDS_PROCTIME = "proctime"
+ val SCHEMA_FIELDS_FROM = "from"
+ val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
+ val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
+ val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
+
+ // utilities
+
+ /**
+ * Derives a schema from properties and source.
+ */
+ def deriveSchema(
+ properties: DescriptorProperties,
+ sourceSchema: Option[TableSchema])
+ : TableSchema = {
+
+ val builder = TableSchema.builder()
+
+ val schema = properties.getTableSchema(SCHEMA_FIELDS)
+
+ val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
+
+ val sourceNamesAndTypes = derivationMode match {
+ case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if
sourceSchema.isDefined =>
+ // sort by name
+ sourceSchema.get.getColumnNames
+ .zip(sourceSchema.get.getTypes)
+ .sortBy(_._1)
+
+ case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if
sourceSchema.isDefined =>
+ sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
+
+ case Some(_) =>
+ throw new ValidationException("Derivation of fields is not
supported from this source.")
+
+ case None =>
+ Array[(String, TypeInformation[_])]()
+ }
+
+ // add source fields
+ sourceNamesAndTypes.foreach { case (n, t) =>
+ builder.field(n, t)
+ }
+
+ // add schema fields
+ schema.foreach { ts =>
+ val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
+ schemaNamesAndTypes.foreach { case (n, t) =>
+ // do not allow overwriting
+ if (sourceNamesAndTypes.exists(_._1 == n)) {
+ throw new ValidationException(
+ "Specified schema fields must not overwrite fields derived
from the source.")
+ }
+ builder.field(n, t)
+ }
+ }
+
+ builder.build()
+ }
+
+ /**
+ * Derives a schema from properties and source.
+ * This method is intended for Java code.
+ */
+ def deriveSchema(
+ properties: DescriptorProperties,
+ sourceSchema: Optional[TableSchema])
+ : TableSchema = {
+ deriveSchema(
+ properties,
+ Option(sourceSchema.orElse(null)))
+ }
+
+ /**
+ * Finds the proctime attribute if defined.
+ */
+ def deriveProctimeAttribute(properties: DescriptorProperties):
Option[String] = {
+ val names = properties.getIndexedProperty(SCHEMA_FIELDS,
SCHEMA_FIELDS_NAME)
+
+ for (i <- 0 until names.size) {
+ val isProctime =
properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
+ isProctime.foreach { isSet =>
+ if (isSet) {
+ return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
+ }
+ }
+ }
+ None
+ }
+
+ /**
+ * Finds the proctime attribute if defined.
+ * This method is intended for Java code.
+ */
+ def deriveProctimeOptional(properties: DescriptorProperties):
Optional[String] = {
+ Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
+ }
+
+ /**
+ * Finds the rowtime attributes if defined.
+ */
+ def deriveRowtimeAttributes(properties: DescriptorProperties)
+ : util.List[RowtimeAttributeDescriptor] = {
+
+ val names = properties.getIndexedProperty(SCHEMA_FIELDS,
SCHEMA_FIELDS_NAME)
+
+ var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+ // check for rowtime in every field
+ for (i <- 0 until names.size) {
+ RowtimeValidator
+ .getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
+ .foreach { case (extractor, strategy) =>
+ // create descriptor
+ attributes += new RowtimeAttributeDescriptor(
+
properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get,
+ extractor,
+ strategy)
+ }
+ }
+
+ attributes.asJava
+ }
+
+ /**
+ * Find a table source field mapping.
+ * This method is intended for Java code.
+ */
+ def deriveFieldMapping(
--- End diff --
Should we provide only the Java-friendly methods here?
---