[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374587#comment-16374587
 ] 

ASF GitHub Bot commented on FLINK-8538:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170272030
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
    @@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_PROPERTY_VERSION = "schema.property-version"
    +  val SCHEMA_FIELDS = "schema.fields"
    +  val SCHEMA_FIELDS_NAME = "name"
    +  val SCHEMA_FIELDS_TYPE = "type"
    +  val SCHEMA_FIELDS_PROCTIME = "proctime"
    +  val SCHEMA_FIELDS_FROM = "from"
    +  val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
    +  val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
    +  val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
    +
    +  // utilities
    +
    +  /**
    +    * Derives a schema from properties and source.
    +    */
    +  def deriveSchema(
    +      properties: DescriptorProperties,
    +      sourceSchema: Option[TableSchema])
    +    : TableSchema = {
    +
    +    val builder = TableSchema.builder()
    +
    +    val schema = properties.getTableSchema(SCHEMA_FIELDS)
    +
    +    val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
    +
    +    val sourceNamesAndTypes = derivationMode match {
    +      case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if 
sourceSchema.isDefined =>
    +        // sort by name
    +        sourceSchema.get.getColumnNames
    +          .zip(sourceSchema.get.getTypes)
    +          .sortBy(_._1)
    +
    +      case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if 
sourceSchema.isDefined =>
    +        sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
    +
    +      case Some(_) =>
    +        throw new ValidationException("Derivation of fields is not 
supported from this source.")
    +
    +      case None =>
    +        Array[(String, TypeInformation[_])]()
    +    }
    +
    +    // add source fields
    +    sourceNamesAndTypes.foreach { case (n, t) =>
    +      builder.field(n, t)
    +    }
    +
    +    // add schema fields
    +    schema.foreach { ts =>
    +      val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
    +      schemaNamesAndTypes.foreach { case (n, t) =>
    +          // do not allow overwriting
    +          if (sourceNamesAndTypes.exists(_._1 == n)) {
    +            throw new ValidationException(
    +              "Specified schema fields must not overwrite fields derived 
from the source.")
    +          }
    +          builder.field(n, t)
    +      }
    +    }
    +
    +    builder.build()
    +  }
    +
    +  /**
    +    * Derives a schema from properties and source.
    +    * This method is intended for Java code.
    +    */
    +  def deriveSchema(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : TableSchema = {
    +    deriveSchema(
    +      properties,
    +      Option(sourceSchema.orElse(null)))
    +  }
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): 
Option[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA_FIELDS, 
SCHEMA_FIELDS_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = 
properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
    +        }
    +      }
    +    }
    +    None
    +  }
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    * This method is intended for Java code.
    +    */
    +  def deriveProctimeOptional(properties: DescriptorProperties): 
Optional[String] = {
    +    Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA_FIELDS, 
SCHEMA_FIELDS_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            
properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get,
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Find a table source field mapping.
    +    * This method is intended for Java code.
    +    */
    +  def deriveFieldMapping(
    --- End diff --
    
    Provide only Java-friendly methods?


> Add a Kafka table source factory with JSON format support
> ---------------------------------------------------------
>
>                 Key: FLINK-8538
>                 URL: https://issues.apache.org/jira/browse/FLINK-8538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to