[
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539673#comment-16539673
]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r201593060
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/formats/TableFormatFactoryService.scala
---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats
+
+import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
+
+import org.apache.flink.table.api._
+import
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+
+/**
+ * Service provider interface for finding a suitable
[[TableFormatFactory]] for the
+ * given properties.
+ */
+object TableFormatFactoryService extends Logging {
+
+ private lazy val defaultLoader =
ServiceLoader.load(classOf[TableFormatFactory[_]])
+
+ /**
+ * Finds a table format factory of the given class and creates
configured instances from the
+ * given descriptor.
+ *
+ * @param factoryClass desired format factory
+ * @param descriptor descriptor that describes the format
+ * @tparam T factory class type
+ * @return configured instance from factory
+ */
+ def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
+ find(factoryClass, descriptor, null)
+ }
+
+ /**
+ * Finds a table format factory of the given class and creates
configured instances from the
+ * given descriptor and classloader.
+ *
+ * @param factoryClass desired format factory
+ * @param descriptor descriptor that describes the format
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return configured instance from factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ descriptor: Descriptor,
+ classLoader: ClassLoader)
+ : T = {
+
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+ find(factoryClass, properties.asMap, classLoader)
+ }
+
+ /**
+ * Finds a table format factory of the given class and creates
configured instances from the
+ * given property map.
+ *
+ * @param factoryClass desired format factory
+ * @param propertyMap properties that describes the format
+ * @tparam T factory class type
+ * @return configured instance from factory
+ */
+ def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]):
T = {
+ find(factoryClass, propertyMap, null)
+ }
+
+ /**
+ * Finds a table format factory of the given class and creates
configured instances from the
+ * given property map and classloader.
+ *
+ * @param factoryClass desired format factory
+ * @param propertyMap properties that describes the format
+ * @param classLoader classloader for service loading
+ * @tparam T factory class type
+ * @return configured instance from factory
+ */
+ def find[T](
+ factoryClass: Class[T],
+ propertyMap: JMap[String, String],
+ classLoader: ClassLoader)
+ : T = {
+
+ val properties = propertyMap.asScala.toMap
+
+ // find matching context
+ val (foundFactories, contextFactories) =
findMatchingContext(properties, classLoader)
+ if (contextFactories.isEmpty) {
+ throw new NoMatchingTableFormatException(
+ "No context matches.",
+ factoryClass,
+ foundFactories,
+ properties)
+ }
+
+ // filter for factory class
+ val classFactories = contextFactories.filter(f =>
factoryClass.isAssignableFrom(f.getClass))
+ if (classFactories.isEmpty) {
+ throw new NoMatchingTableFormatException(
+ s"No factory implements '${factoryClass.getCanonicalName}'.",
+ factoryClass,
+ foundFactories,
+ properties)
+ }
+
+ // filter by supported keys
+ val plainGivenKeys = mutable.ArrayBuffer[String]()
+ properties.keys.foreach { k =>
+ // replace arrays with wildcard
+ val key = k.replaceAll(".\\d+", ".#")
+ // ignore duplicates
+ if (!plainGivenKeys.contains(key)) {
+ plainGivenKeys += key
+ }
+ }
+ var lastKey: Option[String] = None
+ val supportedFactories = classFactories.filter { factory =>
+ val requiredContextKeys = normalizeContext(factory).keySet
+ val includeSchema = factory.supportsSchemaDerivation()
+ val supportedKeys = normalizeSupportedProperties(factory)
+ val givenKeys = plainGivenKeys
+ // ignore context keys
+ .filter(!requiredContextKeys.contains(_))
+ // ignore non-format (or schema) keys
+ .filter { k =>
+ if (includeSchema) {
+ k.startsWith(SchemaValidator.SCHEMA + ".") ||
+ k.startsWith(FormatDescriptorValidator.FORMAT + ".")
+ } else {
+ k.startsWith(FormatDescriptorValidator.FORMAT + ".")
+ }
+ }
+ givenKeys.forall { k =>
+ lastKey = Option(k)
+ supportedKeys.contains(k)
+ }
+ }
+ if (supportedFactories.isEmpty && classFactories.length == 1 &&
lastKey.isDefined) {
+ // special case: when there is only one matching factory but the
last property key
+ // was incorrect
+ val factory = classFactories.head
+ val supportedKeys = normalizeSupportedProperties(factory)
+ throw new NoMatchingTableFormatException(
+ s"""
+ |The matching factory '${factory.getClass.getName}' doesn't
support '${lastKey.get}'.
+ |
+ |Supported properties of this factory are:
+ |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
+ factoryClass,
+ foundFactories,
+ properties)
+ } else if (supportedFactories.isEmpty) {
+ throw new NoMatchingTableFormatException(
+ s"No factory supports all properties.",
+ factoryClass,
+ foundFactories,
+ properties)
+ } else if (supportedFactories.length > 1) {
+ throw new AmbiguousTableFormatException(
+ supportedFactories,
+ factoryClass,
+ foundFactories,
+ properties)
+ }
+
+ supportedFactories.head.asInstanceOf[T]
+ }
+
+ private def findMatchingContext(
+ properties: Map[String, String],
+ classLoader: ClassLoader)
+ : (Seq[TableFormatFactory[_]], Seq[TableFormatFactory[_]]) = {
+
+ val foundFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
+ val matchingFactories = mutable.ArrayBuffer[TableFormatFactory[_]]()
+
+ try {
+ val iter = if (classLoader == null) {
+ defaultLoader.iterator()
+ } else {
+ val customLoader =
ServiceLoader.load(classOf[TableFormatFactory[_]], classLoader)
+ customLoader.iterator()
+ }
+ while (iter.hasNext) {
+ val factory = iter.next()
+ foundFactories += factory
+
+ val requestedContext = normalizeContext(factory)
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requestedContext
+ // we remove the version for now until we have the first backwards
compatibility case
+ // with the version we can provide mappings in case the format
changes
+ plainContext.remove(FORMAT_PROPERTY_VERSION)
+
+ // check if required context is met
+ if (plainContext.forall(e => properties.contains(e._1) &&
properties(e._1) == e._2)) {
+ matchingFactories += factory
+ }
+ }
+ } catch {
+ case e: ServiceConfigurationError =>
+ LOG.error("Could not load service provider for table format
factories.", e)
+ throw new TableException("Could not load service provider for
table format factories.", e)
+ }
+
+ (foundFactories, matchingFactories)
+ }
+
+ private def normalizeContext(factory: TableFormatFactory[_]):
Map[String, String] = {
+ val requiredContextJava = factory.requiredContext()
+ if (requiredContextJava != null) {
+ requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
+ } else {
+ Map[String, String]()
--- End diff --
`checkNotNull(requiredContextJava)`? The interface doesn't seem to allow
for nulls.
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently
> only {{flink-avro}} is located there but we will add more formats such as
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of
> concerns we want to decouple connectors from formats: e.g., remove
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to
> discover available formats in the classpath (similar to how file systems are
> discovered now). A {{Format}} will provide a method for converting {{byte[]}}
> to the target record type.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)