Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171280725
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
    +              s"from source. Please specify the source field from which it 
can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time 
attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    Hi @twalthr, sorry for mentioning you again. I was a little confused about 
this method. Could you help explain its usage? Besides, the rowtime field 
should be an existing field in the input format. Why is it removed here?
    
    Thanks, Xingcan


---

Reply via email to