http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala new file mode 100644 index 0000000..ec57c5e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import org.apache.flink.table.api._ +import org.apache.flink.table.factories._ +import org.apache.flink.table.plan.schema._ +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource} +import org.apache.flink.table.util.Logging + + +/** + * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. + * + * It uses [[TableFactoryService]] for discovering. + */ +object ExternalTableUtil extends Logging { + + /** + * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance + * + * @param externalTable the [[ExternalCatalogTable]] instance which to convert + * @return converted [[TableSourceTable]] instance from the input catalog table + */ + def fromExternalCatalogTable[T1, T2]( + tableEnv: TableEnvironment, + externalTable: ExternalCatalogTable) + : TableSourceSinkTable[T1, T2] = { + + val statistics = new FlinkStatistic(externalTable.getTableStats) + + val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) { + Some(createTableSource(tableEnv, externalTable, statistics)) + } else { + None + } + + val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) { + Some(createTableSink(tableEnv, externalTable, statistics)) + } else { + None + } + + new TableSourceSinkTable[T1, T2](source, sink) + } + + private def createTableSource[T]( + tableEnv: TableEnvironment, + externalTable: ExternalCatalogTable, + statistics: FlinkStatistic) + : TableSourceTable[T] = tableEnv match { + + case _: BatchTableEnvironment if externalTable.isBatchTable => + val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable) + new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics) + + case _: StreamTableEnvironment if externalTable.isStreamTable => + val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable) + new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics) + + case _ => + throw new ValidationException( + "External catalog table does not support the current environment for a table source.") + } + + private def createTableSink[T]( + tableEnv: TableEnvironment, + externalTable: ExternalCatalogTable, + statistics: FlinkStatistic) + : TableSinkTable[T] = tableEnv match { + + case _: BatchTableEnvironment if externalTable.isBatchTable => + val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable) + new TableSinkTable[T](sink.asInstanceOf[BatchTableSink[T]], statistics) + + case _: StreamTableEnvironment if externalTable.isStreamTable => + val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable) + new TableSinkTable[T](sink.asInstanceOf[StreamTableSink[T]], statistics) + + case _ => + throw new ValidationException( + "External catalog table does not support the current environment for a table sink.") + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala new file mode 100644 index 0000000..6bd2a71 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.BatchTableEnvironment + +/** + * Descriptor for specifying a table source and/or sink in a batch environment. + */ +class BatchTableDescriptor( + tableEnv: BatchTableEnvironment, + connectorDescriptor: ConnectorDescriptor) + extends ConnectTableDescriptor[BatchTableDescriptor]( + tableEnv, + connectorDescriptor) http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala deleted file mode 100644 index c967291..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.{BatchTableEnvironment, Table, ValidationException} -import org.apache.flink.table.factories.{BatchTableSourceFactory, TableFactoryService} -import org.apache.flink.table.sources.TableSource - -class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor) - extends TableSourceDescriptor { - - connectorDescriptor = Some(connector) - - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - // check for a format - if (connector.needsFormat() && formatDescriptor.isEmpty) { - throw new ValidationException( - s"The connector '$connector' requires a format description.") - } else if (!connector.needsFormat() && formatDescriptor.isDefined) { - throw new ValidationException( - s"The connector '$connector' does not require a format description " + - s"but '${formatDescriptor.get}' found.") - } - super.addProperties(properties) - } - - /** - * Searches for the specified table source, configures it accordingly, and returns it. - */ - def toTableSource: TableSource[_] = { - val properties = new DescriptorProperties() - addProperties(properties) - val javaMap = properties.asMap - TableFactoryService - .find(classOf[BatchTableSourceFactory[_]], javaMap) - .createBatchTableSource(javaMap) - } - - /** - * Searches for the specified table source, configures it accordingly, and returns it as a table. - */ - def toTable: Table = { - tableEnv.fromTableSource(toTableSource) - } - - /** - * Searches for the specified table source, configures it accordingly, and registers it as - * a table under the given name. - * - * @param name table name to be registered in the table environment - */ - def register(name: String): Unit = { - tableEnv.registerTableSource(name, toTableSource) - } - - /** - * Specifies the format that defines how to read data from a connector. - */ - def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = { - formatDescriptor = Some(format) - this - } - - /** - * Specifies the resulting table schema. - */ - def withSchema(schema: Schema): BatchTableSourceDescriptor = { - schemaDescriptor = Some(schema) - this - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala new file mode 100644 index 0000000..569b825 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.factories.TableFactoryUtil + +/** + * Common class for table's created with [[TableEnvironment.connect(ConnectorDescriptor)]]. + */ +abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]]( + private val tableEnv: TableEnvironment, + private val connectorDescriptor: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor[D] + with RegistrableDescriptor { this: D => + + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + + /** + * Searches for the specified table source, configures it accordingly, and registers it as + * a table under the given name. + * + * @param name table name to be registered in the table environment + */ + override def registerTableSource(name: String): Unit = { + val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this) + tableEnv.registerTableSource(name, tableSource) + } + + /** + * Searches for the specified table sink, configures it accordingly, and registers it as + * a table under the given name. + * + * @param name table name to be registered in the table environment + */ + override def registerTableSink(name: String): Unit = { + val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this) + tableEnv.registerTableSink(name, tableSink) + } + + /** + * Searches for the specified table source and sink, configures them accordingly, and registers + * them as a table under the given name. + * + * @param name table name to be registered in the table environment + */ + override def registerTableSourceAndSink(name: String): Unit = { + registerTableSource(name) + registerTableSink(name) + } + + /** + * Specifies the format that defines how to read data from a connector. + */ + override def withFormat(format: FormatDescriptor): D = { + formatDescriptor = Some(format) + this + } + + /** + * Specifies the resulting table schema. + */ + override def withSchema(schema: Schema): D = { + schemaDescriptor = Some(schema) + this + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + + // this performs only basic validation + // more validation can only happen within a factory + if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) { + throw new ValidationException( + s"The connector '$connectorDescriptor' requires a format description.") + } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) { + throw new ValidationException( + s"The connector '$connectorDescriptor' does not require a format description " + + s"but '${formatDescriptor.get}' found.") + } + + connectorDescriptor.addProperties(properties) + formatDescriptor.foreach(_.addProperties(properties)) + schemaDescriptor.foreach(_.addProperties(properties)) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala index e21527b..aa96bc4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala @@ -19,18 +19,23 @@ package org.apache.flink.table.descriptors /** - * A class that adds a set of string-based, normalized properties for describing DDL information. + * A trait that adds a set of string-based, normalized properties for describing DDL information. * * Typical characteristics of a descriptor are: * - descriptors have a default constructor and a default 'apply()' method for Scala * - descriptors themselves contain very little logic * - corresponding validators validate the correctness (goal: have a single point of validation) */ -abstract class Descriptor { +trait Descriptor { /** * Internal method for properties conversion. */ private[flink] def addProperties(properties: DescriptorProperties): Unit + override def toString: String = { + val descriptorProperties = new DescriptorProperties() + addProperties(descriptorProperties) + descriptorProperties.toString + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala index 3ad3eac..2c88dfd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala @@ -1064,6 +1064,10 @@ class DescriptorProperties(normalizeKeys: Boolean = true) { properties.toMap.asJava } + override def toString: String = { + DescriptorProperties.toString(properties.toMap) + } + // ---------------------------------------------------------------------------------------------- /** @@ -1283,6 +1287,12 @@ object DescriptorProperties { .mkString("\n") } + def toJavaMap(descriptor: Descriptor): util.Map[String, String] = { + val descriptorProperties = new DescriptorProperties() + descriptor.addProperties(descriptorProperties) + descriptorProperties.asMap + } + // the following methods help for Scala <-> Java interfaces // most of these methods are not necessary once we upgraded to Scala 2.12 http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala new file mode 100644 index 0000000..e89ca8c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A trait for descriptors that allow to register table source and/or sinks. + */ +trait RegistrableDescriptor extends TableDescriptor { + + /** + * Searches for the specified table source, configures it accordingly, and registers it as + * a table under the given name. + * + * @param name table name to be registered in the table environment + */ + def registerTableSource(name: String): Unit + + /** + * Searches for the specified table sink, configures it accordingly, and registers it as + * a table under the given name. + * + * @param name table name to be registered in the table environment + */ + def registerTableSink(name: String): Unit + + /** + * Searches for the specified table source and sink, configures them accordingly, and registers + * them as a table under the given name. + * + * @param name table name to be registered in the table environment + */ + def registerTableSourceAndSink(name: String): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala new file mode 100644 index 0000000..794ff9e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A trait for descriptors that allow to define a format and schema. + */ +trait SchematicDescriptor[D <: SchematicDescriptor[D]] extends TableDescriptor { + + /** + * Specifies the format that defines how to read data from a connector. + */ + def withFormat(format: FormatDescriptor): D + + /** + * Specifies the resulting table schema. + */ + def withSchema(schema: Schema): D +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala new file mode 100644 index 0000000..b9e64f9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ + +/** + * Descriptor for specifying a table source and/or sink in a streaming environment. + */ +class StreamTableDescriptor( + tableEnv: StreamTableEnvironment, + connectorDescriptor: ConnectorDescriptor) + extends ConnectTableDescriptor[StreamTableDescriptor]( + tableEnv, + connectorDescriptor) + with StreamableDescriptor[StreamTableDescriptor] { + + private var updateMode: Option[String] = None + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In append mode, a dynamic table and an external connector only exchange INSERT messages. + * + * @see See also [[inRetractMode()]] and [[inUpsertMode()]]. + */ + override def inAppendMode(): StreamTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_APPEND) + this + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. + * + * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an + * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for + * the updating (new) row. + * + * In this mode, a key must not be defined as opposed to upsert mode. However, every update + * consists of two messages which is less efficient. + * + * @see See also [[inAppendMode()]] and [[inUpsertMode()]]. + */ + override def inRetractMode(): StreamTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_RETRACT) + this + } + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. + * + * This mode requires a (possibly composite) unique key by which updates can be propagated. The + * external connector needs to be aware of the unique key attribute in order to apply messages + * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as + * DELETE messages. + * + * The main difference to a retract stream is that UPDATE changes are encoded with a single + * message and are therefore more efficient. + * + * @see See also [[inAppendMode()]] and [[inRetractMode()]]. + */ + override def inUpsertMode(): StreamTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_UPSERT) + this + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Internal method for properties conversion. + */ + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + super.addProperties(properties) + updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + + // this performs only basic validation + // more validation can only happen within a factory + new StreamTableDescriptorValidator().validate(properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala new file mode 100644 index 0000000..5a6a946 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ + +/** + * Validator for [[StreamTableDescriptor]]. + */ +class StreamTableDescriptorValidator extends DescriptorValidator { + + override def validate(properties: DescriptorProperties): Unit = { + properties.validateEnumValues( + UPDATE_MODE, + isOptional = false, + util.Arrays.asList( + UPDATE_MODE_VALUE_APPEND, + UPDATE_MODE_VALUE_RETRACT, + UPDATE_MODE_VALUE_UPSERT) + ) + } +} + +object StreamTableDescriptorValidator { + + val UPDATE_MODE = "update-mode" + val UPDATE_MODE_VALUE_APPEND = "append" + val UPDATE_MODE_VALUE_RETRACT = "retract" + val UPDATE_MODE_VALUE_UPSERT = "upsert" +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala deleted file mode 100644 index 6ade2d6..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.{StreamTableEnvironment, Table, ValidationException} -import org.apache.flink.table.factories.{StreamTableSourceFactory, TableFactoryService} -import org.apache.flink.table.sources.TableSource - -/** - * Descriptor for specifying a table source in a streaming environment. - */ -class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor) - extends TableSourceDescriptor { - - connectorDescriptor = Some(connector) - - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - // check for a format - if (connector.needsFormat() && formatDescriptor.isEmpty) { - throw new ValidationException( - s"The connector '$connector' requires a format description.") - } else if (!connector.needsFormat() && formatDescriptor.isDefined) { - throw new ValidationException( - s"The connector '$connector' does not require a format description " + - s"but '${formatDescriptor.get}' found.") - } - super.addProperties(properties) - } - - /** - * Searches for the specified table source, configures it accordingly, and returns it. - */ - def toTableSource: TableSource[_] = { - val properties = new DescriptorProperties() - addProperties(properties) - val javaMap = properties.asMap - TableFactoryService - .find(classOf[StreamTableSourceFactory[_]], javaMap) - .createStreamTableSource(javaMap) - } - - /** - * Searches for the specified table source, configures it accordingly, and returns it as a table. - */ - def toTable: Table = { - tableEnv.fromTableSource(toTableSource) - } - - /** - * Searches for the specified table source, configures it accordingly, and registers it as - * a table under the given name. - * - * @param name table name to be registered in the table environment - */ - def register(name: String): Unit = { - tableEnv.registerTableSource(name, toTableSource) - } - - /** - * Specifies the format that defines how to read data from a connector. - */ - def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = { - formatDescriptor = Some(format) - this - } - - /** - * Specifies the resulting table schema. - */ - def withSchema(schema: Schema): StreamTableSourceDescriptor = { - schemaDescriptor = Some(schema) - this - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala new file mode 100644 index 0000000..0d424bd --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +/** + * A trait for descriptors that allow to convert between a dynamic table and an external connector. + */ +trait StreamableDescriptor[D <: StreamableDescriptor[D]] extends TableDescriptor { + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In append mode, a dynamic table and an external connector only exchange INSERT messages. + * + * @see See also [[inRetractMode()]] and [[inUpsertMode()]]. + */ + def inAppendMode(): D + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. + * + * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an + * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for + * the updating (new) row. + * + * In this mode, a key must not be defined as opposed to upsert mode. However, every update + * consists of two messages which is less efficient. + * + * @see See also [[inAppendMode()]] and [[inUpsertMode()]]. + */ + def inRetractMode(): D + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. + * + * This mode requires a (possibly composite) unique key by which updates can be propagated. The + * external connector needs to be aware of the unique key attribute in order to apply messages + * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as + * DELETE messages. + * + * The main difference to a retract stream is that UPDATE changes are encoded with a single + * message and are therefore more efficient. + * + * @see See also [[inAppendMode()]] and [[inRetractMode()]]. + */ + def inUpsertMode(): D +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala index 7b864d8..b14a310 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala @@ -19,22 +19,6 @@ package org.apache.flink.table.descriptors /** - * Common class for all descriptors describing table sources and sinks. + * Common trait for all descriptors describing table sources and sinks. */ -abstract class TableDescriptor extends Descriptor { - - protected var connectorDescriptor: Option[ConnectorDescriptor] = None - protected var formatDescriptor: Option[FormatDescriptor] = None - protected var schemaDescriptor: Option[Schema] = None - protected var metaDescriptor: Option[Metadata] = None - - /** - * Internal method for properties conversion. - */ - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - connectorDescriptor.foreach(_.addProperties(properties)) - formatDescriptor.foreach(_.addProperties(properties)) - schemaDescriptor.foreach(_.addProperties(properties)) - metaDescriptor.foreach(_.addProperties(properties)) - } -} +trait TableDescriptor extends Descriptor http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala deleted file mode 100644 index e0fa602..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -/** - * Validator for [[TableDescriptor]]. - */ -class TableDescriptorValidator extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - // nothing to do - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala deleted file mode 100644 index 0a4d504..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -/** - * Common class for all descriptors describing a table sink. - */ -abstract class TableSinkDescriptor extends TableDescriptor { - - /** - * Internal method for properties conversion. - */ - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - super.addProperties(properties) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala deleted file mode 100644 index 3ca39c2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.descriptors.DescriptorProperties.toScala -import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats} -import org.apache.flink.table.plan.stats.TableStats - -import scala.collection.JavaConverters._ - -/** - * Common class for all descriptors describing a table source. - */ -abstract class TableSourceDescriptor extends TableDescriptor { - - protected var statisticsDescriptor: Option[Statistics] = None - - /** - * Internal method for properties conversion. - */ - override private[flink] def addProperties(properties: DescriptorProperties): Unit = { - super.addProperties(properties) - statisticsDescriptor.foreach(_.addProperties(properties)) - } - - /** - * Reads table statistics from the descriptors properties. - */ - protected def getTableStats: Option[TableStats] = { - val normalizedProps = new DescriptorProperties() - addProperties(normalizedProps) - val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT)) - rowCount match { - case Some(cnt) => - val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) - Some(TableStats(cnt, columnStats.asJava)) - case None => - None - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala index cc99ecc..a5dcc60 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala @@ -29,6 +29,12 @@ import java.util * Classes that implement this interface can be added to the * "META_INF/services/org.apache.flink.table.factories.TableFactory" file of a JAR file in * the current classpath to be found. + * + * @see [[BatchTableSourceFactory]] + * @see [[BatchTableSinkFactory]] + * @see [[StreamTableSourceFactory]] + * @see [[StreamTableSinkFactory]] + * @see [[TableFormatFactory]] */ trait TableFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala index 3baff8e..26b7c6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala @@ -50,9 +50,7 @@ object TableFactoryService extends Logging { def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { Preconditions.checkNotNull(descriptor) - val descriptorProperties = new DescriptorProperties() - descriptor.addProperties(descriptorProperties) - findInternal(factoryClass, descriptorProperties.asMap, None) + findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), None) } /** @@ -68,9 +66,7 @@ object TableFactoryService extends Logging { Preconditions.checkNotNull(descriptor) Preconditions.checkNotNull(classLoader) - val descriptorProperties = new DescriptorProperties() - descriptor.addProperties(descriptorProperties) - findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader)) + findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), Some(classLoader)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala new file mode 100644 index 0000000..9989ebc --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories + +import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment, TableException} +import org.apache.flink.table.descriptors.{Descriptor, DescriptorProperties} +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource + +/** + * Utility for dealing with [[TableFactory]] using the [[TableFactoryService]]. + */ +object TableFactoryUtil { + + /** + * Returns a table source for a table environment. + */ + def findAndCreateTableSource[T]( + tableEnvironment: TableEnvironment, + descriptor: Descriptor) + : TableSource[T] = { + + val javaMap = DescriptorProperties.toJavaMap(descriptor) + + tableEnvironment match { + case _: BatchTableEnvironment => + TableFactoryService + .find(classOf[BatchTableSourceFactory[T]], javaMap) + .createBatchTableSource(javaMap) + + case _: StreamTableEnvironment => + TableFactoryService + .find(classOf[StreamTableSourceFactory[T]], javaMap) + .createStreamTableSource(javaMap) + + case e@_ => + throw new TableException(s"Unsupported table environment: ${e.getClass.getName}") + } + } + + /** + * Returns a table sink for a table environment. + */ + def findAndCreateTableSink[T]( + tableEnvironment: TableEnvironment, + descriptor: Descriptor) + : TableSink[T] = { + + val javaMap = DescriptorProperties.toJavaMap(descriptor) + + tableEnvironment match { + case _: BatchTableEnvironment => + TableFactoryService + .find(classOf[BatchTableSinkFactory[T]], javaMap) + .createBatchTableSink(javaMap) + + case _: StreamTableEnvironment => + TableFactoryService + .find(classOf[StreamTableSinkFactory[T]], javaMap) + .createStreamTableSink(javaMap) + + case e@_ => + throw new TableException(s"Unsupported table environment: ${e.getClass.getName}") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala index 8fa6fad..9e42a15 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala @@ -25,6 +25,8 @@ import java.util * also [[TableFactory]] for more information. * * @tparam T record type that the format produces or consumes + * @see [[DeserializationSchemaFactory]] + * @see [[SerializationSchemaFactory]] */ trait TableFormatFactory[T] extends TableFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala new file mode 100644 index 0000000..65a41bb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.util + +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND} +import org.apache.flink.table.factories.StreamTableSinkFactory +import org.apache.flink.types.Row + +/** + * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment. + */ +class CsvAppendTableSinkFactory + extends CsvTableSinkFactoryBase + with StreamTableSinkFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String](super.requiredContext()) + context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND) + context + } + + override def createStreamTableSink( + properties: util.Map[String, String]) + : StreamTableSink[Row] = { + createTableSink(isStreaming = true, properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala new file mode 100644 index 0000000..2687ed2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.util + +import org.apache.flink.table.factories.BatchTableSinkFactory +import org.apache.flink.types.Row + +/** + * Factory base for creating configured instances of [[CsvTableSink]] in a batch environment. + */ +class CsvBatchTableSinkFactory + extends CsvTableSinkFactoryBase + with BatchTableSinkFactory[Row] { + + override def createBatchTableSink( + properties: util.Map[String, String]) + : BatchTableSink[Row] = { + createTableSink(isStreaming = false, properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala deleted file mode 100644 index eb99f02..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sinks - -import java.util - -import org.apache.flink.table.api.TableException -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ -import org.apache.flink.table.descriptors.CsvValidator._ -import org.apache.flink.table.descriptors.DescriptorProperties._ -import org.apache.flink.table.descriptors.FileSystemValidator._ -import org.apache.flink.table.descriptors.FormatDescriptorValidator._ -import org.apache.flink.table.descriptors.SchemaValidator._ -import org.apache.flink.table.descriptors._ -import org.apache.flink.table.factories.{BatchTableSinkFactory, StreamTableSinkFactory, TableFactory} -import org.apache.flink.types.Row - -/** - * Factory for creating configured instances of [[CsvTableSink]]. - */ -class CsvTableSinkFactory - extends TableFactory - with StreamTableSinkFactory[Row] - with BatchTableSinkFactory[Row] { - - override def requiredContext(): util.Map[String, String] = { - val context = new util.HashMap[String, String]() - context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE) - context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE) - context.put(CONNECTOR_PROPERTY_VERSION, "1") - context.put(FORMAT_PROPERTY_VERSION, "1") - context - } - - override def supportedProperties(): util.List[String] = { - val properties = new util.ArrayList[String]() - // connector - properties.add(CONNECTOR_PATH) - // format - properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}") - properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}") - properties.add(FORMAT_FIELD_DELIMITER) - properties.add(CONNECTOR_PATH) - // schema - properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}") - properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}") - properties - } - - override def createStreamTableSink( - properties: util.Map[String, String]) - : StreamTableSink[Row] = { - createTableSink(isStreaming = true, properties) - } - - override def createBatchTableSink( - properties: util.Map[String, String]) - : BatchTableSink[Row] = { - createTableSink(isStreaming = false, properties) - } - - private def createTableSink( - isStreaming: Boolean, - properties: util.Map[String, String]) - : CsvTableSink = { - - val params = new DescriptorProperties() - params.putProperties(properties) - - // validate - new FileSystemValidator().validate(params) - new CsvValidator().validate(params) - new SchemaValidator( - isStreaming, - supportsSourceTimestamps = false, - supportsSourceWatermarks = false).validate(params) - - // build - val formatSchema = params.getTableSchema(FORMAT_FIELDS) - val tableSchema = SchemaValidator.deriveTableSinkSchema(params) - - if (!formatSchema.equals(tableSchema)) { - throw new TableException( - "Encodings that differ from the schema are not supported yet for CsvTableSink.") - } - - val path = params.getString(CONNECTOR_PATH) - val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",") - - val csvTableSink = new CsvTableSink(path, fieldDelimiter) - - csvTableSink - .configure(formatSchema.getColumnNames, formatSchema.getTypes) - .asInstanceOf[CsvTableSink] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala new file mode 100644 index 0000000..6ceba4c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.util + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ +import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.DescriptorProperties._ +import org.apache.flink.table.descriptors.FileSystemValidator._ +import org.apache.flink.table.descriptors.FormatDescriptorValidator._ +import org.apache.flink.table.descriptors.SchemaValidator._ +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.factories.TableFactory + +/** + * Factory base for creating configured instances of [[CsvTableSink]]. + */ +abstract class CsvTableSinkFactoryBase extends TableFactory { + + override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String]() + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE) + context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE) + context.put(CONNECTOR_PROPERTY_VERSION, "1") + context.put(FORMAT_PROPERTY_VERSION, "1") + context + } + + override def supportedProperties(): util.List[String] = { + val properties = new util.ArrayList[String]() + // connector + properties.add(CONNECTOR_PATH) + // format + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}") + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}") + properties.add(FORMAT_FIELD_DELIMITER) + properties.add(CONNECTOR_PATH) + // schema + properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}") + properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}") + properties + } + + protected def createTableSink( + isStreaming: Boolean, + properties: util.Map[String, String]) + : CsvTableSink = { + + val params = new DescriptorProperties() + params.putProperties(properties) + + // validate + new FileSystemValidator().validate(params) + new CsvValidator().validate(params) + new SchemaValidator( + isStreaming, + supportsSourceTimestamps = false, + supportsSourceWatermarks = false).validate(params) + + // build + val formatSchema = params.getTableSchema(FORMAT_FIELDS) + val tableSchema = SchemaValidator.deriveTableSinkSchema(params) + + if (!formatSchema.equals(tableSchema)) { + throw new TableException( + "Encodings that differ from the schema are not supported yet for CsvTableSink.") + } + + val path = params.getString(CONNECTOR_PATH) + val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",") + + val csvTableSink = new CsvTableSink(path, fieldDelimiter) + + csvTableSink + .configure(formatSchema.getColumnNames, formatSchema.getTypes) + .asInstanceOf[CsvTableSink] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala new file mode 100644 index 0000000..afbe2ea --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND} +import org.apache.flink.table.factories.StreamTableSourceFactory +import org.apache.flink.types.Row + +/** + * Factory for creating configured instances of [[CsvTableSource]] in a stream environment. + */ +class CsvAppendTableSourceFactory + extends CsvTableSourceFactoryBase + with StreamTableSourceFactory[Row] { + + override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String](super.requiredContext()) + context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND) + context + } + + override def createStreamTableSource( + properties: util.Map[String, String]) + : StreamTableSource[Row] = { + createTableSource(isStreaming = true, properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala new file mode 100644 index 0000000..9d8fa40 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +import org.apache.flink.table.factories.BatchTableSourceFactory +import org.apache.flink.types.Row + +/** + * Factory for creating configured instances of [[CsvTableSource]] in a batch environment. + */ +class CsvBatchTableSourceFactory + extends CsvTableSourceFactoryBase + with BatchTableSourceFactory[Row] { + + override def createBatchTableSource( + properties: util.Map[String, String]) + : BatchTableSource[Row] = { + createTableSource(isStreaming = false, properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala deleted file mode 100644 index 96751ec..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -import java.util - -import org.apache.flink.table.api.TableException -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} -import org.apache.flink.table.descriptors.CsvValidator._ -import org.apache.flink.table.descriptors.DescriptorProperties.toScala -import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} -import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} -import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA -import org.apache.flink.table.descriptors._ -import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactory} -import org.apache.flink.types.Row - -/** - * Factory for creating configured instances of [[CsvTableSource]]. - */ -class CsvTableSourceFactory - extends TableFactory - with StreamTableSourceFactory[Row] - with BatchTableSourceFactory[Row] { - - override def requiredContext(): util.Map[String, String] = { - val context = new util.HashMap[String, String]() - context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE) - context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE) - context.put(CONNECTOR_PROPERTY_VERSION, "1") - context.put(FORMAT_PROPERTY_VERSION, "1") - context - } - - override def supportedProperties(): util.List[String] = { - val properties = new util.ArrayList[String]() - // connector - properties.add(CONNECTOR_PATH) - // format - properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}") - properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}") - properties.add(FORMAT_FIELD_DELIMITER) - properties.add(FORMAT_LINE_DELIMITER) - properties.add(FORMAT_QUOTE_CHARACTER) - properties.add(FORMAT_COMMENT_PREFIX) - properties.add(FORMAT_IGNORE_FIRST_LINE) - properties.add(FORMAT_IGNORE_PARSE_ERRORS) - properties.add(CONNECTOR_PATH) - // schema - properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}") - properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}") - properties - } - - override def createStreamTableSource( - properties: util.Map[String, String]) - : StreamTableSource[Row] = { - createTableSource(isStreaming = true, properties) - } - - override def createBatchTableSource( - properties: util.Map[String, String]) - : BatchTableSource[Row] = { - createTableSource(isStreaming = false, properties) - } - - private def createTableSource( - isStreaming: Boolean, - properties: util.Map[String, String]) - : CsvTableSource = { - - val params = new DescriptorProperties() - params.putProperties(properties) - - // validate - new FileSystemValidator().validate(params) - new CsvValidator().validate(params) - new SchemaValidator( - isStreaming, - supportsSourceTimestamps = false, - supportsSourceWatermarks = false).validate(params) - - // build - val csvTableSourceBuilder = new CsvTableSource.Builder - - val formatSchema = params.getTableSchema(FORMAT_FIELDS) - val tableSchema = params.getTableSchema(SCHEMA) - - // the CsvTableSource needs some rework first - // for now the schema must be equal to the encoding - if (!formatSchema.equals(tableSchema)) { - throw new TableException( - "Encodings that differ from the schema are not supported yet for CsvTableSources.") - } - - toScala(params.getOptionalString(CONNECTOR_PATH)) - .foreach(csvTableSourceBuilder.path) - toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)) - .foreach(csvTableSourceBuilder.fieldDelimiter) - toScala(params.getOptionalString(FORMAT_LINE_DELIMITER)) - .foreach(csvTableSourceBuilder.lineDelimiter) - - formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) => - csvTableSourceBuilder.field(name, tpe) - } - toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER)) - .foreach(csvTableSourceBuilder.quoteCharacter) - toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX)) - .foreach(csvTableSourceBuilder.commentPrefix) - toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag => - if (flag) { - csvTableSourceBuilder.ignoreFirstLine() - } - } - toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag => - if (flag) { - csvTableSourceBuilder.ignoreParseErrors() - } - } - - csvTableSourceBuilder.build() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala new file mode 100644 index 0000000..d320220 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE} +import org.apache.flink.table.descriptors.CsvValidator._ +import org.apache.flink.table.descriptors.DescriptorProperties.toScala +import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE} +import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA +import org.apache.flink.table.descriptors._ +import org.apache.flink.table.factories.TableFactory + +/** + * Factory base for creating configured instances of [[CsvTableSource]]. + */ +abstract class CsvTableSourceFactoryBase extends TableFactory { + + override def requiredContext(): util.Map[String, String] = { + val context = new util.HashMap[String, String]() + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE) + context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE) + context.put(CONNECTOR_PROPERTY_VERSION, "1") + context.put(FORMAT_PROPERTY_VERSION, "1") + context + } + + override def supportedProperties(): util.List[String] = { + val properties = new util.ArrayList[String]() + // connector + properties.add(CONNECTOR_PATH) + // format + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}") + properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}") + properties.add(FORMAT_FIELD_DELIMITER) + properties.add(FORMAT_LINE_DELIMITER) + properties.add(FORMAT_QUOTE_CHARACTER) + properties.add(FORMAT_COMMENT_PREFIX) + properties.add(FORMAT_IGNORE_FIRST_LINE) + properties.add(FORMAT_IGNORE_PARSE_ERRORS) + properties.add(CONNECTOR_PATH) + // schema + properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}") + properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}") + properties + } + + protected def createTableSource( + isStreaming: Boolean, + properties: util.Map[String, String]) + : CsvTableSource = { + + val params = new DescriptorProperties() + params.putProperties(properties) + + // validate + new FileSystemValidator().validate(params) + new CsvValidator().validate(params) + new SchemaValidator( + isStreaming, + supportsSourceTimestamps = false, + supportsSourceWatermarks = false).validate(params) + + // build + val csvTableSourceBuilder = new CsvTableSource.Builder + + val formatSchema = params.getTableSchema(FORMAT_FIELDS) + val tableSchema = params.getTableSchema(SCHEMA) + + // the CsvTableSource needs some rework first + // for now the schema must be equal to the encoding + if (!formatSchema.equals(tableSchema)) { + throw new TableException( + "Encodings that differ from the schema are not supported yet for CsvTableSources.") + } + + toScala(params.getOptionalString(CONNECTOR_PATH)) + .foreach(csvTableSourceBuilder.path) + toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)) + .foreach(csvTableSourceBuilder.fieldDelimiter) + toScala(params.getOptionalString(FORMAT_LINE_DELIMITER)) + .foreach(csvTableSourceBuilder.lineDelimiter) + + formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) => + csvTableSourceBuilder.field(name, tpe) + } + toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER)) + .foreach(csvTableSourceBuilder.quoteCharacter) + toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX)) + .foreach(csvTableSourceBuilder.commentPrefix) + toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag => + if (flag) { + csvTableSourceBuilder.ignoreFirstLine() + } + } + toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag => + if (flag) { + csvTableSourceBuilder.ignoreParseErrors() + } + } + + csvTableSourceBuilder.build() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala index 6df00e7..7f567f9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala @@ -39,7 +39,9 @@ class ExternalCatalogTest extends TableTestBase { val util = batchTestUtil() val tableEnv = util.tableEnv - tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) val table1 = tableEnv.scan("test", "db1", "tb1") val table2 = tableEnv.scan("test", "db2", "tb2") @@ -69,7 +71,9 @@ class ExternalCatalogTest extends TableTestBase { def testBatchSQL(): Unit = { val util = batchTestUtil() - util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + util.tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " + "(SELECT a * 2, b, c FROM test.db1.tb1)" @@ -96,7 +100,9 @@ class ExternalCatalogTest extends TableTestBase { val util = streamTestUtil() val tableEnv = util.tableEnv - util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + util.tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = true)) val table1 = tableEnv.scan("test", "db1", "tb1") val table2 = tableEnv.scan("test", "db2", "tb2") @@ -128,7 +134,9 @@ class ExternalCatalogTest extends TableTestBase { def testStreamSQL(): Unit = { val util = streamTestUtil() - util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + util.tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = true)) val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " + "(SELECT a * 2, b, c FROM test.db1.tb1)" @@ -155,7 +163,9 @@ class ExternalCatalogTest extends TableTestBase { val util = batchTestUtil() val tableEnv = util.tableEnv - tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) val table1 = tableEnv.scan("test", "tb1") val table2 = tableEnv.scan("test", "db2", "tb2") http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala index 58f51f9..c98a7c1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala @@ -47,7 +47,7 @@ class ExternalCatalogSchemaTest extends TableTestBase { @Before def setUp(): Unit = { val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus() - val catalog = CommonTestData.getInMemoryTestCatalog + val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true) ExternalCatalogSchema.registerCatalog( streamTestUtil().tableEnv, rootSchemaPlus, schemaName, catalog) externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName") http://git-wip-us.apache.org/repos/asf/flink/blob/7bb07e4e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala index ac2a729..1f84b3d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala @@ -138,7 +138,9 @@ class InMemoryExternalCatalogTest { val schemaDesc = new Schema() .field("first", BasicTypeInfo.STRING_TYPE_INFO) .field("second", BasicTypeInfo.INT_TYPE_INFO) - new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None) + ExternalCatalogTable.builder(connDesc) + .withSchema(schemaDesc) + .asTableSource() } private def createTableInstance( @@ -149,7 +151,9 @@ class InMemoryExternalCatalogTest { fieldNames.zipWithIndex.foreach { case (fieldName, index) => schemaDesc.field(fieldName, fieldTypes(index)) } - new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None) + ExternalCatalogTable.builder(connDesc) + .withSchema(schemaDesc) + .asTableSource() } class TestConnectorDesc extends ConnectorDescriptor("test", version = 1, formatNeeded = false) {
