[FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources
This closes #5240. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb58960 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb58960 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb58960 Branch: refs/heads/master Commit: 2cb58960e78bee83d29fa66c2c29647353194f75 Parents: a5476cd Author: twalthr <[email protected]> Authored: Fri Dec 15 10:18:20 2017 +0100 Committer: twalthr <[email protected]> Committed: Wed Jan 31 12:05:43 2018 +0100 ---------------------------------------------------------------------- .../connectors/kafka/KafkaJsonTableSource.java | 3 +- .../flink/api/java/typeutils/PojoTypeInfo.java | 1 - .../flink/table/annotation/TableType.java | 6 +- ...pache.flink.table.sources.TableSourceFactory | 16 + .../flink/table/api/BatchTableEnvironment.scala | 24 +- .../table/api/StreamTableEnvironment.scala | 24 +- .../flink/table/api/TableEnvironment.scala | 30 +- .../org/apache/flink/table/api/exceptions.scala | 49 ++ .../table/api/java/BatchTableEnvironment.scala | 2 +- .../table/catalog/ExternalCatalogSchema.scala | 12 +- .../table/catalog/ExternalCatalogTable.scala | 290 ++++++++++- .../table/catalog/ExternalTableSourceUtil.scala | 112 +++-- .../table/catalog/TableSourceConverter.scala | 4 + .../BatchTableSourceDescriptor.scala | 72 +++ .../table/descriptors/ConnectorDescriptor.scala | 54 ++ .../ConnectorDescriptorValidator.scala | 39 ++ .../apache/flink/table/descriptors/Csv.scala | 166 +++++++ .../flink/table/descriptors/CsvValidator.scala | 53 ++ .../flink/table/descriptors/Descriptor.scala | 32 ++ .../descriptors/DescriptorProperties.scala | 489 +++++++++++++++++++ .../table/descriptors/DescriptorValidator.scala | 32 ++ .../flink/table/descriptors/FileSystem.scala | 60 +++ .../table/descriptors/FileSystemValidator.scala | 41 ++ .../table/descriptors/FormatDescriptor.scala | 49 ++ .../descriptors/FormatDescriptorValidator.scala | 39 ++ 
.../apache/flink/table/descriptors/Json.scala | 78 +++ .../flink/table/descriptors/JsonValidator.scala | 41 ++ .../flink/table/descriptors/Metadata.scala | 81 +++ .../table/descriptors/MetadataValidator.scala | 43 ++ .../flink/table/descriptors/Rowtime.scala | 133 +++++ .../table/descriptors/RowtimeValidator.scala | 134 +++++ .../apache/flink/table/descriptors/Schema.scala | 164 +++++++ .../table/descriptors/SchemaValidator.scala | 80 +++ .../flink/table/descriptors/Statistics.scala | 157 ++++++ .../table/descriptors/StatisticsValidator.scala | 119 +++++ .../StreamTableSourceDescriptor.scala | 75 +++ .../descriptors/TableSourceDescriptor.scala | 75 +++ .../flink/table/plan/stats/FlinkStatistic.scala | 2 +- .../flink/table/sources/CsvTableSource.scala | 2 +- .../table/sources/CsvTableSourceConverter.scala | 4 + .../table/sources/CsvTableSourceFactory.scala | 113 +++++ .../table/sources/TableSourceFactory.scala | 76 +++ .../sources/TableSourceFactoryService.scala | 144 ++++++ .../tsextractors/TimestampExtractor.scala | 2 +- .../wmstrategies/AscendingTimestamps.scala | 4 +- .../BoundedOutOfOrderTimestamps.scala | 4 +- .../wmstrategies/watermarkStrategies.scala | 2 +- .../flink/table/typeutils/TypeStringUtils.scala | 252 ++++++++++ ...pache.flink.table.sources.TableSourceFactory | 16 + .../flink/table/api/TableSourceTest.scala | 38 ++ .../catalog/ExternalCatalogSchemaTest.scala | 4 +- .../catalog/ExternalTableSourceUtilTest.scala | 6 +- .../flink/table/descriptors/CsvTest.scala | 102 ++++ .../table/descriptors/DescriptorTestBase.scala | 54 ++ .../table/descriptors/FileSystemTest.scala | 53 ++ .../flink/table/descriptors/JsonTest.scala | 77 +++ .../flink/table/descriptors/MetadataTest.scala | 55 +++ .../flink/table/descriptors/RowtimeTest.scala | 72 +++ .../flink/table/descriptors/SchemaTest.scala | 76 +++ .../table/descriptors/StatisticsTest.scala | 91 ++++ .../StreamTableSourceDescriptorTest.scala | 79 +++ .../sources/TableSourceFactoryServiceTest.scala | 82 ++++ 
.../table/sources/TestTableSourceFactory.scala | 60 +++ .../table/typeutils/TypeStringUtilsTest.scala | 104 ++++ .../table/utils/MockTableEnvironment.scala | 7 +- 65 files changed, 4383 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index 7de7f34..b3b545e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -161,7 +161,8 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D /** * Sets flag whether to fail if a field is missing or not. * - * @param failOnMissingField If set to true, the TableSource fails if a missing fields. + * @param failOnMissingField If set to true, the TableSource fails if there is a missing + * field. * If set to false, a missing field is set to null. * @return The builder. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 211b7ef..7f41f5d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -268,7 +268,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> { return new PojoTypeComparatorBuilder(); } - // used for testing. Maybe use mockito here @PublicEvolving public PojoField getPojoFieldAt(int pos) { if (pos < 0 || pos >= this.fields.length) { http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java index 2d2a7af..c564528 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java @@ -18,7 +18,6 @@ package org.apache.flink.table.annotation; -import org.apache.flink.annotation.Public; import org.apache.flink.table.catalog.TableSourceConverter; import java.lang.annotation.Documented; @@ -29,11 +28,14 @@ import java.lang.annotation.Target; /** * Annotates a table type of a {@link TableSourceConverter}. + * + * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] interface + * with Java service loaders instead. 
*/ @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) -@Public +@Deprecated public @interface TableType { /** http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..ff43eed --- /dev/null +++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.table.sources.CsvTableSourceFactory http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index c920d23..05255fd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -86,17 +86,20 @@ abstract class BatchTableEnvironment( } /** Returns a unique table name according to the internal naming pattern. */ - protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement() + override protected def createUniqueTableName(): String = + "_DataSetTable_" + nameCntr.getAndIncrement() /** - * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog. - * Registered tables can be referenced in SQL queries. + * Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without + * name checking. Registered tables can be referenced in SQL queries. * * @param name The name under which the [[TableSource]] is registered. * @param tableSource The [[TableSource]] to register. 
*/ - override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { - checkValidTableName(name) + override protected def registerTableSourceInternal( + name: String, + tableSource: TableSource[_]) + : Unit = { tableSource match { case batchTableSource: BatchTableSource[_] => @@ -107,6 +110,17 @@ abstract class BatchTableEnvironment( } } +// TODO expose this once we have enough table source factories that can deal with it +// /** +// * Creates a table from a descriptor that describes the source connector, source encoding, +// * the resulting table schema, and other properties. +// * +// * @param connectorDescriptor connector descriptor describing the source of the table +// */ +// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = { +// new BatchTableSourceDescriptor(this, connectorDescriptor) +// } + /** * Registers an external [[TableSink]] with given field names and types in this * [[TableEnvironment]]'s catalog. http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 9d94f54..84d7240 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -97,17 +97,20 @@ abstract class StreamTableEnvironment( } /** Returns a unique table name according to the internal naming pattern. 
*/ - protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement() + override protected def createUniqueTableName(): String = + "_DataStreamTable_" + nameCntr.getAndIncrement() /** - * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog. - * Registered tables can be referenced in SQL queries. + * Registers an internal [[StreamTableSource]] in this [[TableEnvironment]]'s catalog without + * name checking. Registered tables can be referenced in SQL queries. * * @param name The name under which the [[TableSource]] is registered. * @param tableSource The [[TableSource]] to register. */ - override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { - checkValidTableName(name) + override protected def registerTableSourceInternal( + name: String, + tableSource: TableSource[_]) + : Unit = { tableSource match { case streamTableSource: StreamTableSource[_] => @@ -125,6 +128,17 @@ abstract class StreamTableEnvironment( } } +// TODO expose this once we have enough table source factories that can deal with it +// /** +// * Creates a table from a descriptor that describes the source connector, source encoding, +// * the resulting table schema, and other properties. +// * +// * @param connectorDescriptor connector descriptor describing the source of the table +// */ +// def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = { +// new StreamTableSourceDescriptor(this, connectorDescriptor) +// } + /** * Registers an external [[TableSink]] with given field names and types in this * [[TableEnvironment]]'s catalog. 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 2569cd4..3b314e9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -290,6 +290,17 @@ abstract class TableEnvironment(val config: TableConfig) { } /** + * Creates a table from a table source. + * + * @param source table source used as table + */ + def fromTableSource(source: TableSource[_]): Table = { + val name = createUniqueTableName() + registerTableSourceInternal(name, source) + scan(name) + } + + /** * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema. * All tables registered in the [[ExternalCatalog]] can be accessed. * @@ -302,7 +313,7 @@ abstract class TableEnvironment(val config: TableConfig) { } this.externalCatalogs.put(name, externalCatalog) // create an external catalog Calcite schema, register it on the root schema - ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog) + ExternalCatalogSchema.registerCatalog(this, rootSchema, name, externalCatalog) } /** @@ -422,7 +433,19 @@ abstract class TableEnvironment(val config: TableConfig) { * @param name The name under which the [[TableSource]] is registered. * @param tableSource The [[TableSource]] to register. 
*/ - def registerTableSource(name: String, tableSource: TableSource[_]): Unit + def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { + checkValidTableName(name) + registerTableSourceInternal(name, tableSource) + } + + /** + * Registers an internal [[TableSource]] in this [[TableEnvironment]]'s catalog without + * name checking. Registered tables can be referenced in SQL queries. + * + * @param name The name under which the [[TableSource]] is registered. + * @param tableSource The [[TableSource]] to register. + */ + protected def registerTableSourceInternal(name: String, tableSource: TableSource[_]): Unit /** * Registers an external [[TableSink]] with given field names and types in this @@ -714,6 +737,9 @@ abstract class TableEnvironment(val config: TableConfig) { } } + /** Returns a unique table name according to the internal naming pattern. */ + protected def createUniqueTableName(): String + /** * Checks if the chosen table name is valid. * http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 7ea17fa..f5a713a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -39,6 +39,9 @@ case class SqlParserException( /** * General Exception for all errors during table handling. + * + * This exception indicates that an internal error occurred or that a feature is not supported + * yet. Usually, this exception does not indicate a fault of the user. 
*/ case class TableException( msg: String, @@ -55,6 +58,8 @@ object TableException { /** * Exception for all errors occurring during validation phase. + * + * This exception indicates that the user did something wrong. */ case class ValidationException( msg: String, @@ -137,11 +142,51 @@ case class CatalogAlreadyExistException( } /** + * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the + * given properties. + * + * @param properties properties that describe the table source + * @param cause the cause + */ +case class NoMatchingTableSourceException( + properties: Map[String, String], + cause: Throwable) + extends RuntimeException( + s"Could not find a table source factory in the classpath satisfying the " + + s"following properties: \n${properties.map(e => e._1 + "=" + e._2 ).mkString("\n")}", + cause) { + + def this(properties: Map[String, String]) = this(properties, null) +} + +/** + * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for + * the given properties. + * + * @param properties properties that describe the table source + * @param cause the cause + */ +case class AmbiguousTableSourceException( + properties: Map[String, String], + cause: Throwable) + extends RuntimeException( + s"More than one table source factory in the classpath satisfying the " + + s"following properties: \n${properties.map(e => e._1 + "=" + e._2 ).mkString("\n")}", + cause) { + + def this(properties: Map[String, String]) = this(properties, null) +} + +/** * Exception for not finding a [[TableSourceConverter]] for a given table type. * * @param tableType table type * @param cause the cause + * @deprecated Use table source factories instead + * (see [[org.apache.flink.table.sources.TableSourceFactory]]). 
*/ +@Deprecated +@deprecated("Use table factories (see TableSourceFactory) instead.") case class NoMatchedTableSourceConverterException( tableType: String, cause: Throwable) @@ -156,7 +201,11 @@ case class NoMatchedTableSourceConverterException( * * @param tableType table type * @param cause the cause + * @deprecated Use table source factories instead + * (see [[org.apache.flink.table.sources.TableSourceFactory]]). */ +@Deprecated +@deprecated("Use table factories (see TableSourceFactory) instead.") case class AmbiguousTableSourceConverterException( tableType: String, cause: Throwable) http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala index b79d161..f8f35eb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.api._ +import org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.table.functions.{AggregateFunction, TableFunction} /** http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala index d87d665..776ddee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala @@ -22,7 +22,7 @@ import java.util.{Collection => JCollection, Collections => JCollections, Linked import org.apache.calcite.linq4j.tree.Expression import org.apache.calcite.schema._ -import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException} +import org.apache.flink.table.api.{CatalogNotExistException, TableEnvironment, TableNotExistException} import org.apache.flink.table.util.Logging import scala.collection.JavaConverters._ @@ -33,10 +33,12 @@ import scala.collection.JavaConverters._ * The external catalog and all included sub-catalogs and tables is registered as * sub-schemas and tables in Calcite. 
* + * @param tableEnv the environment for this schema * @param catalogIdentifier external catalog name * @param catalog external catalog */ class ExternalCatalogSchema( + tableEnv: TableEnvironment, catalogIdentifier: String, catalog: ExternalCatalog) extends Schema with Logging { @@ -50,7 +52,7 @@ class ExternalCatalogSchema( override def getSubSchema(name: String): Schema = { try { val db = catalog.getSubCatalog(name) - new ExternalCatalogSchema(name, db) + new ExternalCatalogSchema(tableEnv, name, db) } catch { case _: CatalogNotExistException => LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier") @@ -75,7 +77,7 @@ class ExternalCatalogSchema( */ override def getTable(name: String): Table = try { val externalCatalogTable = catalog.getTable(name) - ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable) + ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable) } catch { case TableNotExistException(table, _, _) => { LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier") @@ -111,15 +113,17 @@ object ExternalCatalogSchema { /** * Registers an external catalog in a Calcite schema. * + * @param tableEnv The environment the catalog will be part of. 
* @param parentSchema Parent schema into which the catalog is registered * @param externalCatalogIdentifier Identifier of the external catalog * @param externalCatalog The external catalog to register */ def registerCatalog( + tableEnv: TableEnvironment, parentSchema: SchemaPlus, externalCatalogIdentifier: String, externalCatalog: ExternalCatalog): Unit = { - val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog) + val newSchema = new ExternalCatalogSchema(tableEnv, externalCatalogIdentifier, externalCatalog) val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema) newSchema.registerSubSchemas(schemaPlusOfNewSchema) } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index ae20718..a7c20ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -18,28 +18,296 @@ package org.apache.flink.table.catalog -import java.util.{HashMap => JHashMap, Map => JMap} import java.lang.{Long => JLong} +import java.util.{HashMap => JHashMap, Map => JMap} -import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.api.{TableException, TableSchema} +import org.apache.flink.table.catalog.ExternalCatalogTable._ +import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats +import 
scala.collection.JavaConverters._ + /** * Defines a table in an [[ExternalCatalog]]. * - * @param tableType Table type, e.g csv, hbase, kafka - * @param schema Schema of the table (column names and types) - * @param properties Properties of the table - * @param stats Statistics of the table - * @param comment Comment of the table - * @param createTime Create timestamp of the table - * @param lastAccessTime Timestamp of last access of the table + * For backwards compatibility this class supports both the legacy table type (see + * [[org.apache.flink.table.annotation.TableType]]) and the factory-based (see + * [[org.apache.flink.table.sources.TableSourceFactory]]) approach. + * + * @param connectorDesc describes the system to connect to + * @param formatDesc describes the data format of a connector + * @param schemaDesc describes the schema of the result table + * @param statisticsDesc describes the estimated statistics of the result table + * @param metadataDesc describes additional metadata of a table */ -case class ExternalCatalogTable( +class ExternalCatalogTable( + connectorDesc: ConnectorDescriptor, + formatDesc: Option[FormatDescriptor], + schemaDesc: Option[Schema], + statisticsDesc: Option[Statistics], + metadataDesc: Option[Metadata]) + extends TableSourceDescriptor(connectorDesc) { + + this.formatDescriptor = formatDesc + this.schemaDescriptor = schemaDesc + this.statisticsDescriptor = statisticsDesc + this.metaDescriptor = metadataDesc + + // expose statistics for external table source util + override def getTableStats: Option[TableStats] = super.getTableStats + + // ---------------------------------------------------------------------------------------------- + // NOTE: the following code is used for backwards compatibility to the TableType approach + // ---------------------------------------------------------------------------------------------- + + /** + * Returns the legacy table type of an external catalog table. 
+ * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val tableType: String = { + val props = new DescriptorProperties() + connectorDesc.addProperties(props) + props + .getString(CONNECTOR_LEGACY_TYPE) + .getOrElse(throw new TableException("Could not find a legacy table type to return.")) + } + + /** + * Returns the legacy schema of an external catalog table. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val schema: TableSchema = { + val props = new DescriptorProperties() + connectorDesc.addProperties(props) + props + .getTableSchema(CONNECTOR_LEGACY_SCHEMA) + .getOrElse(throw new TableException("Could not find a legacy schema to return.")) + } + + /** + * Returns the legacy properties of an external catalog table. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val properties: JMap[String, String] = { + // skip normalization + val props = new DescriptorProperties(normalizeKeys = false) + val legacyProps = new JHashMap[String, String]() + connectorDesc.addProperties(props) + props.asMap.flatMap { case (k, v) => + if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) { + // remove "connector.legacy-property-" + Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 1), v)) + } else { + None + } + } + legacyProps + } + + /** + * Returns the legacy statistics of an external catalog table. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val stats: TableStats = getTableStats.orNull + + /** + * Returns the legacy comment of an external catalog table. + * + * @deprecated for backwards compatibility. 
+ */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val comment: String = { + val normalizedProps = new DescriptorProperties() + + metadataDesc match { + case Some(meta) => + meta.addProperties(normalizedProps) + normalizedProps.getString(METADATA_COMMENT).orNull + case None => + null + } + } + + /** + * Returns the legacy creation time of an external catalog table. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val createTime: JLong = { + val normalizedProps = new DescriptorProperties() + + metadataDesc match { + case Some(meta) => + meta.addProperties(normalizedProps) + normalizedProps.getLong(METADATA_CREATION_TIME).map(v => Long.box(v)).orNull + case None => + null + } + } + + /** + * Returns the legacy last access time of an external catalog table. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + lazy val lastAccessTime: JLong = { + val normalizedProps = new DescriptorProperties() + + metadataDesc match { + case Some(meta) => + meta.addProperties(normalizedProps) + normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => Long.box(v)).orNull + case None => + null + } + } + + /** + * Defines a table in an [[ExternalCatalog]]. + * + * @param tableType Table type, e.g csv, hbase, kafka + * @param schema Schema of the table (column names and types) + * @param properties Properties of the table + * @param stats Statistics of the table + * @param comment Comment of the table + * @param createTime Create timestamp of the table + * @param lastAccessTime Timestamp of last access of the table + * @deprecated Use a descriptor-based constructor instead. 
+ */ + @Deprecated + @deprecated("Use a descriptor-based constructor instead.") + def this( + tableType: String, + schema: TableSchema, + properties: JMap[String, String] = new JHashMap(), + stats: TableStats = null, + comment: String = null, + createTime: JLong = System.currentTimeMillis, + lastAccessTime: JLong = -1L) = { + + this( + toConnectorDescriptor(tableType, schema, properties), + None, + None, + Some(toStatisticsDescriptor(stats)), + Some(toMetadataDescriptor(comment, createTime, lastAccessTime))) + } + + /** + * Returns whether this external catalog table uses the legacy table type. + * + * @deprecated for backwards compatibility. + */ + @Deprecated + @deprecated("For backwards compatibility.") + def isLegacyTableType: Boolean = connectorDesc.isInstanceOf[TableTypeConnector] +} + +object ExternalCatalogTable { + + val CONNECTOR_TYPE_VALUE = "legacy-table-type" + val CONNECTOR_LEGACY_TYPE = "connector.legacy-type" + val CONNECTOR_LEGACY_SCHEMA = "connector.legacy-schema" + val CONNECTOR_LEGACY_PROPERTY = "connector.legacy-property" + + /** + * Defines a table in an [[ExternalCatalog]]. + * + * @param tableType Table type, e.g csv, hbase, kafka + * @param schema Schema of the table (column names and types) + * @param properties Properties of the table + * @param stats Statistics of the table + * @param comment Comment of the table + * @param createTime Create timestamp of the table + * @param lastAccessTime Timestamp of last access of the table + * @deprecated Use a descriptor-based constructor instead. 
+ */ + @Deprecated + @deprecated("Use a descriptor-based constructor instead.") + def apply( tableType: String, schema: TableSchema, properties: JMap[String, String] = new JHashMap(), stats: TableStats = null, comment: String = null, createTime: JLong = System.currentTimeMillis, - lastAccessTime: JLong = -1L) + lastAccessTime: JLong = -1L): ExternalCatalogTable = { + + new ExternalCatalogTable( + tableType, + schema, + properties, + stats, + comment, + createTime, + lastAccessTime) + } + + class TableTypeConnector( + tableType: String, + schema: TableSchema, + legacyProperties: JMap[String, String]) + extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) { + + override protected def addConnectorProperties(properties: DescriptorProperties): Unit = { + properties.putString(CONNECTOR_LEGACY_TYPE, tableType) + properties.putTableSchema(CONNECTOR_LEGACY_SCHEMA, schema) + legacyProperties.asScala.foreach { case (k, v) => + properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-$k", v) + } + } + + override private[flink] def needsFormat() = false + } + + def toConnectorDescriptor( + tableType: String, + schema: TableSchema, + properties: JMap[String, String]) + : ConnectorDescriptor = { + + new TableTypeConnector(tableType, schema, properties) + } + + def toStatisticsDescriptor(stats: TableStats): Statistics = { + val statsDesc = Statistics() + if (stats != null) { + statsDesc.tableStats(stats) + } + statsDesc + } + + def toMetadataDescriptor( + comment: String, + createTime: JLong, + lastAccessTime: JLong) + : Metadata = { + + val metadataDesc = Metadata() + if (comment != null) { + metadataDesc.comment(comment) + } + metadataDesc + .creationTime(createTime) + .lastAccessTime(lastAccessTime) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala index d2e297f..3bc5dc0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala @@ -23,22 +23,74 @@ import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException} +import org.apache.flink.table.api._ import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService} import org.apache.flink.table.util.Logging import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections -import scala.collection.JavaConverters._ -import scala.collection.mutable +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
*/ object ExternalTableSourceUtil extends Logging { + /** + * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance + * + * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert + * @return converted [[TableSourceTable]] instance from the input catalog table + */ + def fromExternalCatalogTable( + tableEnv: TableEnvironment, + externalCatalogTable: ExternalCatalogTable) + : TableSourceTable[_] = { + + // check for the legacy external catalog path + if (externalCatalogTable.isLegacyTableType) { + LOG.warn("External catalog tables based on TableType annotations are deprecated. " + + "Please consider updating them to TableSourceFactories.") + fromExternalCatalogTableType(externalCatalogTable) + } + // use the factory approach + else { + val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable) + tableEnv match { + // check for a batch table source in this batch environment + case _: BatchTableEnvironment => + source match { + case bts: BatchTableSource[_] => + new BatchTableSourceTable( + bts, + new FlinkStatistic(externalCatalogTable.getTableStats)) + case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a batch environment.") + } + // check for a stream table source in this streaming environment + case _: StreamTableEnvironment => + source match { + case sts: StreamTableSource[_] => + new StreamTableSourceTable( + sts, + new FlinkStatistic(externalCatalogTable.getTableStats)) + case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a streaming environment.") + } + case _ => throw new TableException("Unsupported table environment.") + } + } + } + + // ---------------------------------------------------------------------------------------------- + // NOTE: the following lines can be removed once we drop support for TableType + // 
---------------------------------------------------------------------------------------------- + // config file to specify scan package to search TableSourceConverter private val tableSourceConverterConfigFileName = "tableSourceConverter.properties" @@ -48,7 +100,7 @@ object ExternalTableSourceUtil extends Logging { val registeredConverters = new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]] with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]] - // scan all config files to find TableSourceConverters which are annotationed with TableType. + // scan all config files to find TableSourceConverters which are annotated with TableType. val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName) while (resourceUrls.hasMoreElements) { val url = resourceUrls.nextElement() @@ -89,12 +141,31 @@ object ExternalTableSourceUtil extends Logging { } /** - * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance + * Parses scan package set from input config file * - * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert - * @return converted [[TableSourceTable]] instance from the input catalog table + * @param url url of config file + * @return scan package set */ - def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = { + private def parseScanPackagesFromConfigFile(url: URL): Set[String] = { + try { + val config = new PropertiesConfiguration(url) + config.setListDelimiter(',') + config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet + } catch { + case e: ConfigurationException => + LOG.warn(s"Error happened while loading the properties file [$url]", e) + Set.empty + case e1: ConversionException => + LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. 
" + + s"The value is not a String or List of Strings.", e1) + Set.empty + } + } + + @VisibleForTesting + def fromExternalCatalogTableType(externalCatalogTable: ExternalCatalogTable) + : TableSourceTable[_] = { + val tableType = externalCatalogTable.tableType val propertyKeys = externalCatalogTable.properties.keySet() tableTypeToTableSourceConvertersClazz.get(tableType) match { @@ -141,27 +212,4 @@ object ExternalTableSourceUtil extends Logging { throw new NoMatchedTableSourceConverterException(tableType) } } - - /** - * Parses scan package set from input config file - * - * @param url url of config file - * @return scan package set - */ - private def parseScanPackagesFromConfigFile(url: URL): Set[String] = { - try { - val config = new PropertiesConfiguration(url) - config.setListDelimiter(',') - config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet - } catch { - case e: ConfigurationException => - LOG.warn(s"Error happened while loading the properties file [$url]", e) - Set.empty - case e1: ConversionException => - LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. " + - s"The value is not a String or List of Strings.", e1) - Set.empty - } - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala index ca6df9a..e6248b0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala @@ -29,7 +29,11 @@ import org.apache.flink.table.sources.TableSource * table is supported. 
* * @tparam T The [[TableSource]] to be created by this converter. + * + * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead. */ +@Deprecated +@deprecated("Use the more generic table source factories instead.") trait TableSourceConverter[T <: TableSource[_]] { /** http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala new file mode 100644 index 0000000..3c8366d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException} +import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService} + +class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor) + extends TableSourceDescriptor(connector) { + + /** + * Searches for the specified table source, configures it accordingly, and returns it. + */ + def toTableSource: TableSource[_] = { + val source = TableSourceFactoryService.findTableSourceFactory(this) + source match { + case _: BatchTableSource[_] => source + case _ => throw new TableException( + s"Found table source '${source.getClass.getCanonicalName}' is not applicable " + + s"in a batch environment.") + } + } + + /** + * Searches for the specified table source, configures it accordingly, and returns it as a table. + */ + def toTable: Table = { + tableEnv.fromTableSource(toTableSource) + } + + /** + * Searches for the specified table source, configures it accordingly, and registers it as + * a table under the given name. + * + * @param name table name to be registered in the table environment + */ + def register(name: String): Unit = { + tableEnv.registerTableSource(name, toTableSource) + } + + /** + * Specifies the format that defines how to read data from a connector. + */ + def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = { + formatDescriptor = Some(format) + this + } + + /** + * Specifies the resulting table schema. 
+ */ + def withSchema(schema: Schema): BatchTableSourceDescriptor = { + schemaDescriptor = Some(schema) + this + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala new file mode 100644 index 0000000..f691b4f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} + +/** + * Describes a connector to another system. 
+ * + * @param tpe string identifier for the connector + */ +abstract class ConnectorDescriptor( + private val tpe: String, + private val version: Int) + extends Descriptor { + + override def toString: String = this.getClass.getSimpleName + + /** + * Internal method for properties conversion. + */ + final private[flink] def addProperties(properties: DescriptorProperties): Unit = { + properties.putString(CONNECTOR_TYPE, tpe) + properties.putLong(CONNECTOR_VERSION, version) + addConnectorProperties(properties) + } + + /** + * Internal method for connector properties conversion. + */ + protected def addConnectorProperties(properties: DescriptorProperties): Unit + + /** + * Internal method that defines if this connector requires a format descriptor. + */ + private[flink] def needsFormat(): Boolean + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala new file mode 100644 index 0000000..8ab0f45 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} + +/** + * Validator for [[ConnectorDescriptor]]. + */ +class ConnectorDescriptorValidator extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1) + properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + } +} + +object ConnectorDescriptorValidator { + + val CONNECTOR_TYPE = "connector.type" + val CONNECTOR_VERSION = "connector.version" + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala new file mode 100644 index 0000000..0493d99 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableSchema, ValidationException} +import org.apache.flink.table.descriptors.CsvValidator._ + +import scala.collection.mutable + +/** + * Format descriptor for comma-separated values (CSV). + */ +class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { + + private var fieldDelim: Option[String] = None + private var lineDelim: Option[String] = None + private val formatSchema: mutable.LinkedHashMap[String, String] = + mutable.LinkedHashMap[String, String]() + private var quoteCharacter: Option[Character] = None + private var commentPrefix: Option[String] = None + private var isIgnoreFirstLine: Option[Boolean] = None + private var lenient: Option[Boolean] = None + + /** + * Sets the field delimiter, "," by default. + * + * @param delim the field delimiter + */ + def fieldDelimiter(delim: String): Csv = { + this.fieldDelim = Some(delim) + this + } + + /** + * Sets the line delimiter, "\n" by default. + * + * @param delim the line delimiter + */ + def lineDelimiter(delim: String): Csv = { + this.lineDelim = Some(delim) + this + } + + /** + * Sets the format schema with field names and the types. Required. + * The table schema must not contain nested fields. + * + * This method overwrites existing fields added with [[field()]]. 
+ * + * @param schema the table schema + */ + def schema(schema: TableSchema): Csv = { + this.formatSchema.clear() + DescriptorProperties.normalizeTableSchema(schema).foreach { + case (n, t) => field(n, t) + } + this + } + + /** + * Adds a format field with the field name and the type information. Required. + * This method can be called multiple times. The call order of this method defines + * also the order of the fields in the format. + * + * @param fieldName the field name + * @param fieldType the type information of the field + */ + def field(fieldName: String, fieldType: TypeInformation[_]): Csv = { + field(fieldName, DescriptorProperties.normalizeTypeInfo(fieldType)) + this + } + + /** + * Adds a format field with the field name and the type string. Required. + * This method can be called multiple times. The call order of this method defines + * also the order of the fields in the format. + * + * @param fieldName the field name + * @param fieldType the type string of the field + */ + def field(fieldName: String, fieldType: String): Csv = { + if (formatSchema.contains(fieldName)) { + throw new ValidationException(s"Duplicate field name $fieldName.") + } + formatSchema += (fieldName -> fieldType) + this + } + + /** + * Sets a quote character for String values, null by default. + * + * @param quote the quote character + */ + def quoteCharacter(quote: Character): Csv = { + this.quoteCharacter = Option(quote) + this + } + + /** + * Sets a prefix to indicate comments, null by default. + * + * @param prefix the prefix to indicate comments + */ + def commentPrefix(prefix: String): Csv = { + this.commentPrefix = Option(prefix) + this + } + + /** + * Ignore the first line. Not skip the first line by default. + */ + def ignoreFirstLine(): Csv = { + this.isIgnoreFirstLine = Some(true) + this + } + + /** + * Skip records with parse error instead to fail. Throw an exception by default. 
+ */ + def ignoreParseErrors(): Csv = { + this.lenient = Some(true) + this + } + + /** + * Internal method for format properties conversion. + */ + override protected def addFormatProperties(properties: DescriptorProperties): Unit = { + fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _)) + lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _)) + properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq) + quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _)) + commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _)) + isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _)) + lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _)) + } +} + +/** + * Format descriptor for comma-separated values (CSV). + */ +object Csv { + + /** + * Format descriptor for comma-separated values (CSV). + */ + def apply(): Csv = new Csv() + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala new file mode 100644 index 0000000..d49314e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE + +/** + * Validator for [[Csv]]. + */ +class CsvValidator extends FormatDescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + super.validate(properties) + properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, isOptional = false) + properties.validateString(FORMAT_FIELD_DELIMITER, isOptional = true, minLen = 1) + properties.validateString(FORMAT_LINE_DELIMITER, isOptional = true, minLen = 1) + properties.validateString(FORMAT_QUOTE_CHARACTER, isOptional = true, minLen = 1, maxLen = 1) + properties.validateString(FORMAT_COMMENT_PREFIX, isOptional = true, minLen = 1) + properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, isOptional = true) + properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, isOptional = true) + properties.validateTableSchema(FORMAT_FIELDS, isOptional = false) + } +} + +object CsvValidator { + + val FORMAT_TYPE_VALUE = "csv" + val FORMAT_FIELD_DELIMITER = "format.field-delimiter" + val FORMAT_LINE_DELIMITER = "format.line-delimiter" + val FORMAT_QUOTE_CHARACTER = "format.quote-character" + val FORMAT_COMMENT_PREFIX = "format.comment-prefix" + val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line" + val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors" + val FORMAT_FIELDS = "format.fields" + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala new file mode 100644 index 0000000..ad97ded --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A class that adds a set of string-based, normalized properties for describing a + * table source or table sink. + */ +abstract class Descriptor { + + /** + * Internal method for properties conversion. 
+ */ + private[flink] def addProperties(properties: DescriptorProperties): Unit + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala new file mode 100644 index 0000000..43d63b3 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.table.descriptors

import java.io.Serializable
import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
import java.util
import java.util.regex.Pattern

import org.apache.commons.codec.binary.Base64
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableSchema, ValidationException}
import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema}
import org.apache.flink.table.typeutils.TypeStringUtils
import org.apache.flink.util.InstantiationUtil
import org.apache.flink.util.Preconditions.checkNotNull

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
  * Utility class for having a unified string-based representation of Table API related classes
  * such as [[TableSchema]], [[TypeInformation]], etc.
  *
  * @param normalizeKeys flag that indicates if keys should be normalized to lower case (this
  *                      flag is necessary for backwards compatibility)
  */
class DescriptorProperties(normalizeKeys: Boolean = true) {

  private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]()

  /**
    * Stores a key-value pair. Fails if the key is already present.
    */
  private def put(key: String, value: String): Unit = {
    // Normalize BEFORE the duplicate check so that the check is performed on the key that is
    // actually stored. Previously the un-normalized key was checked, so a duplicate that only
    // differed in casing (e.g. "A" after "a") silently overwrote the stored value.
    val normalizedKey = if (normalizeKeys) key.toLowerCase else key
    if (properties.contains(normalizedKey)) {
      throw new IllegalStateException("Property already present.")
    }
    properties.put(normalizedKey, value)
  }

  // for testing: bypasses normalization and the duplicate check
  private[flink] def unsafePut(key: String, value: String): Unit = {
    properties.put(key, value)
  }

  // for testing
  private[flink] def unsafeRemove(key: String): Unit = {
    properties.remove(key)
  }

  /** Adds all entries of the given Scala map. */
  def putProperties(properties: Map[String, String]): Unit = {
    properties.foreach { case (k, v) =>
      put(k, v)
    }
  }

  /** Adds all entries of the given Java map. */
  def putProperties(properties: java.util.Map[String, String]): Unit = {
    properties.asScala.foreach { case (k, v) =>
      put(k, v)
    }
  }

  /**
    * Adds a class under the given key. The class must be publicly accessible and instantiable,
    * otherwise a [[ValidationException]] is thrown.
    */
  def putClass(key: String, clazz: Class[_]): Unit = {
    checkNotNull(key)
    checkNotNull(clazz)
    val error = InstantiationUtil.checkForInstantiationError(clazz)
    if (error != null) {
      throw new ValidationException(s"Class '${clazz.getName}' is not supported: $error")
    }
    put(key, clazz.getName)
  }

  /** Adds a string under the given key. */
  def putString(key: String, str: String): Unit = {
    checkNotNull(key)
    checkNotNull(str)
    put(key, str)
  }

  /** Adds a boolean under the given key. */
  def putBoolean(key: String, b: Boolean): Unit = {
    checkNotNull(key)
    put(key, b.toString)
  }

  /** Adds a long under the given key. */
  def putLong(key: String, l: Long): Unit = {
    checkNotNull(key)
    put(key, l.toString)
  }

  /** Adds an integer under the given key. */
  def putInt(key: String, i: Int): Unit = {
    checkNotNull(key)
    put(key, i.toString)
  }

  /** Adds a character under the given key. */
  def putCharacter(key: String, c: Character): Unit = {
    checkNotNull(key)
    checkNotNull(c)
    put(key, c.toString)
  }

  /** Adds a table schema under the given key. */
  def putTableSchema(key: String, schema: TableSchema): Unit = {
    putTableSchema(key, normalizeTableSchema(schema))
  }

  /** Adds a table schema, given as (name, type) pairs, under the given key. */
  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
    putIndexedFixedProperties(
      key,
      Seq(NAME, TYPE),
      nameAndType.map(t => Seq(t._1, t._2))
    )
  }

  /**
    * Adds an indexed sequence of properties (with sub-properties) under a common key.
    *
    * For example:
    *
    *   schema.fields.0.type = INT, schema.fields.0.name = test
    *   schema.fields.1.type = LONG, schema.fields.1.name = test2
    *
    * The arity of each propertyValue must match the arity of propertyKeys.
    */
  def putIndexedFixedProperties(
      key: String,
      propertyKeys: Seq[String],
      propertyValues: Seq[Seq[String]])
    : Unit = {
    checkNotNull(key)
    checkNotNull(propertyValues)
    propertyValues.zipWithIndex.foreach { case (values, idx) =>
      if (values.lengthCompare(propertyKeys.size) != 0) {
        throw new ValidationException("Values must have same arity as keys.")
      }
      values.zipWithIndex.foreach { case (value, keyIdx) =>
        put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
      }
    }
  }

  /**
    * Adds an indexed mapping of properties under a common key.
    *
    * For example:
    *
    *   schema.fields.0.type = INT, schema.fields.0.name = test
    *   schema.fields.1.name = test2
    *
    * The arity of the propertySets can differ.
    */
  def putIndexedVariableProperties(
      key: String,
      propertySets: Seq[Map[String, String]])
    : Unit = {
    checkNotNull(key)
    checkNotNull(propertySets)
    propertySets.zipWithIndex.foreach { case (propertySet, idx) =>
      propertySet.foreach { case (k, v) =>
        put(s"$key.$idx.$k", v)
      }
    }
  }

  // ----------------------------------------------------------------------------------------------

  /** Returns the string value under the given key, if present. */
  def getString(key: String): Option[String] = {
    properties.get(key)
  }

  /**
    * Returns the character value under the given key, if present.
    * Fails if the stored value is not exactly one character long.
    */
  def getCharacter(key: String): Option[Character] = getString(key) match {
    case Some(c) =>
      if (c.length != 1) {
        throw new ValidationException(s"The value of $key must only contain one character.")
      }
      Some(c.charAt(0))

    case None => None
  }

  /** Returns the boolean value under the given key, if present. */
  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    case Some(b) => Some(JBoolean.parseBoolean(b))

    case None => None
  }

  /** Returns the integer value under the given key, if present. */
  def getInt(key: String): Option[Int] = getString(key) match {
    case Some(l) => Some(JInt.parseInt(l))

    case None => None
  }

  /** Returns the long value under the given key, if present. */
  def getLong(key: String): Option[Long] = getString(key) match {
    case Some(l) => Some(JLong.parseLong(l))

    case None => None
  }

  /** Returns the double value under the given key, if present. */
  def getDouble(key: String): Option[Double] = getString(key) match {
    case Some(d) => Some(JDouble.parseDouble(d))

    case None => None
  }

  /**
    * Returns a table schema stored under the given key, if present.
    * Fails if a field's name or type sub-property is missing.
    */
  def getTableSchema(key: String): Option[TableSchema] = {
    // filter for number of columns
    val fieldCount = properties
      .filterKeys(k => k.startsWith(key) && k.endsWith(s".$NAME"))
      .size

    if (fieldCount == 0) {
      return None
    }

    // validate fields and build schema
    val schemaBuilder = TableSchema.builder()
    for (i <- 0 until fieldCount) {
      val name = s"$key.$i.$NAME"
      val tpe = s"$key.$i.$TYPE"
      schemaBuilder.field(
        properties.getOrElse(name, throw new ValidationException(s"Invalid table schema. " +
          s"Could not find name for field '$key.$i'.")
        ),
        TypeStringUtils.readTypeInfo(
          properties.getOrElse(tpe, throw new ValidationException(s"Invalid table schema. " +
            s"Could not find type for field '$key.$i'."))
        )
      )
    }
    Some(schemaBuilder.build())
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Validates a string property. Length bounds are inclusive.
    */
  def validateString(
      key: String,
      isOptional: Boolean,
      minLen: Int = 0, // inclusive
      maxLen: Int = Integer.MAX_VALUE) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val len = properties(key).length
      if (len < minLen || len > maxLen) {
        throw new ValidationException(
          s"Property '$key' must have a length between $minLen and $maxLen but " +
            s"was: ${properties(key)}")
      }
    }
  }

  /**
    * Validates an integer property. Bounds are inclusive.
    */
  def validateInt(
      key: String,
      isOptional: Boolean,
      min: Int = Int.MinValue, // inclusive
      max: Int = Int.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = Integer.parseInt(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be an integer value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          throw new ValidationException(
            s"Property '$key' must be an integer value but was: ${properties(key)}")
      }
    }
  }

  /**
    * Validates a long property. Bounds are inclusive.
    */
  def validateLong(
      key: String,
      isOptional: Boolean,
      min: Long = Long.MinValue, // inclusive
      max: Long = Long.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = JLong.parseLong(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be a long value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          throw new ValidationException(
            s"Property '$key' must be a long value but was: ${properties(key)}")
      }
    }
  }

  /**
    * Validates that a property, if present, has exactly the given value.
    */
  def validateValue(key: String, value: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      if (properties(key) != value) {
        throw new ValidationException(
          s"Could not find required value '$value' for property '$key'.")
      }
    }
  }

  /**
    * Validates a boolean property (case-insensitive "true"/"false").
    */
  def validateBoolean(key: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val value = properties(key)
      if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
        throw new ValidationException(
          s"Property '$key' must be a boolean value (true/false) but was: $value")
      }
    }
  }

  /**
    * Validates a double property. Bounds are inclusive.
    */
  def validateDouble(
      key: String,
      isOptional: Boolean,
      min: Double = Double.MinValue, // inclusive
      max: Double = Double.MaxValue) // inclusive
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      try {
        val value = JDouble.parseDouble(properties(key))
        if (value < min || value > max) {
          throw new ValidationException(s"Property '$key' must be a double value between $min " +
            s"and $max but was: ${properties(key)}")
        }
      } catch {
        case _: NumberFormatException =>
          // fixed grammar: "an double" -> "a double"
          throw new ValidationException(
            s"Property '$key' must be a double value but was: ${properties(key)}")
      }
    }
  }

  /**
    * Validates an indexed table schema under the given key (name and type per field).
    */
  def validateTableSchema(key: String, isOptional: Boolean): Unit = {
    // filter for name columns
    val names = getIndexedProperty(key, NAME)
    // filter for type columns
    val types = getIndexedProperty(key, TYPE)
    if (names.isEmpty && types.isEmpty && !isOptional) {
      throw new ValidationException(
        s"Could not find the required schema for property '$key'.")
    }
    for (i <- 0 until Math.max(names.size, types.size)) {
      validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
      validateType(s"$key.$i.$TYPE", isOptional = false)
    }
  }

  /**
    * Validates an enum property and runs the per-value validation logic of the matched value.
    */
  def validateEnum(
      key: String,
      isOptional: Boolean,
      enumToValidation: Map[String, () => Unit])
    : Unit = {

    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      val value = properties(key)
      if (!enumToValidation.contains(value)) {
        throw new ValidationException(s"Unknown value for property '$key'. " +
          s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value")
      } else {
        enumToValidation(value).apply() // run validation logic
      }
    }
  }

  /**
    * Validates that a property holds a parsable type string.
    */
  def validateType(key: String, isOptional: Boolean): Unit = {
    if (!properties.contains(key)) {
      if (!isOptional) {
        throw new ValidationException(s"Could not find required property '$key'.")
      }
    } else {
      TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
    }
  }

  /**
    * Validates that no property starts with the given prefix.
    */
  def validatePrefixExclusion(prefix: String): Unit = {
    val invalidField = properties.find(_._1.startsWith(prefix))
    if (invalidField.isDefined) {
      throw new ValidationException(
        s"Property '${invalidField.get._1}' is not allowed in this context.")
    }
  }

  /**
    * Validates that the given key is not present.
    */
  def validateExclusion(key: String): Unit = {
    if (properties.contains(key)) {
      throw new ValidationException(s"Property '$key' is not allowed in this context.")
    }
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns all properties of the form "key.<index>.property".
    */
  def getIndexedProperty(key: String, property: String): Map[String, String] = {
    val escapedKey = Pattern.quote(key)
    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).toMap
  }

  /** Returns true if any key contains the given substring. */
  def contains(str: String): Boolean = {
    properties.exists(e => e._1.contains(str))
  }

  /** Returns true if any key starts with the given prefix. */
  def hasPrefix(prefix: String): Boolean = {
    properties.exists(e => e._1.startsWith(prefix))
  }

  /** Returns an immutable snapshot of all properties. */
  def asMap: Map[String, String] = {
    properties.toMap
  }
}

object DescriptorProperties {

  val TYPE = "type"
  val NAME = "name"

  // the string representation should be equal to SqlTypeName
  def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = {
    checkNotNull(typeInfo)
    TypeStringUtils.writeTypeInfo(typeInfo)
  }

  /** Converts a table schema into a sequence of (name, normalized type string) pairs. */
  def normalizeTableSchema(schema: TableSchema): Seq[(String, String)] = {
    schema.getColumnNames.zip(schema.getTypes).map { case (n, t) =>
      (n, normalizeTypeInfo(t))
    }
  }

  /**
    * Serializes the given object into a URL-safe Base64 string.
    * Fails if the class is not publicly accessible or the object cannot be serialized.
    */
  def serialize(obj: Serializable): String = {
    // test public accessibility
    val error = InstantiationUtil.checkForInstantiationError(obj.getClass)
    if (error != null) {
      throw new ValidationException(s"Class '${obj.getClass.getName}' is not supported: $error")
    }
    try {
      val byteArray = InstantiationUtil.serializeObject(obj)
      Base64.encodeBase64URLSafeString(byteArray)
    } catch {
      case e: Exception =>
        throw new ValidationException(
          s"Unable to serialize class '${obj.getClass.getCanonicalName}'.", e)
    }
  }
}
package org.apache.flink.table.descriptors

/**
  * Validator for a descriptor. We put the validation methods and utilities in separate classes
  * to keep the descriptor interfaces clean.
  */
trait DescriptorValidator {

  /**
    * Performs basic validation such as completeness tests.
    *
    * @param properties the string-based properties to validate; implementations throw a
    *                   validation exception on invalid input
    */
  def validate(properties: DescriptorProperties): Unit

}
package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}

/**
  * Connector descriptor for a file system.
  */
class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {

  // holds the configured path until the descriptor is converted into properties
  private var filePath: Option[String] = None

  /**
    * Sets the path to a file or directory in a file system.
    *
    * @param path the path a file or directory
    */
  def path(path: String): FileSystem = {
    filePath = Some(path)
    this
  }

  /**
    * Internal method for properties conversion.
    */
  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
    filePath match {
      case Some(p) => properties.putString(CONNECTOR_PATH, p)
      case None => // path not set; the validator reports the missing property later
    }
  }

  // a file system connector always requires an accompanying format descriptor
  override private[flink] def needsFormat(): Boolean = true
}

/**
  * Connector descriptor for a file system.
  */
object FileSystem {

  /**
    * Creates a new connector descriptor for a file system.
    */
  def apply(): FileSystem = new FileSystem()

}
package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}

/**
  * Validator for [[FileSystem]] connector descriptors.
  */
class FileSystemValidator extends ConnectorDescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // run the generic connector checks first
    super.validate(properties)
    // the connector type must be exactly "filesystem"
    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, isOptional = false)
    // a non-empty path is mandatory
    properties.validateString(CONNECTOR_PATH, isOptional = false, minLen = 1)
  }
}

object FileSystemValidator {

  val CONNECTOR_TYPE_VALUE = "filesystem"
  val CONNECTOR_PATH = "connector.path"

}
package org.apache.flink.table.descriptors

import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}

/**
  * Describes the format of data.
  *
  * @param tpe string identifier for the format
  * @param version property version for backwards compatibility
  */
abstract class FormatDescriptor(
    private val tpe: String,
    private val version: Int)
  extends Descriptor {

  override def toString: String = this.getClass.getSimpleName

  /**
    * Internal method for properties conversion. Writes the common format type and version
    * properties before delegating to the format-specific conversion.
    */
  final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    properties.putString(FORMAT_TYPE, tpe)
    properties.putInt(FORMAT_VERSION, version)
    addFormatProperties(properties)
  }

  /**
    * Internal method for format properties conversion. Implemented by concrete format
    * descriptors to add their specific properties.
    */
  protected def addFormatProperties(properties: DescriptorProperties): Unit

}
