http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala index d112732..555d92d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -21,24 +21,32 @@ package org.apache.flink.table.descriptors import java.io.Serializable import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong} import java.util +import java.util.function.{Consumer, Supplier} import java.util.regex.Pattern +import java.util.{Optional, List => JList, Map => JMap} import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang.StringEscapeUtils import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{TableSchema, ValidationException} -import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.table.api.{TableException, TableSchema, ValidationException} +import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema, toJava} import org.apache.flink.table.typeutils.TypeStringUtils import org.apache.flink.util.InstantiationUtil import org.apache.flink.util.Preconditions.checkNotNull -import scala.collection.mutable import scala.collection.JavaConverters._ +import scala.collection.mutable /** * Utility class for having a unified string-based representation of Table API related classes * such as [[TableSchema]], [[TypeInformation]], etc. * + * '''Note to implementers''': Please try to reuse key names as much as possible. Key-names + * should be hierarchical and lower case. Use "-" instead of dots or camel case. + * E.g., connector.schema.start-from = from-earliest. Try not to use the higher level in a + * key-name. E.g., instead of connector.kafka.kafka-version use connector.kafka.version. + * * @param normalizeKeys flag that indicates if keys should be normalized (this flag is * necessary for backwards compatibility) */ @@ -46,39 +54,18 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]() - private def put(key: String, value: String): Unit = { - if (properties.contains(key)) { - throw new IllegalStateException("Property already present.") - } - if (normalizeKeys) { - properties.put(key.toLowerCase, value) - } else { - properties.put(key, value) - } - } - - // for testing - private[flink] def unsafePut(key: String, value: String): Unit = { - properties.put(key, value) - } - - // for testing - private[flink] def unsafeRemove(key: String): Unit = { - properties.remove(key) - } - - def putProperties(properties: Map[String, String]): Unit = { - properties.foreach { case (k, v) => - put(k, v) - } - } - - def putProperties(properties: util.Map[String, String]): Unit = { + /** + * Adds a set of properties. 
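+ *
+ * For example, a minimal usage sketch (the keys and values are illustrative only;
+ * a key must not have been added before, since put() rejects duplicates):
+ *
+ * {{{
+ *   val map = new java.util.HashMap[String, String]()
+ *   map.put("connector.type", "filesystem")
+ *   map.put("connector.path", "/tmp/data.csv")
+ *   descriptorProperties.putProperties(map)
+ * }}}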
+ */ + def putProperties(properties: JMap[String, String]): Unit = { properties.asScala.foreach { case (k, v) => put(k, v) } } + /** + * Adds a class under the given key. + */ def putClass(key: String, clazz: Class[_]): Unit = { checkNotNull(key) checkNotNull(clazz) @@ -89,43 +76,62 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { put(key, clazz.getName) } + /** + * Adds a string under the given key. + */ def putString(key: String, str: String): Unit = { checkNotNull(key) checkNotNull(str) put(key, str) } + /** + * Adds a boolean under the given key. + */ def putBoolean(key: String, b: Boolean): Unit = { checkNotNull(key) put(key, b.toString) } + /** + * Adds a long under the given key. + */ def putLong(key: String, l: Long): Unit = { checkNotNull(key) put(key, l.toString) } + /** + * Adds an integer under the given key. + */ def putInt(key: String, i: Int): Unit = { checkNotNull(key) put(key, i.toString) } + /** + * Adds a character under the given key. + */ def putCharacter(key: String, c: Character): Unit = { checkNotNull(key) checkNotNull(c) put(key, c.toString) } + /** + * Adds a table schema under the given key. + */ def putTableSchema(key: String, schema: TableSchema): Unit = { + checkNotNull(key) + checkNotNull(schema) putTableSchema(key, normalizeTableSchema(schema)) } - def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = { - putIndexedFixedProperties( - key, - Seq(NAME, TYPE), - nameAndType.map(t => Seq(t._1, t._2)) - ) + /** + * Adds a table schema under the given key. + */ + def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = { + putTableSchema(key, nameAndType.asScala.map(t => (t.f0, t.f1))) } /** @@ -140,19 +146,12 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { */ def putIndexedFixedProperties( key: String, - propertyKeys: Seq[String], - propertyValues: Seq[Seq[String]]) + propertyKeys: JList[String], + propertyValues: JList[JList[String]]) : Unit = { checkNotNull(key) checkNotNull(propertyValues) - propertyValues.zipWithIndex.foreach { case (values, idx) => - if (values.lengthCompare(propertyKeys.size) != 0) { - throw new ValidationException("Values must have same arity as keys.") - } - values.zipWithIndex.foreach { case (value, keyIdx) => - put(s"$key.$idx.${propertyKeys(keyIdx)}", value) - } - } + putIndexedFixedProperties(key, propertyKeys.asScala, propertyValues.asScala.map(_.asScala)) } /** @@ -167,65 +166,163 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { */ def putIndexedVariableProperties( key: String, - propertySets: Seq[Map[String, String]]) + propertySets: JList[JMap[String, String]]) : Unit = { checkNotNull(key) checkNotNull(propertySets) - propertySets.zipWithIndex.foreach { case (propertySet, idx) => - propertySet.foreach { case (k, v) => - put(s"$key.$idx.$k", v) - } - } + putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap)) } // ---------------------------------------------------------------------------------------------- - def getString(key: String): Option[String] = { - properties.get(key) + /** + * Returns a string value under the given key if it exists. + */ + def getOptionalString(key: String): Optional[String] = toJava(properties.get(key)) + + /** + * Returns a string value under the given existing key. + */ + def getString(key: String): String = { + get(key) } - def getCharacter(key: String): Option[Character] = getString(key) match { - case Some(c) => + /** + * Returns a character value under the given key if it exists. 
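+ * The string value must consist of exactly one character, otherwise a
+ * ValidationException is thrown.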
+ */ + def getOptionalCharacter(key: String): Optional[Character] = { + val value = properties.get(key).map { c => if (c.length != 1) { throw new ValidationException(s"The value of $key must only contain one character.") } - Some(c.charAt(0)) + Char.box(c.charAt(0)) + } + toJava(value) + } - case None => None + /** + * Returns a character value under the given existing key. + */ + def getCharacter(key: String): Char = { + getOptionalCharacter(key).orElseThrow(exceptionSupplier(key)) } - def getBoolean(key: String): Option[Boolean] = getString(key) match { - case Some(b) => Some(JBoolean.parseBoolean(b)) + /** + * Returns a class value under the given key if it exists. + */ + def getOptionalClass[T](key: String, superClass: Class[T]): Optional[Class[T]] = { + val value = properties.get(key).map { name => + val clazz = try { + Class.forName( + name, + true, + Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]] + } catch { + case e: Exception => + throw new ValidationException(s"Could not get class '$name' for key '$key'.", e) + } + if (!superClass.isAssignableFrom(clazz)) { + throw new ValidationException(s"Class '$name' does not extend from the required " + + s"class '${superClass.getName}' for key '$key'.") + } + clazz + } + toJava(value) + } - case None => None + /** + * Returns a class value under the given existing key. + */ + def getClass[T](key: String, superClass: Class[T]): Class[T] = { + getOptionalClass(key, superClass).orElseThrow(exceptionSupplier(key)) } - def getInt(key: String): Option[Int] = getString(key) match { - case Some(l) => Some(JInt.parseInt(l)) + /** + * Returns a boolean value under the given key if it exists. + */ + def getOptionalBoolean(key: String): Optional[JBoolean] = { + val value = properties.get(key).map(JBoolean.parseBoolean(_)).map(Boolean.box) + toJava(value) + } + + /** + * Returns a boolean value under the given existing key. + */ + def getBoolean(key: String): Boolean = { + getOptionalBoolean(key).orElseThrow(exceptionSupplier(key)) + } - case None => None + /** + * Returns an integer value under the given key if it exists. + */ + def getOptionalInt(key: String): Optional[JInt] = { + val value = properties.get(key).map(JInt.parseInt(_)).map(Int.box) + toJava(value) } - def getLong(key: String): Option[Long] = getString(key) match { - case Some(l) => Some(JLong.parseLong(l)) + /** + * Returns an integer value under the given existing key. + */ + def getInt(key: String): Int = { + getOptionalInt(key).orElseThrow(exceptionSupplier(key)) + } - case None => None + /** + * Returns a long value under the given key if it exists. + */ + def getOptionalLong(key: String): Optional[JLong] = { + val value = properties.get(key).map(JLong.parseLong(_)).map(Long.box) + toJava(value) + } + + /** + * Returns a long value under the given existing key. + */ + def getLong(key: String): Long = { + getOptionalLong(key).orElseThrow(exceptionSupplier(key)) + } + + /** + * Returns a double value under the given key if it exists. + */ + def getOptionalDouble(key: String): Optional[JDouble] = { + val value = properties.get(key).map(JDouble.parseDouble(_)).map(Double.box) + toJava(value) + } + + /** + * Returns a double value under the given key if it exists. + */ + def getDouble(key: String): Double = { + getOptionalDouble(key).orElseThrow(exceptionSupplier(key)) } - def getDouble(key: String): Option[Double] = getString(key) match { - case Some(d) => Some(JDouble.parseDouble(d)) + /** + * Returns the type information under the given key if it exists. 
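+ * The string value is parsed into a [[TypeInformation]] via [[TypeStringUtils]].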
+ */ + def getOptionalType(key: String): Optional[TypeInformation[_]] = { + val value = properties.get(key).map(TypeStringUtils.readTypeInfo) + toJava(value) + } - case None => None + /** + * Returns the type information under the given existing key. + */ + def getType(key: String): TypeInformation[_] = { + getOptionalType(key).orElseThrow(exceptionSupplier(key)) } - def getTableSchema(key: String): Option[TableSchema] = { + /** + * Returns a table schema under the given key if it exists. + */ + def getOptionalTableSchema(key: String): Optional[TableSchema] = { // filter for number of columns val fieldCount = properties .filterKeys(k => k.startsWith(key) && k.endsWith(s".$NAME")) .size if (fieldCount == 0) { - return None + return toJava(None) } // validate fields and build schema @@ -243,16 +340,186 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { ) ) } - Some(schemaBuilder.build()) + toJava(Some(schemaBuilder.build())) + } + + /** + * Returns a table schema under the given existing key. + */ + def getTableSchema(key: String): TableSchema = { + getOptionalTableSchema(key).orElseThrow(exceptionSupplier(key)) + } + + /** + * Returns the property keys of fixed indexed properties. + * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.type = LONG, schema.fields.1.name = test2 + * + * getFixedIndexedProperties("schema.fields", List("type", "name")) leads to: + * + * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name") + * 1: Map("type" -> "schema.fields.1.type", "name" -> "schema.fields.1.name") + */ + def getFixedIndexedProperties( + key: String, + propertyKeys: JList[String]) + : JList[JMap[String, String]] = { + + val keys = propertyKeys.asScala + + // filter for index + val escapedKey = Pattern.quote(key) + val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)") + + // extract index and property keys + val indexes = properties.keys.flatMap { k => + val matcher = pattern.matcher(k) + if (matcher.find()) { + Some(JInt.parseInt(matcher.group(1))) + } else { + None + } + } + + // determine max index + val maxIndex = indexes.reduceOption(_ max _).getOrElse(-1) + + // validate and create result + val list = new util.ArrayList[JMap[String, String]]() + for (i <- 0 to maxIndex) { + val map = new util.HashMap[String, String]() + + keys.foreach { subKey => + val fullKey = s"$key.$i.$subKey" + // check for existence of full key + if (!containsKey(fullKey)) { + throw exceptionSupplier(fullKey).get() + } + map.put(subKey, fullKey) + } + + list.add(map) + } + list + } + + /** + * Returns the property keys of variable indexed properties. 
+ * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.type = LONG + * + * getFixedIndexedProperties("schema.fields", List("type")) leads to: + * + * 0: Map("type" -> "schema.fields.0.type", "name" -> "schema.fields.0.name") + * 1: Map("type" -> "schema.fields.1.type") + */ + def getVariableIndexedProperties( + key: String, + requiredKeys: JList[String]) + : JList[JMap[String, String]] = { + + val keys = requiredKeys.asScala + + // filter for index + val escapedKey = Pattern.quote(key) + val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)") + + // extract index and property keys + val indexes = properties.keys.flatMap { k => + val matcher = pattern.matcher(k) + if (matcher.find()) { + Some((JInt.parseInt(matcher.group(1)), matcher.group(2))) + } else { + None + } + } + + // determine max index + val maxIndex = indexes.map(_._1).reduceOption(_ max _).getOrElse(-1) + + // validate and create result + val list = new util.ArrayList[JMap[String, String]]() + for (i <- 0 to maxIndex) { + val map = new util.HashMap[String, String]() + + // check and add required keys + keys.foreach { subKey => + val fullKey = s"$key.$i.$subKey" + // check for existence of full key + if (!containsKey(fullKey)) { + throw exceptionSupplier(fullKey).get() + } + map.put(subKey, fullKey) + } + + // add optional keys + indexes.filter(_._1 == i).foreach { case (_, subKey) => + val fullKey = s"$key.$i.$subKey" + map.put(subKey, fullKey) + } + + list.add(map) + } + list + } + + /** + * Returns all properties under a given key that contains an index in between. + * + * E.g. rowtime.0.name -> returns all rowtime.#.name properties + */ + def getIndexedProperty(key: String, property: String): JMap[String, String] = { + val escapedKey = Pattern.quote(key) + properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).asJava + } + + /** + * Returns a prefix subset of properties. + */ + def getPrefix(prefixKey: String): JMap[String, String] = { + val prefix = prefixKey + '.' + properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) => + k.substring(prefix.length) -> v // remove prefix + }.toMap.asJava } // ---------------------------------------------------------------------------------------------- + /** + * Validates a string property. + */ + def validateString( + key: String, + isOptional: Boolean) + : Unit = { + validateString(key, isOptional, 0, Integer.MAX_VALUE) + } + + /** + * Validates a string property. The boundaries are inclusive. + */ def validateString( key: String, isOptional: Boolean, - minLen: Int = 0, // inclusive - maxLen: Int = Integer.MAX_VALUE) // inclusive + minLen: Int) // inclusive + : Unit = { + validateString(key, isOptional, minLen, Integer.MAX_VALUE) + } + + /** + * Validates a string property. The boundaries are inclusive. + */ + def validateString( + key: String, + isOptional: Boolean, + minLen: Int, // inclusive + maxLen: Int) // inclusive : Unit = { if (!properties.contains(key)) { @@ -269,11 +536,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates an integer property. + */ + def validateInt( + key: String, + isOptional: Boolean) + : Unit = { + validateInt(key, isOptional, Int.MinValue, Int.MaxValue) + } + + /** + * Validates an integer property. The boundaries are inclusive. 
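+ * For example, validateInt("format.property-version", isOptional = true, 0)
+ * enforces a non-negative value if the key is present.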
+ */ def validateInt( key: String, isOptional: Boolean, - min: Int = Int.MinValue, // inclusive - max: Int = Int.MaxValue) // inclusive + min: Int) // inclusive + : Unit = { + validateInt(key, isOptional, min, Int.MaxValue) + } + + /** + * Validates an integer property. The boundaries are inclusive. + */ + def validateInt( + key: String, + isOptional: Boolean, + min: Int, // inclusive + max: Int) // inclusive : Unit = { if (!properties.contains(key)) { @@ -295,11 +586,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates a long property. + */ + def validateLong( + key: String, + isOptional: Boolean) + : Unit = { + validateLong(key, isOptional, Long.MinValue, Long.MaxValue) + } + + /** + * Validates a long property. The boundaries are inclusive. + */ + def validateLong( + key: String, + isOptional: Boolean, + min: Long) // inclusive + : Unit = { + validateLong(key, isOptional, min, Long.MaxValue) + } + + /** + * Validates a long property. The boundaries are inclusive. + */ def validateLong( key: String, isOptional: Boolean, - min: Long = Long.MinValue, // inclusive - max: Long = Long.MaxValue) // inclusive + min: Long, // inclusive + max: Long) // inclusive : Unit = { if (!properties.contains(key)) { @@ -321,6 +636,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates that a certain value is present under the given key. + */ def validateValue(key: String, value: String, isOptional: Boolean): Unit = { if (!properties.contains(key)) { if (!isOptional) { @@ -334,6 +652,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates that a boolean value is present under the given key. + */ def validateBoolean(key: String, isOptional: Boolean): Unit = { if (!properties.contains(key)) { if (!isOptional) { @@ -348,11 +669,35 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates a double property. + */ + def validateDouble( + key: String, + isOptional: Boolean) + : Unit = { + validateDouble(key, isOptional, Double.MinValue, Double.MaxValue) + } + + /** + * Validates a double property. The boundaries are inclusive. + */ + def validateDouble( + key: String, + isOptional: Boolean, + min: Double) // inclusive + : Unit = { + validateDouble(key, isOptional, min, Double.MaxValue) + } + + /** + * Validates a double property. The boundaries are inclusive. + */ def validateDouble( key: String, isOptional: Boolean, - min: Double = Double.MinValue, // inclusive - max: Double = Double.MaxValue) // inclusive + min: Double, // inclusive + max: Double) // inclusive : Unit = { if (!properties.contains(key)) { @@ -374,25 +719,117 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validation for variable indexed properties. + * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.type = LONG + * + * The propertyKeys map defines e.g. "type" and a validation logic for the given full key. + * + * The validation consumer takes the current prefix e.g. "schema.fields.1.". 
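+ *
+ * A call sketch (the key and sub-key names are illustrative; toJava is the
+ * Scala-to-Java helper from the companion object):
+ *
+ * {{{
+ *   props.validateVariableIndexedProperties(
+ *     "schema.fields",
+ *     allowEmpty = false,
+ *     Map(
+ *       "name" -> toJava((prefix: String) =>
+ *         props.validateString(prefix + "name", isOptional = false, minLen = 1)),
+ *       "type" -> toJava((prefix: String) =>
+ *         props.validateType(prefix + "type", isOptional = false))
+ *     ).asJava,
+ *     java.util.Arrays.asList("type"))
+ * }}}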
+ */ + def validateVariableIndexedProperties( + key: String, + allowEmpty: Boolean, + propertyKeys: JMap[String, Consumer[String]], + requiredKeys: JList[String]) + : Unit = { + + val keys = propertyKeys.asScala + + // filter for index + val escapedKey = Pattern.quote(key) + val pattern = Pattern.compile(s"$escapedKey\\.(\\d+)\\.(.*)") + + // extract index and property keys + val indexes = properties.keys.flatMap { k => + val matcher = pattern.matcher(k) + if (matcher.find()) { + Some(JInt.parseInt(matcher.group(1))) + } else { + None + } + } + + // determine max index + val maxIndex = indexes.reduceOption(_ max _).getOrElse(-1) + + if (maxIndex < 0 && !allowEmpty) { + throw new ValidationException(s"Property key '$key' must not be empty.") + } + + // validate + for (i <- 0 to maxIndex) { + keys.foreach { case (subKey, validation) => + val fullKey = s"$key.$i.$subKey" + // only validate if it exists + if (properties.contains(fullKey)) { + validation.accept(s"$key.$i.") + } else { + // check if it is required + if (requiredKeys.contains(subKey)) { + throw new ValidationException(s"Required property key '$fullKey' is missing.") + } + } + } + } + } + + /** + * Validation for fixed indexed properties. + * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.type = LONG, schema.fields.1.name = test2 + * + * The propertyKeys map must define e.g. "type" and "name" and a validation logic for the + * given full key. + */ + def validateFixedIndexedProperties( + key: String, + allowEmpty: Boolean, + propertyKeys: JMap[String, Consumer[String]]) + : Unit = { + + validateVariableIndexedProperties( + key, + allowEmpty, + propertyKeys, + new util.ArrayList(propertyKeys.keySet())) + } + + /** + * Validates a table schema property. + */ def validateTableSchema(key: String, isOptional: Boolean): Unit = { - // filter for name columns - val names = getIndexedProperty(key, NAME) - // filter for type columns - val types = getIndexedProperty(key, TYPE) - if (names.isEmpty && types.isEmpty && !isOptional) { - throw new ValidationException( - s"Could not find the required schema for property '$key'.") + val nameValidation = (prefix: String) => { + validateString(prefix + NAME, isOptional = false, minLen = 1) } - for (i <- 0 until Math.max(names.size, types.size)) { - validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1) - validateType(s"$key.$i.$TYPE", isOptional = false) + val typeValidation = (prefix: String) => { + validateType(prefix + TYPE, isOptional = false) } + + validateFixedIndexedProperties( + key, + isOptional, + Map( + NAME -> toJava(nameValidation), + TYPE -> toJava(typeValidation) + ).asJava + ) } + /** + * Validates a enum property with a set of validation logic for each enum value. + */ def validateEnum( key: String, isOptional: Boolean, - enumToValidation: Map[String, () => Unit]) + enumToValidation: JMap[String, Consumer[String]]) : Unit = { if (!properties.contains(key)) { @@ -401,15 +838,26 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } else { val value = properties(key) - if (!enumToValidation.contains(value)) { + if (!enumToValidation.containsKey(value)) { throw new ValidationException(s"Unknown value for property '$key'. 
" + - s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value") + s"Supported values [${enumToValidation.keySet().asScala.mkString(", ")}] but was: $value") } else { - enumToValidation(value).apply() // run validation logic + // run validation logic + enumToValidation.get(value).accept(key) } } } + /** + * Validates a enum property with a set of enum values. + */ + def validateEnumValues(key: String, isOptional: Boolean, values: JList[String]): Unit = { + validateEnum(key, isOptional, values.asScala.map((_, noValidation())).toMap.asJava) + } + + /** + * Validates a type property. + */ def validateType(key: String, isOptional: Boolean): Unit = { if (!properties.contains(key)) { if (!isOptional) { @@ -420,6 +868,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates that the given prefix is not included in these properties. + */ def validatePrefixExclusion(prefix: String): Unit = { val invalidField = properties.find(_._1.startsWith(prefix)) if (invalidField.isDefined) { @@ -428,6 +879,9 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { } } + /** + * Validates that the given key is not included in these properties. + */ def validateExclusion(key: String): Unit = { if (properties.contains(key)) { throw new ValidationException(s"Property '$key' is not allowed in this context.") @@ -436,28 +890,159 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { // ---------------------------------------------------------------------------------------------- - def getIndexedProperty(key: String, property: String): Map[String, String] = { - val escapedKey = Pattern.quote(key) - properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).toMap + /** + * Returns if any property contains parts of a given string. + */ + def containsString(str: String): Boolean = { + properties.exists(e => e._1.contains(str)) } - def contains(str: String): Boolean = { - properties.exists(e => e._1.contains(str)) + /** + * Returns if the given key is contained. + */ + def containsKey(key: String): Boolean = { + properties.contains(key) } + /** + * Returns if a given prefix exists in the properties. + */ def hasPrefix(prefix: String): Boolean = { properties.exists(e => e._1.startsWith(prefix)) } - def asMap: Map[String, String] = { - properties.toMap + /** + * Returns a Scala Map. + */ + def asMap: JMap[String, String] = { + properties.toMap.asJava + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Returns an empty validation logic. + */ + def noValidation(): Consumer[String] = DescriptorProperties.emptyConsumer + + def exceptionSupplier(key: String): Supplier[TableException] = new Supplier[TableException] { + override def get(): TableException = { + new TableException(s"Property with key '$key' could not be found. " + + s"This is a bug because the validation logic should have checked that before.") + } + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Adds a property. + */ + private def put(key: String, value: String): Unit = { + if (properties.contains(key)) { + throw new IllegalStateException("Property already present.") + } + if (normalizeKeys) { + properties.put(key.toLowerCase, value) + } else { + properties.put(key, value) + } + } + + /** + * Gets an existing property. 
+ */ + private def get(key: String): String = { + properties.getOrElse( + key, + throw exceptionSupplier(key).get()) + } + + /** + * Raw access to the underlying properties map for testing purposes. + */ + private[flink] def unsafePut(key: String, value: String): Unit = { + properties.put(key, value) + } + + /** + * Raw access to the underlying properties map for testing purposes. + */ + private[flink] def unsafeRemove(key: String): Unit = { + properties.remove(key) + } + + /** + * Adds a table schema under the given key. + */ + private def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = { + putIndexedFixedProperties( + key, + Seq(NAME, TYPE), + nameAndType.map(t => Seq(t._1, t._2)) + ) + } + + /** + * Adds an indexed sequence of properties (with sub-properties) under a common key. + * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.type = LONG, schema.fields.1.name = test2 + * + * The arity of each propertyValue must match the arity of propertyKeys. + */ + private def putIndexedFixedProperties( + key: String, + propertyKeys: Seq[String], + propertyValues: Seq[Seq[String]]) + : Unit = { + checkNotNull(key) + checkNotNull(propertyValues) + propertyValues.zipWithIndex.foreach { case (values, idx) => + if (values.lengthCompare(propertyKeys.size) != 0) { + throw new ValidationException("Values must have same arity as keys.") + } + values.zipWithIndex.foreach { case (value, keyIdx) => + put(s"$key.$idx.${propertyKeys(keyIdx)}", value) + } + } + } + + /** + * Adds an indexed mapping of properties under a common key. + * + * For example: + * + * schema.fields.0.type = INT, schema.fields.0.name = test + * schema.fields.1.name = test2 + * + * The arity of the propertySets can differ. + */ + private def putIndexedVariableProperties( + key: String, + propertySets: Seq[Map[String, String]]) + : Unit = { + checkNotNull(key) + checkNotNull(propertySets) + propertySets.zipWithIndex.foreach { case (propertySet, idx) => + propertySet.foreach { case (k, v) => + put(s"$key.$idx.$k", v) + } + } } } object DescriptorProperties { - val TYPE = "type" - val NAME = "name" + private val emptyConsumer: Consumer[String] = new Consumer[String] { + override def accept(t: String): Unit = { + // nothing to do + } + } + + val TYPE: String = "type" + val NAME: String = "name" // the string representation should be equal to SqlTypeName def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = { @@ -487,6 +1072,24 @@ object DescriptorProperties { } } + def deserialize[T](data: String, expected: Class[T]): T = { + try { + val byteData = Base64.decodeBase64(data) + val obj = InstantiationUtil.deserializeObject[T]( + byteData, + Thread.currentThread.getContextClassLoader) + if (!expected.isAssignableFrom(obj.getClass)) { + throw new ValidationException( + s"Serialized data contains an object of unexpected type. 
" + + s"Expected '${expected.getName}' but was '${obj.getClass.getName}'") + } + obj + } catch { + case e: Exception => + throw new ValidationException(s"Could not deserialize data: '$data'", e) + } + } + def toString(keyOrValue: String): String = { StringEscapeUtils.escapeJava(keyOrValue) } @@ -494,4 +1097,24 @@ object DescriptorProperties { def toString(key: String, value: String): String = { toString(key) + "=" + toString(value) } + + // the following methods help for Scala <-> Java interfaces + // most of these methods are not necessary once we upgraded to Scala 2.12 + + def toJava[T](option: Option[T]): Optional[T] = option match { + case Some(v) => Optional.of(v) + case None => Optional.empty() + } + + def toScala[T](option: Optional[T]): Option[T] = Option(option.orElse(null.asInstanceOf[T])) + + def toJava[T](func: Function[T, Unit]): Consumer[T] = new Consumer[T] { + override def accept(t: T): Unit = { + func.apply(t) + } + } + + def toJava[T0, T1](tuple: (T0, T1)): JTuple2[T0, T1] = { + new JTuple2[T0, T1](tuple._1, tuple._2) + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala index b1d900f..f306b5a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala @@ -23,7 +23,8 @@ import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, C /** * Connector descriptor for a file system. */ -class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) { +class FileSystem extends ConnectorDescriptor( + CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = true) { private var path: Option[String] = None @@ -43,8 +44,6 @@ class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) override protected def addConnectorProperties(properties: DescriptorProperties): Unit = { path.foreach(properties.putString(CONNECTOR_PATH, _)) } - - override private[flink] def needsFormat() = true } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala index 86f6229..bca67c6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION} /** * Describes the format of data. 
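For orientation: with the renamed key in the hunk below, a format descriptor declared with type "json" and version 1 (as the removed Json descriptor further down does) emits properties along the lines of

    format.type = json
    format.property-version = 1

followed by whatever addFormatProperties contributes.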
@@ -37,7 +37,7 @@ abstract class FormatDescriptor( */ final private[flink] def addProperties(properties: DescriptorProperties): Unit = { properties.putString(FORMAT_TYPE, tpe) - properties.putInt(FORMAT_VERSION, version) + properties.putInt(FORMAT_PROPERTY_VERSION, version) addFormatProperties(properties) } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala index 1aaa399..301189a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} /** * Validator for [[FormatDescriptor]]. @@ -27,13 +27,32 @@ class FormatDescriptorValidator extends DescriptorValidator { override def validate(properties: DescriptorProperties): Unit = { properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1) - properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + properties.validateInt(FORMAT_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE) } } object FormatDescriptorValidator { + /** + * Key for describing the type of the format. Usually used for factory discovery. + */ val FORMAT_TYPE = "format.type" + + /** + * Key for describing the property version. This property can be used for backwards + * compatibility in case the property format changes. + */ + val FORMAT_PROPERTY_VERSION = "format.property-version" + + /** + * Key for describing the version of the format. This property can be used for different + * format versions (e.g. Avro 1.8.2 or Avro 2.0). + */ val FORMAT_VERSION = "format.version" + /** + * Key for deriving the schema of the format from the table's schema. + */ + val FORMAT_DERIVE_SCHEMA = "format.derive-schema" + } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala deleted file mode 100644 index cc46d9c..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE} - -/** - * Encoding descriptor for JSON. - */ -class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { - - private var failOnMissingField: Option[Boolean] = None - - private var schema: Option[String] = None - - /** - * Sets flag whether to fail if a field is missing or not. - * - * @param failOnMissingField If set to true, the operation fails if there is a missing field. - * If set to false, a missing field is set to null. - * @return The builder. - */ - def failOnMissingField(failOnMissingField: Boolean): Json = { - this.failOnMissingField = Some(failOnMissingField) - this - } - - /** - * Sets the JSON schema string with field names and the types according to the JSON schema - * specification [[http://json-schema.org/specification.html]]. Required. - * - * The schema might be nested. - * - * @param schema JSON schema - */ - def schema(schema: String): Json = { - this.schema = Some(schema) - this - } - - /** - * Internal method for format properties conversion. - */ - override protected def addFormatProperties(properties: DescriptorProperties): Unit = { - // we distinguish between "schema string" and "schema" to allow parsing of a - // schema object in the future (such that the entire JSON schema can be defined in a YAML - // file instead of one large string) - schema.foreach(properties.putString(FORMAT_SCHEMA_STRING, _)) - failOnMissingField.foreach(properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, _)) - } -} - -/** - * Encoding descriptor for JSON. - */ -object Json { - - /** - * Encoding descriptor for JSON. - */ - def apply(): Json = new Json() -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala deleted file mode 100644 index 9f11caf..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING} - -/** - * Validator for [[Json]]. - */ -class JsonValidator extends FormatDescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - super.validate(properties) - properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1) - properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true) - } -} - -object JsonValidator { - - val FORMAT_TYPE_VALUE = "json" - val FORMAT_SCHEMA_STRING = "format.schema-string" - val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field" - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala index a8d580c..6631e22 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION} +import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_PROPERTY_VERSION} /** * Validator for [[Metadata]]. 
@@ -26,7 +26,7 @@ import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, M class MetadataValidator extends DescriptorValidator { override def validate(properties: DescriptorProperties): Unit = { - properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + properties.validateInt(METADATA_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE) properties.validateString(METADATA_COMMENT, isOptional = true) properties.validateLong(METADATA_CREATION_TIME, isOptional = true) properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true) @@ -35,7 +35,7 @@ class MetadataValidator extends DescriptorValidator { object MetadataValidator { - val METADATA_VERSION = "metadata.version" + val METADATA_PROPERTY_VERSION = "metadata.property-version" val METADATA_COMMENT = "metadata.comment" val METADATA_CREATION_TIME = "metadata.creation-time" val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time" http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala index a1c80f5..ed3854d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala @@ -19,11 +19,12 @@ package org.apache.flink.table.descriptors import org.apache.flink.table.api.Types -import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy} +import org.apache.flink.table.descriptors.RowtimeValidator.{normalizeTimestampExtractor, normalizeWatermarkStrategy} import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Rowtime descriptor for describing an event time attribute in the schema. 
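Before the next hunk, a usage sketch of the resulting flat layout (the builder methods timestampsFromField and watermarksPeriodicBounded are assumed from the unchanged part of this class; key names follow the renamed RowtimeValidator constants below):

    new Rowtime()
      .timestampsFromField("ts")
      .watermarksPeriodicBounded(2000)

now flattens directly into the surrounding properties as

    rowtime.timestamps.type = from-field
    rowtime.timestamps.from = ts
    rowtime.watermarks.type = periodic-bounded
    rowtime.watermarks.delay = 2000

and, when nested inside a Schema field, each key additionally receives a schema.#. prefix.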
@@ -111,12 +112,9 @@ class Rowtime extends Descriptor { */ final override def addProperties(properties: DescriptorProperties): Unit = { val props = mutable.HashMap[String, String]() - props.put(ROWTIME_VERSION, "1") timestampExtractor.foreach(normalizeTimestampExtractor(_).foreach(e => props.put(e._1, e._2))) watermarkStrategy.foreach(normalizeWatermarkStrategy(_).foreach(e => props.put(e._1, e._2))) - - // use a list for the rowtime to support multiple rowtime attributes in the future - properties.putIndexedVariableProperties(ROWTIME, Seq(props.toMap)) + properties.putProperties(props.toMap.asJava) } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala index 74e49f1..fdec820 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala @@ -18,58 +18,62 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.DescriptorProperties.serialize +import org.apache.flink.table.descriptors.DescriptorProperties.{serialize, toJava} import org.apache.flink.table.descriptors.RowtimeValidator._ import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} +import scala.collection.JavaConverters._ + /** * Validator for [[Rowtime]]. 
*/ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { override def validate(properties: DescriptorProperties): Unit = { - properties.validateInt(prefix + ROWTIME_VERSION, isOptional = true, 0, Integer.MAX_VALUE) - - val noValidation = () => {} - - val timestampExistingField = () => { - properties.validateString(prefix + TIMESTAMPS_FROM, isOptional = false, minLen = 1) + val timestampExistingField = (_: String) => { + properties.validateString( + prefix + ROWTIME_TIMESTAMPS_FROM, isOptional = false, minLen = 1) } - val timestampCustom = () => { - properties.validateString(prefix + TIMESTAMPS_CLASS, isOptional = false, minLen = 1) - properties.validateString(prefix + TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1) + val timestampCustom = (_: String) => { + properties.validateString( + prefix + ROWTIME_TIMESTAMPS_CLASS, isOptional = false, minLen = 1) + properties.validateString( + prefix + ROWTIME_TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1) } properties.validateEnum( - prefix + TIMESTAMPS_TYPE, + prefix + ROWTIME_TIMESTAMPS_TYPE, isOptional = false, Map( - TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> timestampExistingField, - TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> noValidation, - TIMESTAMPS_TYPE_VALUE_CUSTOM -> timestampCustom - ) + ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField), + ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(), + ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom) + ).asJava ) - val watermarkPeriodicBounding = () => { - properties.validateLong(prefix + WATERMARKS_DELAY, isOptional = false, min = 0) + val watermarkPeriodicBounded = (_: String) => { + properties.validateLong( + prefix + ROWTIME_WATERMARKS_DELAY, isOptional = false, min = 0) } - val watermarkCustom = () => { - properties.validateString(prefix + WATERMARKS_CLASS, isOptional = false, minLen = 1) - properties.validateString(prefix + WATERMARKS_SERIALIZED, isOptional = false, minLen = 1) + val watermarkCustom = (_: String) => { + properties.validateString( + prefix + ROWTIME_WATERMARKS_CLASS, isOptional = false, minLen = 1) + properties.validateString( + prefix + ROWTIME_WATERMARKS_SERIALIZED, isOptional = false, minLen = 1) } properties.validateEnum( - prefix + WATERMARKS_TYPE, + prefix + ROWTIME_WATERMARKS_TYPE, isOptional = false, Map( - WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> noValidation, - WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> watermarkPeriodicBounding, - WATERMARKS_TYPE_VALUE_FROM_SOURCE -> noValidation, - WATERMARKS_TYPE_VALUE_CUSTOM -> watermarkCustom - ) + ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(), + ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded), + ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(), + ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom) + ).asJava ) } } @@ -77,58 +81,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator { object RowtimeValidator { val ROWTIME = "rowtime" - - // per rowtime properties - - val ROWTIME_VERSION = "version" - val TIMESTAMPS_TYPE = "timestamps.type" - val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" - val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" - val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" - val TIMESTAMPS_FROM = "timestamps.from" - val TIMESTAMPS_CLASS = "timestamps.class" - val TIMESTAMPS_SERIALIZED = "timestamps.serialized" - - val WATERMARKS_TYPE = "watermarks.type" - val 
WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" - val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding" - val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" - val WATERMARKS_TYPE_VALUE_CUSTOM = "custom" - val WATERMARKS_CLASS = "watermarks.class" - val WATERMARKS_SERIALIZED = "watermarks.serialized" - val WATERMARKS_DELAY = "watermarks.delay" + val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from" + val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class" + val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized" + + val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending" + val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED = "periodic-bounded" + val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source" + val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom" + val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class" + val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized" + val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay" // utilities def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] = extractor match { + case existing: ExistingField => Map( - TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD, - TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0)) + ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, + ROWTIME_TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0)) + case _: StreamRecordTimestamp => - Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE) + Map(ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE) + case _: TimestampExtractor => Map( - TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM, - TIMESTAMPS_CLASS -> extractor.getClass.getName, - TIMESTAMPS_SERIALIZED -> serialize(extractor)) + ROWTIME_TIMESTAMPS_TYPE -> ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, + ROWTIME_TIMESTAMPS_CLASS -> extractor.getClass.getName, + ROWTIME_TIMESTAMPS_SERIALIZED -> serialize(extractor)) } def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] = strategy match { + case _: AscendingTimestamps => - Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING) - case bounding: BoundedOutOfOrderTimestamps => + Map(ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING) + + case bounded: BoundedOutOfOrderTimestamps => Map( - WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING, - WATERMARKS_DELAY -> bounding.delay.toString) + ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, + ROWTIME_WATERMARKS_DELAY -> bounded.delay.toString) + case _: PreserveWatermarks => - Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE) + Map(ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE) + case _: WatermarkStrategy => Map( - WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM, - WATERMARKS_CLASS -> strategy.getClass.getName, - WATERMARKS_SERIALIZED -> serialize(strategy)) + ROWTIME_WATERMARKS_TYPE -> ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, + ROWTIME_WATERMARKS_CLASS -> strategy.getClass.getName, + ROWTIME_WATERMARKS_SERIALIZED -> serialize(strategy)) } + + def getRowtimeComponents(properties: DescriptorProperties, prefix: String) + : Option[(TimestampExtractor, 
WatermarkStrategy)] = { + + // create timestamp extractor + val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE) + if (!t.isPresent) { + return None + } + val extractor: TimestampExtractor = t.get() match { + + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD => + val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM) + new ExistingField(field) + + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE => + new StreamRecordTimestamp + + case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM => + val clazz = properties.getClass( + ROWTIME_TIMESTAMPS_CLASS, + classOf[TimestampExtractor]) + DescriptorProperties.deserialize( + properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED), + clazz) + } + + // create watermark strategy + val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE) + val strategy: WatermarkStrategy = s match { + + case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING => + new AscendingTimestamps() + + case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED => + val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY) + new BoundedOutOfOrderTimestamps(delay) + + case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE => + PreserveWatermarks.INSTANCE + + case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM => + val clazz = properties.getClass( + prefix + ROWTIME_WATERMARKS_CLASS, + classOf[WatermarkStrategy]) + DescriptorProperties.deserialize( + properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED), + clazz) + } + + Some((extractor, strategy)) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala index 2f3a389..fcbb2c7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala @@ -24,6 +24,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSc import org.apache.flink.table.descriptors.SchemaValidator._ import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Describes a schema of a table. @@ -80,7 +81,7 @@ class Schema extends Descriptor { } val fieldProperties = mutable.LinkedHashMap[String, String]() - fieldProperties += (TYPE -> fieldType) + fieldProperties += (SCHEMA_TYPE -> fieldType) tableSchema += (fieldName -> fieldProperties) @@ -100,7 +101,7 @@ class Schema extends Descriptor { lastField match { case None => throw new ValidationException("No field previously defined. Use field() before.") case Some(f) => - tableSchema(f) += (FROM -> originFieldName) + tableSchema(f) += (SCHEMA_FROM -> originFieldName) lastField = None } this @@ -115,7 +116,7 @@ class Schema extends Descriptor { lastField match { case None => throw new ValidationException("No field defined previously. 
Use field() before.") case Some(f) => - tableSchema(f) += (PROCTIME -> PROCTIME_VALUE_TRUE) + tableSchema(f) += (SCHEMA_PROCTIME -> "true") lastField = None } this @@ -132,7 +133,7 @@ class Schema extends Descriptor { case Some(f) => val fieldProperties = new DescriptorProperties() rowtime.addProperties(fieldProperties) - tableSchema(f) ++= fieldProperties.asMap + tableSchema(f) ++= fieldProperties.asMap.asScala lastField = None } this @@ -142,12 +143,11 @@ class Schema extends Descriptor { * Internal method for properties conversion. */ final override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - properties.putInt(SCHEMA_VERSION, 1) properties.putIndexedVariableProperties( SCHEMA, tableSchema.toSeq.map { case (name, props) => - Map(NAME -> name) ++ props - } + (Map(SCHEMA_NAME -> name) ++ props).asJava + }.asJava ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala index 19c0e41..0a23911 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala @@ -18,9 +18,17 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME +import java.util +import java.util.Optional + +import org.apache.flink.table.api.{TableSchema, ValidationException} +import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala} +import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_TYPE} import org.apache.flink.table.descriptors.SchemaValidator._ +import org.apache.flink.table.sources.RowtimeAttributeDescriptor + +import scala.collection.JavaConverters._ +import scala.collection.mutable /** * Validator for [[Schema]]. 
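For orientation, the validation below walks a flattened schema layout such as (field names and types are illustrative)

    schema.0.name = user
    schema.0.type = VARCHAR
    schema.1.name = ts
    schema.1.type = TIMESTAMP
    schema.1.proctime = true

allowing at most one proctime attribute, with rowtime.* keys as the per-field alternative.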

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index 19c0e41..0a23911 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -18,9 +18,17 @@
 package org.apache.flink.table.descriptors
 
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME
+import java.util
+import java.util.Optional
+
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala}
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_TYPE}
 import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
   * Validator for [[Schema]].
@@ -28,29 +36,39 @@ import org.apache.flink.table.descriptors.SchemaValidator._
 class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
-
-    val names = properties.getIndexedProperty(SCHEMA, NAME)
-    val types = properties.getIndexedProperty(SCHEMA, TYPE)
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+    val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
     if (names.isEmpty && types.isEmpty) {
-      throw new ValidationException(s"Could not find the required schema for property '$SCHEMA'.")
+      throw new ValidationException(
+        s"Could not find the required schema in property '$SCHEMA'.")
     }
 
+    var proctimeFound = false
+
     for (i <- 0 until Math.max(names.size, types.size)) {
-      properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, minLen = 1)
-      properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false)
-      properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen = 1)
+      properties
+        .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", isOptional = false, minLen = 1)
+      properties
+        .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", isOptional = false)
+      properties
+        .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", isOptional = true, minLen = 1)
 
       // either proctime or rowtime
-      val proctime = s"$SCHEMA.$i.$PROCTIME"
+      val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
       val rowtime = s"$SCHEMA.$i.$ROWTIME"
-      if (properties.contains(proctime)) {
+      if (properties.containsKey(proctime)) {
+        // check the environment
         if (!isStreamEnvironment) {
           throw new ValidationException(
             s"Property '$proctime' is not allowed in a batch environment.")
         }
+        // check for only one proctime attribute
+        else if (proctimeFound) {
+          throw new ValidationException("A proctime attribute must only be defined once.")
+        }
         // check proctime
         properties.validateBoolean(proctime, isOptional = false)
+        proctimeFound = properties.getBoolean(proctime)
         // no rowtime
         properties.validatePrefixExclusion(rowtime)
       } else if (properties.hasPrefix(rowtime)) {
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+    * Finds the proctime attribute if defined.
+    */
+  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val isProctime = toScala(
+        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+      isProctime.foreach { isSet =>
+        if (isSet) {
+          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+        }
+      }
+    }
+    toJava(None)
+  }
+
+  /**
+    * Finds the rowtime attributes if defined.
+    */
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+    : util.List[RowtimeAttributeDescriptor] = {
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    val attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+    // check for rowtime in every field
+    for (i <- 0 until names.size) {
+      RowtimeValidator
+        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+        .foreach { case (extractor, strategy) =>
+          // create descriptor
+          attributes += new RowtimeAttributeDescriptor(
+            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+            extractor,
+            strategy)
+        }
+    }
+
+    attributes.asJava
+  }
+
+  /**
+    * Finds a table source field mapping.
+    */
+  def deriveFieldMapping(
+      properties: DescriptorProperties,
+      sourceSchema: Optional[TableSchema])
+    : util.Map[String, String] = {
+
+    val mapping = mutable.Map[String, String]()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    // add all schema fields first for implicit mappings
+    schema.getColumnNames.foreach { name =>
+      mapping.put(name, name)
+    }
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
 
-  // per column properties
+        // add explicit mapping
+        case Some(source) =>
+          mapping.put(name, source)
 
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+        // implicit mapping or time
+        case None =>
+          val isProctime = properties
+            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+            .orElse(false)
+          val isRowtime = properties
+            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+          // remove proctime/rowtime from mapping
+          if (isProctime || isRowtime) {
+            mapping.remove(name)
+          }
+          // check for invalid fields
+          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+              s"from source. Please specify the source field from which it can be derived.")
+          }
+      }
+    }
+    mapping.toMap.asJava
+  }
+
+  /**
+    * Finds the fields that can be used for a format schema (without time attributes).
+    */
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
+
+    val builder = TableSchema.builder()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    schema.getColumnNames.zip(schema.getTypes).zipWithIndex.foreach { case ((n, t), i) =>
+      val isProctime = properties
+        .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+        .orElse(false)
+      val isRowtime = properties
+        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+      if (!isProctime && !isRowtime) {
+        // check for aliasing
+        val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
+          .orElse(n)
+        builder.field(fieldName, t)
+      }
+    }
+
+    builder.build()
+  }
 }
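These utilities are what a table source factory is expected to call instead of re-parsing the schema keys itself. A sketch using the signatures from this diff (the surrounding values, javaProps and sourceSchema, are hypothetical):

    val params = new DescriptorProperties()
    params.putProperties(javaProps)  // javaProps: java.util.Map[String, String]

    val proctime: Optional[String] =
      SchemaValidator.deriveProctimeAttribute(params)
    val rowtimes: util.List[RowtimeAttributeDescriptor] =
      SchemaValidator.deriveRowtimeAttributes(params)
    val mapping: util.Map[String, String] =
      SchemaValidator.deriveFieldMapping(params, Optional.of(sourceSchema))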

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
index 3037286..f87a868 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
@@ -135,12 +135,12 @@ class Statistics extends Descriptor {
    * Internal method for properties conversion.
    */
   final override def addProperties(properties: DescriptorProperties): Unit = {
-    properties.putInt(STATISTICS_VERSION, 1)
+    properties.putInt(STATISTICS_PROPERTY_VERSION, 1)
     rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc))
     val namedStats = columnStats.map { case (name, stats) =>
       // name should not be part of the properties key
-      (stats + (NAME -> name)).toMap
-    }.toSeq
+      (stats + (NAME -> name)).toMap.asJava
+    }.toList.asJava
     properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
   }
 }
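Statistics are flattened the same way as the schema. A sketch of the layout addProperties emits (the per-column key literal "distinct-count" is an assumption; the STATISTICS_* constants are from this commit):

    import scala.collection.JavaConverters._

    val props = new DescriptorProperties()
    props.putInt(STATISTICS_PROPERTY_VERSION, 1)   // statistics.property-version = 1
    props.putLong(STATISTICS_ROW_COUNT, 1000L)     // statistics.row-count = 1000
    props.putIndexedVariableProperties(
      STATISTICS_COLUMNS,
      List(Map("name" -> "user", "distinct-count" -> "42").asJava).asJava)
    // yields statistics.columns.0.name = user
    //        statistics.columns.0.distinct-count = 42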

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
index a78e422..691cb21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.descriptors
 
 import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats}
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_PROPERTY_VERSION, STATISTICS_ROW_COUNT, validateColumnStats}
 import org.apache.flink.table.plan.stats.ColumnStats
 
 import scala.collection.mutable
@@ -30,7 +31,7 @@ import scala.collection.mutable
 class StatisticsValidator extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateInt(STATISTICS_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
     properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0)
     validateColumnStats(properties, STATISTICS_COLUMNS)
   }
@@ -38,7 +39,7 @@ class StatisticsValidator extends DescriptorValidator {
 
 object StatisticsValidator {
 
-  val STATISTICS_VERSION = "statistics.version"
+  val STATISTICS_PROPERTY_VERSION = "statistics.property-version"
   val STATISTICS_ROW_COUNT = "statistics.row-count"
   val STATISTICS_COLUMNS = "statistics.columns"
 
@@ -99,16 +100,16 @@ object StatisticsValidator {
     val columnCount = properties.getIndexedProperty(key, NAME).size
 
     val stats = for (i <- 0 until columnCount) yield {
-      val name = properties.getString(s"$key.$i.$NAME").getOrElse(
+      val name = toScala(properties.getOptionalString(s"$key.$i.$NAME")).getOrElse(
         throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'."))
 
       val stats = ColumnStats(
-        properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => Long.box(v)).orNull,
-        properties.getLong(s"$key.$i.$NULL_COUNT").map(v => Long.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => Double.box(v)).orNull,
-        properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => Double.box(v)).orNull,
-        properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => Double.box(v)).orNull
+        properties.getOptionalLong(s"$key.$i.$DISTINCT_COUNT").orElse(null),
+        properties.getOptionalLong(s"$key.$i.$NULL_COUNT").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$AVG_LENGTH").orElse(null),
+        properties.getOptionalInt(s"$key.$i.$MAX_LENGTH").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$MAX_VALUE").orElse(null),
+        properties.getOptionalDouble(s"$key.$i.$MIN_VALUE").orElse(null)
      )
 
      name -> stats

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
index 5e0b42a..8f2e473 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
@@ -46,7 +46,7 @@ class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: C
    * Searches for the specified table source, configures it accordingly, and returns it.
    */
   def toTableSource: TableSource[_] = {
-    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    val source = TableSourceFactoryService.findAndCreateTableSource(this)
     source match {
       case _: StreamTableSource[_] => source
       case _ => throw new TableException(
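The rename makes the call path explicit: a descriptor is turned into properties, a factory is discovered, and the created source is checked. A sketch (tableEnv and connector are assumed to be in scope; only the constructor and toTableSource appear in this diff):

    val descriptor = new StreamTableSourceDescriptor(tableEnv, connector)
    val source: TableSource[_] = descriptor.toTableSource
    // toTableSource throws a TableException if the resolved source
    // is not a StreamTableSource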

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
index a49a41b..5118489 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
 import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
 import org.apache.flink.table.plan.stats.TableStats
 
@@ -50,7 +51,7 @@ abstract class TableSourceDescriptor extends Descriptor {
   protected def getTableStats: Option[TableStats] = {
     val normalizedProps = new DescriptorProperties()
     addProperties(normalizedProps)
-    val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => Long.box(v))
+    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
     rowCount match {
       case Some(cnt) =>
         val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
index bec4565..06d6bfb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -21,11 +21,12 @@ package org.apache.flink.table.sources
 
 import java.util
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
 import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
 import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
-import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
 import org.apache.flink.table.descriptors._
 import org.apache.flink.types.Row
 
@@ -38,9 +39,8 @@ class CsvTableSourceFactory extends TableSourceFactory[Row] {
     val context = new util.HashMap[String, String]()
     context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
     context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_VERSION, "1")
-    context.put(FORMAT_VERSION, "1")
-    context.put(SCHEMA_VERSION, "1")
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
     context
   }
 
@@ -76,33 +76,36 @@ class CsvTableSourceFactory extends TableSourceFactory[Row] {
     // build
     val csvTableSourceBuilder = new CsvTableSource.Builder
 
-    val tableSchema = params.getTableSchema(SCHEMA).get
-    val encodingSchema = params.getTableSchema(FORMAT_FIELDS)
+    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+    val tableSchema = params.getTableSchema(SCHEMA)
 
     // the CsvTableSource needs some rework first
     // for now the schema must be equal to the encoding
-    if (!encodingSchema.contains(tableSchema)) {
+    if (!formatSchema.equals(tableSchema)) {
       throw new TableException(
         "Encodings that differ from the schema are not supported yet for CsvTableSources.")
     }
 
-    params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path)
-    params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter)
-    params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter)
+    toScala(params.getOptionalString(CONNECTOR_PATH))
+      .foreach(csvTableSourceBuilder.path)
+    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
+      .foreach(csvTableSourceBuilder.fieldDelimiter)
+    toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
+      .foreach(csvTableSourceBuilder.lineDelimiter)
 
-    encodingSchema.foreach { schema =>
-      schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) =>
-        csvTableSourceBuilder.field(name, tpe)
-      }
+    formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
+      csvTableSourceBuilder.field(name, tpe)
     }
-    params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter)
-    params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix)
-    params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag =>
+    toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
+      .foreach(csvTableSourceBuilder.quoteCharacter)
+    toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
+      .foreach(csvTableSourceBuilder.commentPrefix)
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreFirstLine()
       }
     }
-    params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag =>
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreParseErrors()
       }
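Put together, a flat property map like the following would match this factory's requiredContext and drive the builder above (the "filesystem" and "csv" literals are the assumed values of CONNECTOR_TYPE_VALUE and FORMAT_TYPE_VALUE; path and type strings are illustrative):

    val properties = Map(
      "connector.type"             -> "filesystem",
      "connector.property-version" -> "1",
      "connector.path"             -> "/tmp/users.csv",
      "format.type"                -> "csv",
      "format.property-version"    -> "1",
      "format.fields.0.name"       -> "user",
      "format.fields.0.type"       -> "VARCHAR",
      "schema.0.name"              -> "user",
      "schema.0.type"              -> "VARCHAR")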

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
index f42d765..e5f6965 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
@@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
    * - connector.type
    * - format.type
    *
-   * Specified versions allow the framework to provide backwards compatible properties in case of
-   * string format changes:
-   * - connector.version
-   * - format.version
+   * Specified property versions allow the framework to provide backwards compatible properties
+   * in case of string format changes:
+   * - connector.property-version
+   * - format.property-version
    *
    * An empty context means that the factory matches for all requests.
    */
@@ -61,7 +61,8 @@ trait TableSourceFactory[T] {
    * - format.fields.#.type
    * - format.fields.#.name
    *
-   * Note: Use "#" to denote an array of values where "#" represents one or more digits.
+   * Note: Use "#" to denote an array of values where "#" represents one or more digits. Property
+   * versions like "format.property-version" must not be part of the supported properties.
    */
   def supportedProperties(): util.List[String]
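As an illustration of the contract described in the javadoc, a minimal factory for a hypothetical "my-system" connector could look like this (only the two members shown in this diff are implemented; the method of the trait that actually creates the TableSource is omitted):

    class MySystemTableSourceFactory extends TableSourceFactory[Row] {

      override def requiredContext(): util.Map[String, String] = {
        val context = new util.HashMap[String, String]()
        context.put(CONNECTOR_TYPE, "my-system")   // hypothetical connector type
        context.put(CONNECTOR_PROPERTY_VERSION, "1")
        context
      }

      override def supportedProperties(): util.List[String] = {
        val properties = new util.ArrayList[String]()
        properties.add("connector.hostname")       // hypothetical keys
        properties.add("connector.port")
        properties.add("schema.#.name")
        properties.add("schema.#.type")
        // note: no "*.property-version" keys here, as required above
        properties
      }
    }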

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
index 1e8e836..877cb7b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
@@ -21,12 +21,10 @@ package org.apache.flink.table.sources
 
 import java.util.{ServiceConfigurationError, ServiceLoader}
 
 import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION
-import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION
-import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION
-import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION
-import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.MetadataValidator.METADATA_PROPERTY_VERSION
+import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.util.Logging
 
@@ -40,13 +38,13 @@ object TableSourceFactoryService extends Logging {
 
   private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]])
 
-  def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
+  def findAndCreateTableSource(descriptor: TableSourceDescriptor): TableSource[_] = {
     val properties = new DescriptorProperties()
     descriptor.addProperties(properties)
-    findTableSourceFactory(properties.asMap)
+    findAndCreateTableSource(properties.asMap.asScala.toMap)
   }
 
-  def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = {
+  def findAndCreateTableSource(properties: Map[String, String]): TableSource[_] = {
     var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None
     try {
       val iter = loader.iterator()
@@ -73,12 +71,10 @@ object TableSourceFactoryService extends Logging {
         plainContext ++= requiredContext
         // we remove the versions for now until we have the first backwards compatibility case
         // with the version we can provide mappings in case the format changes
-        plainContext.remove(CONNECTOR_VERSION)
-        plainContext.remove(FORMAT_VERSION)
-        plainContext.remove(SCHEMA_VERSION)
-        plainContext.remove(ROWTIME_VERSION)
-        plainContext.remove(METADATA_VERSION)
-        plainContext.remove(STATISTICS_VERSION)
+        plainContext.remove(CONNECTOR_PROPERTY_VERSION)
+        plainContext.remove(FORMAT_PROPERTY_VERSION)
+        plainContext.remove(METADATA_PROPERTY_VERSION)
+        plainContext.remove(STATISTICS_PROPERTY_VERSION)
 
         // check if required context is met
         if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
index 329f790..fcbd63f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
  *
  * Note: This extractor only works for StreamTableSources.
  */
-class StreamRecordTimestamp extends TimestampExtractor {
+final class StreamRecordTimestamp extends TimestampExtractor {
 
   /** No argument fields required. */
   override def getArgumentFields: Array[String] = Array()
@@ -42,5 +42,8 @@ class StreamRecordTimestamp extends TimestampExtractor {
   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
     org.apache.flink.table.expressions.StreamRecordTimestamp()
   }
+}
 
+object StreamRecordTimestamp {
+  val INSTANCE: StreamRecordTimestamp = new StreamRecordTimestamp
 }
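TableSourceFactoryService discovers factories through java.util.ServiceLoader, so a factory like the sketch above is registered via a standard provider-configuration file on the classpath (ServiceLoader mechanics, not part of this commit; the class name is the hypothetical one from the earlier sketch):

    META-INF/services/org.apache.flink.table.sources.TableSourceFactory
      org.example.MySystemTableSourceFactory

At runtime the lookup then reduces to a single call, for example TableSourceFactoryService.findAndCreateTableSource(properties), which strips the property-version keys, matches each factory's requiredContext against the given properties, validates the supported keys, and lets the matching factory build the TableSource.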
