http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala new file mode 100644 index 0000000..1aaa399 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala @@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}

/**
  * Validator for [[FormatDescriptor]].
  */
class FormatDescriptorValidator extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // "format.type" identifies the concrete format; mandatory and non-empty.
    properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1)
    // "format.version" is optional; when present it must be a non-negative integer.
    properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
  }
}

object FormatDescriptorValidator {

  // Property keys shared by all format descriptors.
  val FORMAT_TYPE = "format.type"
  val FORMAT_VERSION = "format.version"

}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala new file mode 100644 index 0000000..cc46d9c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala @@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE}

/**
  * Encoding descriptor for JSON.
  */
class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {

  // Both settings stay unset (None) until the corresponding builder method is called.
  private var failOnMissingField: Option[Boolean] = None
  private var schema: Option[String] = None

  /**
    * Sets flag whether to fail if a field is missing or not.
    *
    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    *                           If set to false, a missing field is set to null.
    * @return The builder.
    */
  def failOnMissingField(failOnMissingField: Boolean): Json = {
    this.failOnMissingField = Some(failOnMissingField)
    this
  }

  /**
    * Sets the JSON schema string with field names and the types according to the JSON schema
    * specification [[http://json-schema.org/specification.html]]. Required.
    *
    * The schema might be nested.
    *
    * @param schema JSON schema
    * @return The builder.
    */
  def schema(schema: String): Json = {
    this.schema = Some(schema)
    this
  }

  /**
    * Internal method for format properties conversion.
    */
  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
    // "schema-string" (instead of plain "schema") leaves room for parsing a structured
    // schema object in the future (e.g. the entire JSON schema defined in a YAML file
    // instead of one large string).
    schema.foreach(s => properties.putString(FORMAT_SCHEMA_STRING, s))
    failOnMissingField.foreach(flag => properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, flag))
  }
}

/**
  * Encoding descriptor for JSON.
  */
object Json {

  /**
    * Creates a new JSON encoding descriptor.
    */
  def apply(): Json = new Json()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala new file mode 100644 index 0000000..9f11caf --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala @@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING}

/**
  * Validator for [[Json]].
  */
class JsonValidator extends FormatDescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // Validate the common format properties (type, version) first.
    super.validate(properties)
    // The JSON schema string is required and must not be empty.
    properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1)
    properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true)
  }
}

object JsonValidator {

  val FORMAT_TYPE_VALUE = "json"
  val FORMAT_SCHEMA_STRING = "format.schema-string"
  val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"

}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala new file mode 100644 index 0000000..211d786 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala @@ 
-0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}

/**
  * Metadata descriptor for adding additional, useful information.
  */
class Metadata extends Descriptor {

  // All fields are optional; only explicitly set values are written out.
  protected var comment: Option[String] = None
  protected var creationTime: Option[Long] = None
  protected var lastAccessTime: Option[Long] = None

  /**
    * Sets a comment.
    *
    * @param comment the description
    * @return The builder.
    */
  def comment(comment: String): Metadata = {
    this.comment = Some(comment)
    this
  }

  /**
    * Sets a creation time.
    *
    * @param time UTC milliseconds timestamp
    * @return The builder.
    */
  def creationTime(time: Long): Metadata = {
    this.creationTime = Some(time)
    this
  }

  /**
    * Sets a last access time.
    *
    * @param time UTC milliseconds timestamp
    * @return The builder.
    */
  def lastAccessTime(time: Long): Metadata = {
    this.lastAccessTime = Some(time)
    this
  }

  /**
    * Internal method for properties conversion.
    */
  // NOTE(review): unlike Schema/Rowtime, no "metadata.version" property is written here,
  // although MetadataValidator accepts an optional one — confirm this is intended.
  final override def addProperties(properties: DescriptorProperties): Unit = {
    comment.foreach(properties.putString(METADATA_COMMENT, _))
    creationTime.foreach(properties.putLong(METADATA_CREATION_TIME, _))
    lastAccessTime.foreach(properties.putLong(METADATA_LAST_ACCESS_TIME, _))
  }
}

/**
  * Metadata descriptor for adding additional, useful information.
  */
object Metadata {

  /**
    * Creates a new metadata descriptor.
    */
  def apply(): Metadata = new Metadata()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala new file mode 100644 index 0000000..a8d580c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala @@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION}

/**
  * Validator for [[Metadata]].
  */
class MetadataValidator extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // All metadata properties are optional.
    properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
    properties.validateString(METADATA_COMMENT, isOptional = true)
    properties.validateLong(METADATA_CREATION_TIME, isOptional = true)
    properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true)
  }
}

object MetadataValidator {

  val METADATA_VERSION = "metadata.version"
  val METADATA_COMMENT = "metadata.comment"
  val METADATA_CREATION_TIME = "metadata.creation-time"
  val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"

}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala new file mode 100644 index 0000000..a1c80f5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.table.descriptors

import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy}
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}

import scala.collection.mutable

/**
  * Rowtime descriptor for describing an event time attribute in the schema.
  */
class Rowtime extends Descriptor {

  // Exactly one extractor and one watermark strategy can be configured; the
  // last builder call wins.
  private var timestampExtractor: Option[TimestampExtractor] = None
  private var watermarkStrategy: Option[WatermarkStrategy] = None

  /**
    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    *
    * @param fieldName The field to convert into a rowtime attribute.
    */
  def timestampsFromField(fieldName: String): Rowtime = {
    timestampExtractor = Some(new ExistingField(fieldName))
    this
  }

  /**
    * Sets a built-in timestamp extractor that converts the assigned timestamps from
    * a DataStream API record into the rowtime attribute and thus preserves the assigned
    * timestamps from the source.
    *
    * Note: This extractor only works in streaming environments.
    */
  def timestampsFromSource(): Rowtime = {
    timestampExtractor = Some(new StreamRecordTimestamp)
    this
  }

  /**
    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    *
    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    *                  from the physical type.
    */
  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    timestampExtractor = Some(extractor)
    this
  }

  /**
    * Sets a built-in watermark strategy for ascending rowtime attributes.
    *
    * Emits a watermark of the maximum observed timestamp so far minus 1.
    * Rows that have a timestamp equal to the max timestamp are not late.
    */
  def watermarksPeriodicAscending(): Rowtime = {
    watermarkStrategy = Some(new AscendingTimestamps)
    this
  }

  /**
    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
    * time interval.
    *
    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    *
    * @param delay delay in milliseconds
    */
  def watermarksPeriodicBounding(delay: Long): Rowtime = {
    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  /**
    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
    * underlying DataStream API and thus preserves the assigned timestamps from the source.
    */
  def watermarksFromSource(): Rowtime = {
    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  /**
    * Sets a custom watermark strategy to be used for the rowtime attribute.
    */
  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    watermarkStrategy = Some(strategy)
    this
  }

  /**
    * Internal method for properties conversion.
    */
  final override def addProperties(properties: DescriptorProperties): Unit = {
    val props = mutable.HashMap[String, String]()
    props.put(ROWTIME_VERSION, "1")
    timestampExtractor
      .map(normalizeTimestampExtractor)
      .foreach(_.foreach { case (key, value) => props.put(key, value) })
    watermarkStrategy
      .map(normalizeWatermarkStrategy)
      .foreach(_.foreach { case (key, value) => props.put(key, value) })

    // use a list for the rowtime to support multiple rowtime attributes in the future
    properties.putIndexedVariableProperties(ROWTIME, Seq(props.toMap))
  }
}

/**
  * Rowtime descriptor for describing an event time attribute in the schema.
  */
object Rowtime {

  /**
    * Creates a new rowtime descriptor.
    */
  def apply(): Rowtime = new Rowtime()

}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala new file mode 100644 index 0000000..74e49f1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.DescriptorProperties.serialize
import org.apache.flink.table.descriptors.RowtimeValidator._
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}

/**
  * Validator for [[Rowtime]].
  *
  * @param prefix key prefix under which the rowtime properties are nested
  *               (e.g. "schema.0." when validated as part of a schema field)
  */
class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    properties.validateInt(prefix + ROWTIME_VERSION, isOptional = true, 0, Integer.MAX_VALUE)

    // Per-enum-value validation callbacks.
    val noValidation = () => {}

    val timestampExistingField = () => {
      properties.validateString(prefix + TIMESTAMPS_FROM, isOptional = false, minLen = 1)
    }

    val timestampCustom = () => {
      // A custom extractor needs both its class name and its serialized instance.
      properties.validateString(prefix + TIMESTAMPS_CLASS, isOptional = false, minLen = 1)
      properties.validateString(prefix + TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
    }

    properties.validateEnum(
      prefix + TIMESTAMPS_TYPE,
      isOptional = false,
      Map(
        TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> timestampExistingField,
        TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> noValidation,
        TIMESTAMPS_TYPE_VALUE_CUSTOM -> timestampCustom
      )
    )

    val watermarkPeriodicBounding = () => {
      // A bounded strategy requires a non-negative delay.
      properties.validateLong(prefix + WATERMARKS_DELAY, isOptional = false, min = 0)
    }

    val watermarkCustom = () => {
      properties.validateString(prefix + WATERMARKS_CLASS, isOptional = false, minLen = 1)
      properties.validateString(prefix + WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
    }

    properties.validateEnum(
      prefix + WATERMARKS_TYPE,
      isOptional = false,
      Map(
        WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> noValidation,
        WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> watermarkPeriodicBounding,
        WATERMARKS_TYPE_VALUE_FROM_SOURCE -> noValidation,
        WATERMARKS_TYPE_VALUE_CUSTOM -> watermarkCustom
      )
    )
  }
}

object RowtimeValidator {

  val ROWTIME = "rowtime"

  // per rowtime properties

  val ROWTIME_VERSION = "version"
  val TIMESTAMPS_TYPE = "timestamps.type"
  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
  val TIMESTAMPS_FROM = "timestamps.from"
  val TIMESTAMPS_CLASS = "timestamps.class"
  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"

  val WATERMARKS_TYPE = "watermarks.type"
  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
  val WATERMARKS_CLASS = "watermarks.class"
  val WATERMARKS_SERIALIZED = "watermarks.serialized"
  val WATERMARKS_DELAY = "watermarks.delay"

  // utilities

  /** Converts a [[TimestampExtractor]] into its normalized property representation. */
  def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] =
    extractor match {
      // Built-in extractors map to dedicated enum values; the catch-all
      // TimestampExtractor case serializes the custom instance.
      case existing: ExistingField =>
        Map(
          TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
          TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0))
      case _: StreamRecordTimestamp =>
        Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
      case _: TimestampExtractor =>
        Map(
          TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM,
          TIMESTAMPS_CLASS -> extractor.getClass.getName,
          TIMESTAMPS_SERIALIZED -> serialize(extractor))
    }

  /** Converts a [[WatermarkStrategy]] into its normalized property representation. */
  def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] =
    strategy match {
      case _: AscendingTimestamps =>
        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
      case bounding: BoundedOutOfOrderTimestamps =>
        Map(
          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING,
          WATERMARKS_DELAY -> bounding.delay.toString)
      case _: PreserveWatermarks =>
        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE)
      case _: WatermarkStrategy =>
        Map(
          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM,
          WATERMARKS_CLASS -> strategy.getClass.getName,
          WATERMARKS_SERIALIZED -> serialize(strategy))
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala new file mode 100644 index 0000000..2f3a389 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.flink.table.descriptors

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableSchema, ValidationException}
import org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSchema, normalizeTypeInfo}
import org.apache.flink.table.descriptors.SchemaValidator._

import scala.collection.mutable

/**
  * Describes a schema of a table.
  *
  * Note: Field names are matched by the exact name by default (case sensitive).
  */
class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  // tracks the most recently added field so that from()/proctime()/rowtime() can attach to it
  private var lastField: Option[String] = None

  /**
    * Sets the schema with field names and the types. Required.
    *
    * This method overwrites existing fields added with [[field()]].
    *
    * @param schema the table schema
    */
  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    normalizeTableSchema(schema).foreach {
      case (n, t) => field(n, t)
    }
    this
  }

  /**
    * Adds a field with the field name and the type information. Required.
    * This method can be called multiple times. The call order of this method defines
    * also the order of the fields in a row.
    *
    * @param fieldName the field name
    * @param fieldType the type information of the field
    */
  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, normalizeTypeInfo(fieldType))
    this
  }

  /**
    * Adds a field with the field name and the type string. Required.
    * This method can be called multiple times. The call order of this method defines
    * also the order of the fields in a row.
    *
    * @param fieldName the field name
    * @param fieldType the type string of the field
    * @throws ValidationException if the field name was already added
    */
  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  /**
    * Specifies the origin of the previously defined field. The origin field is defined by a
    * connector or format.
    *
    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
    *
    * Note: Field names are matched by the exact name by default (case sensitive).
    */
  def from(originFieldName: String): Schema = {
    lastField match {
      // consistency fix: all three attachment methods (from/proctime/rowtime) now
      // report the identical error message for the same precondition
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (FROM -> originFieldName)
        lastField = None
    }
    this
  }

  /**
    * Specifies the previously defined field as a processing-time attribute.
    *
    * E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
    */
  def proctime(): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (PROCTIME -> PROCTIME_VALUE_TRUE)
        lastField = None
    }
    this
  }

  /**
    * Specifies the previously defined field as an event-time attribute.
    *
    * E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
    */
  def rowtime(rowtime: Rowtime): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        // nest the rowtime properties under this field
        val fieldProperties = new DescriptorProperties()
        rowtime.addProperties(fieldProperties)
        tableSchema(f) ++= fieldProperties.asMap
        lastField = None
    }
    this
  }

  /**
    * Internal method for properties conversion.
    */
  final override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    properties.putInt(SCHEMA_VERSION, 1)
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        Map(NAME -> name) ++ props
      }
    )
  }
}

/**
  * Describes a schema of a table.
  */
object Schema {

  /**
    * Creates a new schema descriptor.
    */
  def apply(): Schema = new Schema()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala new file mode 100644 index 0000000..19c0e41 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME +import org.apache.flink.table.descriptors.SchemaValidator._ + +/** + * Validator for [[Schema]]. + */ +class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + + val names = properties.getIndexedProperty(SCHEMA, NAME) + val types = properties.getIndexedProperty(SCHEMA, TYPE) + + if (names.isEmpty && types.isEmpty) { + throw new ValidationException(s"Could not find the required schema for property '$SCHEMA'.") + } + + for (i <- 0 until Math.max(names.size, types.size)) { + properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, minLen = 1) + properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false) + properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen = 1) + // either proctime or rowtime + val proctime = s"$SCHEMA.$i.$PROCTIME" + val rowtime = s"$SCHEMA.$i.$ROWTIME" + if (properties.contains(proctime)) { + if (!isStreamEnvironment) { + throw new ValidationException( + s"Property '$proctime' is not allowed in a batch environment.") + } + // check proctime + properties.validateBoolean(proctime, isOptional = false) + // no rowtime + properties.validatePrefixExclusion(rowtime) + } else if (properties.hasPrefix(rowtime)) { + // check rowtime + val rowtimeValidator = new RowtimeValidator(s"$SCHEMA.$i.") + rowtimeValidator.validate(properties) + // no proctime + properties.validateExclusion(proctime) + } + } + } +} + +object SchemaValidator { + + val SCHEMA = "schema" + val SCHEMA_VERSION = "schema.version" + + // per column properties + + val NAME = "name" + val TYPE = "type" + val PROCTIME = "proctime" + val PROCTIME_VALUE_TRUE = "true" + val FROM = "from" + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala new file mode 100644 index 0000000..3037286 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.StatisticsValidator._ +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Statistics descriptor for describing table stats. 
+ */ +class Statistics extends Descriptor { + + private var rowCount: Option[Long] = None + private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, String]] = + mutable.LinkedHashMap[String, mutable.Map[String, String]]() + + /** + * Sets the statistics from a [[TableStats]] instance. + * + * This method overwrites all existing statistics. + * + * @param tableStats the table statistics + */ + def tableStats(tableStats: TableStats): Statistics = { + rowCount(tableStats.rowCount) + columnStats.clear() + tableStats.colStats.asScala.foreach { case (col, stats) => + columnStats(col, stats) + } + this + } + + /** + * Sets statistics for the overall row count. Required. + * + * @param rowCount the expected number of rows + */ + def rowCount(rowCount: Long): Statistics = { + this.rowCount = Some(rowCount) + this + } + + /** + * Sets statistics for a column. Overwrites all existing statistics for this column. + * + * @param columnName the column name + * @param columnStats expected statistics for the column + */ + def columnStats(columnName: String, columnStats: ColumnStats): Statistics = { + val map = mutable.Map(normalizeColumnStats(columnStats).toSeq: _*) + this.columnStats.put(columnName, map) + this + } + + /** + * Sets the number of distinct values statistic for the given column. + */ + def columnDistinctCount(columnName: String, ndv: Long): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(DISTINCT_COUNT, ndv.toString) + this + } + + /** + * Sets the number of null values statistic for the given column. + */ + def columnNullCount(columnName: String, nullCount: Long): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(NULL_COUNT, nullCount.toString) + this + } + + /** + * Sets the average length statistic for the given column. 
+ */ + def columnAvgLength(columnName: String, avgLen: Double): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(AVG_LENGTH, avgLen.toString) + this + } + + /** + * Sets the maximum length statistic for the given column. + */ + def columnMaxLength(columnName: String, maxLen: Integer): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(MAX_LENGTH, maxLen.toString) + this + } + + /** + * Sets the maximum value statistic for the given column. + */ + def columnMaxValue(columnName: String, max: Number): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(MAX_VALUE, max.toString) + this + } + + /** + * Sets the minimum value statistic for the given column. + */ + def columnMinValue(columnName: String, min: Number): Statistics = { + this.columnStats + .getOrElseUpdate(columnName, mutable.HashMap()) + .put(MIN_VALUE, min.toString) + this + } + + /** + * Internal method for properties conversion. + */ + final override def addProperties(properties: DescriptorProperties): Unit = { + properties.putInt(STATISTICS_VERSION, 1) + rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc)) + val namedStats = columnStats.map { case (name, stats) => + // name should not be part of the properties key + (stats + (NAME -> name)).toMap + }.toSeq + properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats) + } +} + +/** + * Statistics descriptor for describing table stats. + */ +object Statistics { + + /** + * Statistics descriptor for describing table stats. 
+ */ + def apply(): Statistics = new Statistics() +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala new file mode 100644 index 0000000..a78e422 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats} +import org.apache.flink.table.plan.stats.ColumnStats + +import scala.collection.mutable + +/** + * Validator for [[Statistics]]. 
+ */ +class StatisticsValidator extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0) + validateColumnStats(properties, STATISTICS_COLUMNS) + } +} + +object StatisticsValidator { + + val STATISTICS_VERSION = "statistics.version" + val STATISTICS_ROW_COUNT = "statistics.row-count" + val STATISTICS_COLUMNS = "statistics.columns" + + // per column properties + + val NAME = "name" + val DISTINCT_COUNT = "distinct-count" + val NULL_COUNT = "null-count" + val AVG_LENGTH = "avg-length" + val MAX_LENGTH = "max-length" + val MAX_VALUE = "max-value" + val MIN_VALUE = "min-value" + + // utilities + + def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = { + val stats = mutable.HashMap[String, String]() + if (columnStats.ndv != null) { + stats += DISTINCT_COUNT -> columnStats.ndv.toString + } + if (columnStats.nullCount != null) { + stats += NULL_COUNT -> columnStats.nullCount.toString + } + if (columnStats.avgLen != null) { + stats += AVG_LENGTH -> columnStats.avgLen.toString + } + if (columnStats.maxLen != null) { + stats += MAX_LENGTH -> columnStats.maxLen.toString + } + if (columnStats.max != null) { + stats += MAX_VALUE -> columnStats.max.toString + } + if (columnStats.min != null) { + stats += MIN_VALUE -> columnStats.min.toString + } + stats.toMap + } + + def validateColumnStats(properties: DescriptorProperties, key: String): Unit = { + + // filter for number of columns + val columnCount = properties.getIndexedProperty(key, NAME).size + + for (i <- 0 until columnCount) { + properties.validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1) + properties.validateLong(s"$key.$i.$DISTINCT_COUNT", isOptional = true, min = 0L) + properties.validateLong(s"$key.$i.$NULL_COUNT", isOptional = true, min = 0L) + 
properties.validateDouble(s"$key.$i.$AVG_LENGTH", isOptional = true, min = 0.0) + properties.validateInt(s"$key.$i.$MAX_LENGTH", isOptional = true, min = 0) + properties.validateDouble(s"$key.$i.$MAX_VALUE", isOptional = true, min = 0.0) + properties.validateDouble(s"$key.$i.$MIN_VALUE", isOptional = true, min = 0.0) + } + } + + def readColumnStats(properties: DescriptorProperties, key: String): Map[String, ColumnStats] = { + + // filter for number of columns + val columnCount = properties.getIndexedProperty(key, NAME).size + + val stats = for (i <- 0 until columnCount) yield { + val name = properties.getString(s"$key.$i.$NAME").getOrElse( + throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'.")) + + val stats = ColumnStats( + properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => Long.box(v)).orNull, + properties.getLong(s"$key.$i.$NULL_COUNT").map(v => Long.box(v)).orNull, + properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => Double.box(v)).orNull, + properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull, + properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => Double.box(v)).orNull, + properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => Double.box(v)).orNull + ) + + name -> stats + } + + stats.toMap + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala new file mode 100644 index 0000000..ae88236 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService} + +/** + * Descriptor for specifying a table source in a streaming environment. + */ +class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor) + extends TableSourceDescriptor(connector) { + + /** + * Searches for the specified table source, configures it accordingly, and returns it. + */ + def toTableSource: TableSource[_] = { + val source = TableSourceFactoryService.findTableSourceFactory(this) + source match { + case _: StreamTableSource[_] => source + case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") + } + } + + /** + * Searches for the specified table source, configures it accordingly, and returns it as a table. + */ + def toTable: Table = { + tableEnv.fromTableSource(toTableSource) + } + + /** + * Searches for the specified table source, configures it accordingly, and registers it as + * a table under the given name. 
+ * + * @param name table name to be registered in the table environment + */ + def register(name: String): Unit = { + tableEnv.registerTableSource(name, toTableSource) + } + + /** + * Specifies the format that defines how to read data from a connector. + */ + def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = { + formatDescriptor = Some(format) + this + } + + /** + * Specifies the resulting table schema. + */ + def withSchema(schema: Schema): StreamTableSourceDescriptor = { + schemaDescriptor = Some(schema) + this + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala new file mode 100644 index 0000000..918b618 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} +import org.apache.flink.table.plan.stats.TableStats + +import scala.collection.JavaConverters._ + +/** + * Common class for all descriptors describing a table source. + */ +abstract class TableSourceDescriptor(connector: ConnectorDescriptor) extends Descriptor { + + protected val connectorDescriptor: ConnectorDescriptor = connector + + protected var formatDescriptor: Option[FormatDescriptor] = None + protected var schemaDescriptor: Option[Schema] = None + protected var statisticsDescriptor: Option[Statistics] = None + protected var metaDescriptor: Option[Metadata] = None + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + connectorDescriptor.addProperties(properties) + + // check for a format + if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) { + throw new ValidationException( + s"The connector '$connectorDescriptor' requires a format description.") + } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) { + throw new ValidationException( + s"The connector '$connectorDescriptor' does not require a format description " + + s"but '${formatDescriptor.get}' found.") + } + + formatDescriptor.foreach(_.addProperties(properties)) + schemaDescriptor.foreach(_.addProperties(properties)) + metaDescriptor.foreach(_.addProperties(properties)) + } + + /** + * Reads table statistics from the descriptors properties. 
+ */ + protected def getTableStats: Option[TableStats] = { + val normalizedProps = new DescriptorProperties() + addProperties(normalizedProps) + val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => Long.box(v)) + rowCount match { + case Some(cnt) => + val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) + Some(TableStats(cnt, columnStats.asJava)) + case None => + None + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala index d5a5f36..da08916 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala @@ -40,7 +40,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic { * * @return The table statistics */ - def getTableStats: TableStats = tableStats.getOrElse(null) + def getTableStats: TableStats = tableStats.orNull /** * Returns the stats of the specified the column. 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index ba076b4..e0022c5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -270,7 +270,7 @@ object CsvTableSource { /** * Adds a field with the field name and the type information. Required. * This method can be called multiple times. The call order of this method defines - * also the order of thee fields in a row. + * also the order of the fields in a row. * * @param fieldName the field name * @param fieldType the type information of the field http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala index 68d2f55..ab984a6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala @@ -29,7 +29,11 @@ import com.google.common.collect.ImmutableSet /** * The class defines a converter used to convert [[CsvTableSource]] to * or from [[ExternalCatalogTable]]. 
+ * + * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead. */ +@Deprecated +@deprecated("Use the more generic table source factories instead.") @TableType(value = "csv") class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] { http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala new file mode 100644 index 0000000..bec4565 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources + +import java.util + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} +import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION} +import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_VERSION} +import org.apache.flink.table.descriptors._ +import org.apache.flink.types.Row + +/** + * Factory for creating configured instances of [[CsvTableSource]]. + */ +class CsvTableSourceFactory extends TableSourceFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String]() + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE) + context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE) + context.put(CONNECTOR_VERSION, "1") + context.put(FORMAT_VERSION, "1") + context.put(SCHEMA_VERSION, "1") + context + } + + override def supportedProperties(): util.List[String] = { + val properties = new util.ArrayList[String]() + // connector + properties.add(CONNECTOR_PATH) + // format + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}") + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}") + properties.add(FORMAT_FIELD_DELIMITER) + properties.add(FORMAT_LINE_DELIMITER) + properties.add(FORMAT_QUOTE_CHARACTER) + properties.add(FORMAT_COMMENT_PREFIX) + properties.add(FORMAT_IGNORE_FIRST_LINE) + properties.add(FORMAT_IGNORE_PARSE_ERRORS) + properties.add(CONNECTOR_PATH) + // schema + properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}") + properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}") + properties + } + + override def create(properties: util.Map[String, String]): TableSource[Row] = { + val params = new DescriptorProperties() + 
params.putProperties(properties) + + // validate + new FileSystemValidator().validate(params) + new CsvValidator().validate(params) + new SchemaValidator().validate(params) + + // build + val csvTableSourceBuilder = new CsvTableSource.Builder + + val tableSchema = params.getTableSchema(SCHEMA).get + val encodingSchema = params.getTableSchema(FORMAT_FIELDS) + + // the CsvTableSource needs some rework first + // for now the schema must be equal to the encoding + if (!encodingSchema.contains(tableSchema)) { + throw new TableException( + "Encodings that differ from the schema are not supported yet for CsvTableSources.") + } + + params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path) + params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter) + params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter) + + encodingSchema.foreach { schema => + schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) => + csvTableSourceBuilder.field(name, tpe) + } + } + params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter) + params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix) + params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag => + if (flag) { + csvTableSourceBuilder.ignoreFirstLine() + } + } + params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag => + if (flag) { + csvTableSourceBuilder.ignoreParseErrors() + } + } + + csvTableSourceBuilder.build() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala new file mode 100644 index 
0000000..f42d765 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +/** + * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider + * Interfaces (SPI) for discovery. A factory is called with a set of normalized properties that + * describe the desired table source. The factory allows for matching to the given set of + * properties and creating a configured [[TableSource]] accordingly. + * + * Classes that implement this interface need to be added to the + * "META-INF/services/org.apache.flink.table.sources.TableSourceFactory" file of a JAR file in + * the current classpath to be found. + */ +trait TableSourceFactory[T] { + + /** + * Specifies the context that this factory has been implemented for. The framework guarantees + * to only call the [[create()]] method of the factory if the specified set of properties and + * values are met. 
+ * + * Typical properties might be: + * - connector.type + * - format.type + * + * Specified versions allow the framework to provide backwards compatible properties in case of + * string format changes: + * - connector.version + * - format.version + * + * An empty context means that the factory matches for all requests. + */ + def requiredContext(): util.Map[String, String] + + /** + * List of property keys that this factory can handle. This method will be used for validation. + * If a property is passed that this factory cannot handle, an exception will be thrown. The + * list must not contain the keys that are specified by the context. + * + * Example properties might be: + * - format.line-delimiter + * - format.ignore-parse-errors + * - format.fields.#.type + * - format.fields.#.name + * + * Note: Use "#" to denote an array of values where "#" represents one or more digits. + */ + def supportedProperties(): util.List[String] + + /** + * Creates and configures a [[TableSource]] using the given properties. + * + * @param properties normalized properties describing a table source + * @return the configured table source + */ + def create(properties: util.Map[String, String]): TableSource[T] + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala new file mode 100644 index 0000000..cb737a9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ServiceConfigurationError, ServiceLoader} + +import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException} +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION +import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION +import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION +import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.util.Logging + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Service provider interface for finding suitable table source factories for the given properties. 
+ */ +object TableSourceFactoryService extends Logging { + + private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]]) + + def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = { + val properties = new DescriptorProperties() + descriptor.addProperties(properties) + findTableSourceFactory(properties.asMap) + } + + def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = { + var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None + try { + val iter = loader.iterator() + while (iter.hasNext) { + val factory = iter.next() + + val requiredContextJava = try { + factory.requiredContext() + } catch { + case t: Throwable => + throw new TableException( + s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.", + t) + } + + val requiredContext = if (requiredContextJava != null) { + // normalize properties + requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)) + } else { + Map[String, String]() + } + + val plainContext = mutable.Map[String, String]() + plainContext ++= requiredContext + // we remove the versions for now until we have the first backwards compatibility case + // with the version we can provide mappings in case the format changes + plainContext.remove(CONNECTOR_VERSION) + plainContext.remove(FORMAT_VERSION) + plainContext.remove(SCHEMA_VERSION) + plainContext.remove(ROWTIME_VERSION) + plainContext.remove(METADATA_VERSION) + plainContext.remove(STATISTICS_VERSION) + + // check if required context is met + if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) { + matchingFactory match { + case Some(_) => throw new AmbiguousTableSourceException(properties) + case None => matchingFactory = Some((factory, requiredContext.keys.toSeq)) + } + } + } + } catch { + case e: ServiceConfigurationError => + LOG.error("Could not load service provider for table source factories.", e) + throw new TableException("Could not load 
service provider for table source factories.", e) + } + + val (factory, context) = matchingFactory + .getOrElse(throw new NoMatchingTableSourceException(properties)) + + val plainProperties = mutable.ArrayBuffer[String]() + properties.keys.foreach { k => + // replace arrays with wildcard + val key = k.replaceAll(".\\d+", ".#") + // ignore context properties and duplicates + if (!context.contains(key) && !plainProperties.contains(key)) { + plainProperties += key + } + } + + val supportedPropertiesJava = try { + factory.supportedProperties() + } catch { + case t: Throwable => + throw new TableException( + s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.", + t) + } + + val supportedProperties = if (supportedPropertiesJava != null) { + supportedPropertiesJava.asScala.map(_.toLowerCase) + } else { + Seq[String]() + } + + // check for supported properties + plainProperties.foreach { k => + if (!supportedProperties.contains(k)) { + throw new ValidationException( + s"Table factory '${factory.getClass.getCanonicalName}' does not support the " + + s"property '$k'. 
Supported properties are: \n${supportedProperties.mkString("\n")}") + } + } + + // create the table source + try { + factory.create(properties.asJava) + } catch { + case t: Throwable => + throw new TableException( + s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.", + t) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala index 34c6ba5..ce57b92 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.sources.FieldComputer /** * Provides the an expression to extract the timestamp for a rowtime attribute. */ -abstract class TimestampExtractor extends FieldComputer[Long] { +abstract class TimestampExtractor extends FieldComputer[Long] with Serializable { /** Timestamp extractors compute the timestamp as Long. 
*/ override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]] http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala index 3a947ac..9fc7c88 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala @@ -21,12 +21,12 @@ package org.apache.flink.table.sources.wmstrategies import org.apache.flink.streaming.api.watermark.Watermark /** - * A watermark assigner for ascending rowtime attributes. + * A watermark strategy for ascending rowtime attributes. * * Emits a watermark of the maximum observed timestamp so far minus 1. * Rows that have a timestamp equal to the max timestamp are not late. 
*/ -class AscendingTimestamps extends PeriodicWatermarkAssigner { +final class AscendingTimestamps extends PeriodicWatermarkAssigner { var maxTimestamp: Long = Long.MinValue + 1 http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala index 957daca..8f7c235 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala @@ -21,13 +21,13 @@ package org.apache.flink.table.sources.wmstrategies import org.apache.flink.streaming.api.watermark.Watermark /** - * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval. + * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval. * * Emits watermarks which are the maximum observed timestamp minus the specified delay. * * @param delay The delay by which watermarks are behind the maximum observed timestamp. 
*/ -class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner { +final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner { var maxTimestamp: Long = Long.MinValue + delay http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala index 4c7f4e4..dd71bd3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala @@ -62,7 +62,7 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy { } /** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/ -class PreserveWatermarks extends WatermarkStrategy +final class PreserveWatermarks extends WatermarkStrategy object PreserveWatermarks { val INSTANCE: PreserveWatermarks = new PreserveWatermarks } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala new file mode 100644 index 0000000..253b491 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala @@ -0,0 +1,252 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils + +import java.io.Serializable + +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils._ +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.descriptors.DescriptorProperties.normalizeTypeInfo +import org.apache.flink.util.InstantiationUtil + +import _root_.scala.language.implicitConversions +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} + +/** + * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a + * string representation and back. 
+ */ +object TypeStringUtils extends JavaTokenParsers with PackratParsers { + case class Keyword(key: String) + + // convert the keyword into an case insensitive Parser + implicit def keyword2Parser(kw: Keyword): Parser[String] = { + ("""(?i)\Q""" + kw.key + """\E""").r + } + + lazy val VARCHAR: Keyword = Keyword("VARCHAR") + lazy val STRING: Keyword = Keyword("STRING") + lazy val BOOLEAN: Keyword = Keyword("BOOLEAN") + lazy val BYTE: Keyword = Keyword("BYTE") + lazy val TINYINT: Keyword = Keyword("TINYINT") + lazy val SHORT: Keyword = Keyword("SHORT") + lazy val SMALLINT: Keyword = Keyword("SMALLINT") + lazy val INT: Keyword = Keyword("INT") + lazy val LONG: Keyword = Keyword("LONG") + lazy val BIGINT: Keyword = Keyword("BIGINT") + lazy val FLOAT: Keyword = Keyword("FLOAT") + lazy val DOUBLE: Keyword = Keyword("DOUBLE") + lazy val DECIMAL: Keyword = Keyword("DECIMAL") + lazy val SQL_DATE: Keyword = Keyword("SQL_DATE") + lazy val DATE: Keyword = Keyword("DATE") + lazy val SQL_TIME: Keyword = Keyword("SQL_TIME") + lazy val TIME: Keyword = Keyword("TIME") + lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP") + lazy val TIMESTAMP: Keyword = Keyword("TIMESTAMP") + lazy val ROW: Keyword = Keyword("ROW") + lazy val ANY: Keyword = Keyword("ANY") + lazy val POJO: Keyword = Keyword("POJO") + + lazy val qualifiedName: Parser[String] = + """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r + + lazy val base64Url: Parser[String] = + """[A-Za-z0-9_-]*""".r + + lazy val atomic: PackratParser[TypeInformation[_]] = + (VARCHAR | STRING) ^^ { e => Types.STRING } | + BOOLEAN ^^ { e => Types.BOOLEAN } | + (TINYINT | BYTE) ^^ { e => Types.BYTE } | + (SMALLINT | SHORT) ^^ { e => Types.SHORT } | + INT ^^ { e => Types.INT } | + (BIGINT | LONG) ^^ { e => Types.LONG } | + FLOAT ^^ { e => Types.FLOAT } | + DOUBLE ^^ { e => Types.DOUBLE } | + DECIMAL ^^ { e => Types.DECIMAL } | + (DATE | SQL_DATE) ^^ { e => Types.SQL_DATE.asInstanceOf[TypeInformation[_]] } | + (TIMESTAMP | 
SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } | + (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME } + + lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s => + StringEscapeUtils.unescapeJava(s) + } + + lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident + + lazy val field: PackratParser[(String, TypeInformation[_])] = + fieldName ~ typeInfo ^^ { + case name ~ info => (name, info) + } + + lazy val namedRow: PackratParser[TypeInformation[_]] = + ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ { + fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray) + } | failure("Named row type expected.") + + lazy val unnamedRow: PackratParser[TypeInformation[_]] = + ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ { + types => Types.ROW(types: _*) + } | failure("Unnamed row type expected.") + + lazy val generic: PackratParser[TypeInformation[_]] = + ANY ~ "(" ~> qualifiedName <~ ")" ^^ { + typeClass => + val clazz = loadClass(typeClass) + new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]]) + } + + lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ { + typeClass => + val clazz = loadClass(typeClass) + val info = TypeExtractor.createTypeInfo(clazz) + if (!info.isInstanceOf[PojoTypeInfo[_]]) { + throw new ValidationException(s"Class '$typeClass' is not a POJO type.") + } + info + } + + lazy val any: PackratParser[TypeInformation[_]] = + ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ { + case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _ => + val clazz = loadClass(typeClass) + val typeInfo = deserialize(serialized) + + if (clazz != typeInfo.getTypeClass) { + throw new ValidationException( + s"Class '$typeClass' does not correspond to serialized data.") + } + typeInfo + } + + lazy val typeInfo: PackratParser[TypeInformation[_]] = + namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.") + + def readTypeInfo(typeString: String): 
TypeInformation[_] = { + parseAll(typeInfo, typeString) match { + case Success(lst, _) => lst + + case NoSuccess(msg, next) => + throwError(msg, next) + } + } + + // ---------------------------------------------------------------------------------------------- + + def writeTypeInfo(typeInfo: TypeInformation[_]): String = typeInfo match { + + case Types.STRING => VARCHAR.key + case Types.BOOLEAN => BOOLEAN.key + case Types.BYTE => TINYINT.key + case Types.SHORT => SMALLINT.key + case Types.INT => INT.key + case Types.LONG => BIGINT.key + case Types.FLOAT => FLOAT.key + case Types.DOUBLE => DOUBLE.key + case Types.DECIMAL => DECIMAL.key + case Types.SQL_DATE => DATE.key + case Types.SQL_TIME => TIME.key + case Types.SQL_TIMESTAMP => TIMESTAMP.key + + case rt: RowTypeInfo => + val fields = rt.getFieldNames.zip(rt.getFieldTypes) + val normalizedFields = fields.map { f => + + // escape field name if it contains spaces + val name = if (!f._1.matches("\\S+")) { + "\"" + StringEscapeUtils.escapeJava(f._1) + "\"" + } else { + f._1 + } + + s"$name ${normalizeTypeInfo(f._2)}" + } + s"${ROW.key}(${normalizedFields.mkString(", ")})" + + case generic: GenericTypeInfo[_] => + s"${ANY.key}(${generic.getTypeClass.getName})" + + case pojo: PojoTypeInfo[_] => + // we only support very simple POJOs that only contain extracted fields + // (not manually specified) + val extractedInfo = try { + Some(TypeExtractor.createTypeInfo(pojo.getTypeClass)) + } catch { + case _: InvalidTypesException => None + } + extractedInfo match { + case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})" + case _ => + throw new TableException( + "A string representation for custom POJO types is not supported yet.") + } + + case _: CompositeType[_] => + throw new TableException("A string representation for composite types is not supported yet.") + + case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] | + _: PrimitiveArrayTypeInfo[_] => + throw new TableException("A string 
representation for array types is not supported yet.") + + case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] => + throw new TableException("A string representation for map types is not supported yet.") + + case any: TypeInformation[_] => + s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})" + } + + // ---------------------------------------------------------------------------------------------- + + private def serialize(obj: Serializable): String = { + try { + val byteArray = InstantiationUtil.serializeObject(obj) + Base64.encodeBase64URLSafeString(byteArray) + } catch { + case e: Exception => + throw new ValidationException(s"Unable to serialize type information '$obj' with " + + s"class '${obj.getClass.getName}'.", e) + } + } + + private def deserialize(data: String): TypeInformation[_] = { + val byteData = Base64.decodeBase64(data) + InstantiationUtil + .deserializeObject[TypeInformation[_]](byteData, Thread.currentThread.getContextClassLoader) + } + + private def throwError(msg: String, next: Input): Nothing = { + val improvedMsg = msg.replace("string matching regex `\\z'", "End of type information") + + throw new ValidationException( + s"""Could not parse type information at column ${next.pos.column}: $improvedMsg + |${next.pos.longString}""".stripMargin) + } + + private def loadClass(qualifiedName: String): Class[_] = try { + Class.forName(qualifiedName, true, Thread.currentThread.getContextClassLoader) + } catch { + case e: ClassNotFoundException => + throw new ValidationException("Class '" + qualifiedName + "' could not be found. 
" + + "Please note that inner classes must be globally accessible and declared static.", e) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..1b2506e --- /dev/null +++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.sources.TestTableSourceFactory
