[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539673#comment-16539673
 ] 

ASF GitHub Bot commented on FLINK-8558:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6264#discussion_r201593060
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
 ---
    @@ -0,0 +1,245 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.formats
    +
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
    +
    +import org.apache.flink.table.api._
    +import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
    +import org.apache.flink.table.descriptors._
    +import org.apache.flink.table.util.Logging
    +
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.collection.mutable
    +
    +/**
    +  * Service provider interface for finding a suitable 
[[TableFormatFactory]] for the
    +  * given properties.
    +  */
    +object TableFormatFactoryService extends Logging {
    +
    +  private lazy val defaultLoader = 
ServiceLoader.load(classOf[TableFormatFactory[_]])
    +
    +  /**
    +    * Finds a table format factory of the given class and creates 
configured instances from the
    +    * given descriptor.
    +    *
    +    * @param factoryClass desired format factory
    +    * @param descriptor descriptor that describes the format
    +    * @tparam T factory class type
    +    * @return configured instance from factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    find(factoryClass, descriptor, null)
    +  }
    +
    +  /**
    +    * Finds a table format factory of the given class and creates 
configured instances from the
    +    * given descriptor and classloader.
    +    *
    +    * @param factoryClass desired format factory
    +    * @param descriptor descriptor that describes the format
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return configured instance from factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      descriptor: Descriptor,
    +      classLoader: ClassLoader)
    +    : T = {
    +
    +    val properties = new DescriptorProperties()
    +    descriptor.addProperties(properties)
    +    find(factoryClass, properties.asMap, classLoader)
    +  }
    +
    +  /**
    +    * Finds a table format factory of the given class and creates 
configured instances from the
    +    * given property map.
    +    *
    +    * @param factoryClass desired format factory
    +    * @param propertyMap properties that describes the format
    +    * @tparam T factory class type
    +    * @return configured instance from factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): 
T = {
    +    find(factoryClass, propertyMap, null)
    +  }
    +
    +  /**
    +    * Finds a table format factory of the given class and creates 
configured instances from the
    +    * given property map and classloader.
    +    *
    +    * @param factoryClass desired format factory
    +    * @param propertyMap properties that describes the format
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return configured instance from factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // find matching context
    +    val (foundFactories, contextFactories) = 
findMatchingContext(properties, classLoader)
    +    if (contextFactories.isEmpty) {
    +      throw new NoMatchingTableFormatException(
    +        "No context matches.",
    +        factoryClass,
    +        foundFactories,
    +        properties)
    +    }
    +
    +    // filter for factory class
    +    val classFactories = contextFactories.filter(f => 
factoryClass.isAssignableFrom(f.getClass))
    +    if (classFactories.isEmpty) {
    +      throw new NoMatchingTableFormatException(
    +        s"No factory implements '${factoryClass.getCanonicalName}'.",
    +        factoryClass,
    +        foundFactories,
    +        properties)
    +    }
    +
    +    // filter by supported keys
    +    val plainGivenKeys = mutable.ArrayBuffer[String]()
    +    properties.keys.foreach { k =>
    +      // replace arrays with wildcard
    +      val key = k.replaceAll(".\\d+", ".#")
    +      // ignore duplicates
    +      if (!plainGivenKeys.contains(key)) {
    +        plainGivenKeys += key
    +      }
    +    }
    +    var lastKey: Option[String] = None
    +    val supportedFactories = classFactories.filter { factory =>
    +      val requiredContextKeys = normalizeContext(factory).keySet
    +      val includeSchema = factory.supportsSchemaDerivation()
    +      val supportedKeys = normalizeSupportedProperties(factory)
    +      val givenKeys = plainGivenKeys
    +        // ignore context keys
    +        .filter(!requiredContextKeys.contains(_))
    +        // ignore non-format (or schema) keys
    +        .filter { k =>
    +          if (includeSchema) {
    +            k.startsWith(SchemaValidator.SCHEMA + ".") ||
    +              k.startsWith(FormatDescriptorValidator.FORMAT + ".")
    +          } else {
    +            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
    +          }
    +        }
    +      givenKeys.forall { k =>
    +        lastKey = Option(k)
    +        supportedKeys.contains(k)
    +      }
    +    }
    +    if (supportedFactories.isEmpty && classFactories.length == 1 && 
lastKey.isDefined) {
    +      // special case: when there is only one matching factory but the 
last property key
    +      // was incorrect
    +      val factory = classFactories.head
    +      val supportedKeys = normalizeSupportedProperties(factory)
    +      throw new NoMatchingTableFormatException(
    +        s"""
    +          |The matching factory '${factory.getClass.getName}' doesn't 
support '${lastKey.get}'.
    +          |
    +          |Supported properties of this factory are:
    +          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
    +        factoryClass,
    +        foundFactories,
    +        properties)
    +    } else if (supportedFactories.isEmpty) {
    +      throw new NoMatchingTableFormatException(
    +        s"No factory supports all properties.",
    +        factoryClass,
    +        foundFactories,
    +        properties)
    +    } else if (supportedFactories.length > 1) {
    +      throw new AmbiguousTableFormatException(
    +        supportedFactories,
    +        factoryClass,
    +        foundFactories,
    +        properties)
    +    }
    +
    +    supportedFactories.head.asInstanceOf[T]
    +  }
    +
    +  private def findMatchingContext(
    +      properties: Map[String, String],
    +      classLoader: ClassLoader)
    +    : (Seq[TableFormatFactory[_]], Seq[TableFormatFactory[_]]) = {
    +
    +    val foundFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
    +    val matchingFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
    +
    +    try {
    +      val iter = if (classLoader == null) {
    +        defaultLoader.iterator()
    +      } else {
    +        val customLoader = 
ServiceLoader.load(classOf[TableFormatFactory[_]], classLoader)
    +        customLoader.iterator()
    +      }
    +      while (iter.hasNext) {
    +        val factory = iter.next()
    +        foundFactories += factory
    +
    +        val requestedContext = normalizeContext(factory)
    +
    +        val plainContext = mutable.Map[String, String]()
    +        plainContext ++= requestedContext
    +        // we remove the version for now until we have the first backwards 
compatibility case
    +        // with the version we can provide mappings in case the format 
changes
    +        plainContext.remove(FORMAT_PROPERTY_VERSION)
    +
    +        // check if required context is met
    +        if (plainContext.forall(e => properties.contains(e._1) && 
properties(e._1) == e._2)) {
    +          matchingFactories += factory
    +        }
    +      }
    +    } catch {
    +      case e: ServiceConfigurationError =>
    +        LOG.error("Could not load service provider for table format 
factories.", e)
    +        throw new TableException("Could not load service provider for 
table format factories.", e)
    +    }
    +
    +    (foundFactories, matchingFactories)
    +  }
    +
    +  private def normalizeContext(factory: TableFormatFactory[_]): 
Map[String, String] = {
    +    val requiredContextJava = factory.requiredContext()
    +    if (requiredContextJava != null) {
    +      requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
    +    } else {
    +      Map[String, String]()
    --- End diff --
    
    `checkNotNull(requiredContextJava)`? The interface doesn't seem to allow 
for nulls.


> Add unified format interfaces and format discovery
> --------------------------------------------------
>
>                 Key: FLINK-8558
>                 URL: https://issues.apache.org/jira/browse/FLINK-8558
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discovery available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to