Repository: flink Updated Branches: refs/heads/master a5476cdcd -> 2cb58960e
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala index 42f0769..050f1a1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala @@ -380,6 +380,44 @@ class TableSourceTest extends TableTestBase { Assert.assertEquals(source1, source2) } +// TODO enable this test once we expose the feature through the table environment +// @Test +// def testCsvTableSourceDescriptor(): Unit = { +// val util = streamTestUtil() +// val source1 = util.tableEnv +// .from( +// FileSystem() +// .path("/path/to/csv")) +// .withFormat( +// Csv() +// .field("myfield", Types.STRING) +// .field("myfield2", Types.INT) +// .quoteCharacter(';') +// .fieldDelimiter("#") +// .lineDelimiter("\r\n") +// .commentPrefix("%%") +// .ignoreFirstLine() +// .ignoreParseErrors()) +// .withSchema( +// Schema() +// .field("myfield", Types.STRING) +// .field("myfield2", Types.INT)) +// .toTableSource +// +// val source2 = new CsvTableSource( +// "/path/to/csv", +// Array("myfield", "myfield2"), +// Array(Types.STRING, Types.INT), +// "#", +// "\r\n", +// ';', +// true, +// "%%", +// true) +// +// Assert.assertEquals(source1, source2) +// } + @Test def testTimeLiteralExpressionPushdown(): Unit = { val (tableSource, tableName) = filterableTableSourceTimeTypes http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala index f9920e7..e6d2458 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala @@ -31,6 +31,7 @@ import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory, FlinkTyp import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.runtime.utils.CommonTestData import org.apache.flink.table.sources.CsvTableSource +import org.apache.flink.table.utils.MockTableEnvironment import org.junit.Assert._ import org.junit.{Before, Test} @@ -48,7 +49,8 @@ class ExternalCatalogSchemaTest { def setUp(): Unit = { val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus() val catalog = CommonTestData.getInMemoryTestCatalog - ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog) + ExternalCatalogSchema.registerCatalog( + new MockTableEnvironment, rootSchemaPlus, schemaName, catalog) externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName") val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem()) val prop = new Properties() http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala index 0744de9..cdacce1 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala @@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.{TableSchema, Types} -import org.apache.flink.table.plan.schema.{StreamTableSourceTable} +import org.apache.flink.table.plan.schema.StreamTableSourceTable import org.apache.flink.table.sources.StreamTableSource +import org.apache.flink.table.utils.MockTableEnvironment import org.apache.flink.types.Row import org.junit.Assert.assertTrue import org.junit.{Before, Test} @@ -41,7 +42,8 @@ class ExternalTableSourceUtilTest { def testExternalStreamTable() = { val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO)) val table = ExternalCatalogTable("mock", schema) - val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table) + val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable( + new MockTableEnvironment, table) assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala new file mode 100644 index 0000000..15cf13b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.table.api.{TableSchema, Types, ValidationException} +import org.junit.Test + +class CsvTest extends DescriptorTestBase { + + @Test + def testCsv(): Unit = { + val desc = Csv() + .field("field1", "STRING") + .field("field2", Types.SQL_TIMESTAMP) + .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]])) + .field("field4", Types.ROW( + Array[String]("test", "row"), + Array[TypeInformation[_]](Types.INT, Types.STRING))) + .lineDelimiter("^") + val expected = Seq( + "format.type" -> "csv", + "format.version" -> "1", + "format.fields.0.name" -> "field1", + "format.fields.0.type" -> "STRING", + "format.fields.1.name" -> "field2", + "format.fields.1.type" -> "TIMESTAMP", + "format.fields.2.name" -> "field3", + "format.fields.2.type" -> "ANY(java.lang.Class)", + "format.fields.3.name" -> "field4", + "format.fields.3.type" -> "ROW(test INT, row VARCHAR)", + "format.line-delimiter" -> "^") + verifyProperties(desc, expected) + } + + @Test + def testCsvTableSchema(): Unit = { + val desc = Csv() + .schema(new TableSchema( + Array[String]("test", "row"), + 
Array[TypeInformation[_]](Types.INT, Types.STRING))) + .quoteCharacter('#') + .ignoreFirstLine() + val expected = Seq( + "format.type" -> "csv", + "format.version" -> "1", + "format.fields.0.name" -> "test", + "format.fields.0.type" -> "INT", + "format.fields.1.name" -> "row", + "format.fields.1.type" -> "VARCHAR", + "format.quote-character" -> "#", + "format.ignore-first-line" -> "true") + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidType(): Unit = { + verifyInvalidProperty("format.fields.0.type", "WHATEVER") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidField(): Unit = { + verifyInvalidProperty("format.fields.10.name", "WHATEVER") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidQuoteCharacter(): Unit = { + verifyInvalidProperty("format.quote-character", "qq") + } + + override def descriptor(): Descriptor = { + Csv() + .field("field1", "STRING") + .field("field2", Types.SQL_TIMESTAMP) + .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]])) + .field("field4", Types.ROW( + Array[String]("test", "row"), + Array[TypeInformation[_]](Types.INT, Types.STRING))) + .lineDelimiter("^") + } + + override def validator(): DescriptorValidator = { + new CsvValidator() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala new file mode 100644 index 0000000..3a59c9b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.junit.Assert.assertEquals + +abstract class DescriptorTestBase { + + /** + * Returns a valid descriptor. + */ + def descriptor(): Descriptor + + /** + * Returns a validator that can validate this descriptor. 
+ */ + def validator(): DescriptorValidator + + def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + val normProps = new DescriptorProperties + descriptor.addProperties(normProps) + assertEquals(expected.toMap, normProps.asMap) + } + + def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + val properties = new DescriptorProperties + descriptor().addProperties(properties) + properties.unsafePut(property, invalidValue) + validator().validate(properties) + } + + def verifyMissingProperty(removeProperty: String): Unit = { + val properties = new DescriptorProperties + descriptor().addProperties(properties) + properties.unsafeRemove(removeProperty) + validator().validate(properties) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala new file mode 100644 index 0000000..3452e8d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.junit.Test + +class FileSystemTest extends DescriptorTestBase { + + @Test + def testFileSystem(): Unit = { + val desc = FileSystem().path("/myfile") + val expected = Seq( + "connector.type" -> "filesystem", + "connector.version" -> "1", + "connector.path" -> "/myfile") + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidPath(): Unit = { + verifyInvalidProperty("connector.path", "") + } + + @Test(expected = classOf[ValidationException]) + def testMissingPath(): Unit = { + verifyMissingProperty("connector.path") + } + + override def descriptor(): Descriptor = { + FileSystem().path("/myfile") + } + + override def validator(): DescriptorValidator = { + new FileSystemValidator() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala new file mode 100644 index 0000000..756ca23 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.junit.Test + +class JsonTest extends DescriptorTestBase { + + @Test + def testJson(): Unit = { + val schema = + """ + |{ + | "title": "Person", + | "type": "object", + | "properties": { + | "firstName": { + | "type": "string" + | }, + | "lastName": { + | "type": "string" + | }, + | "age": { + | "description": "Age in years", + | "type": "integer", + | "minimum": 0 + | } + | }, + | "required": ["firstName", "lastName"] + |} + |""".stripMargin + val desc = Json() + .schema(schema) + .failOnMissingField(true) + val expected = Seq( + "format.type" -> "json", + "format.version" -> "1", + "format.schema-string" -> schema, + "format.fail-on-missing-field" -> "true") + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidMissingField(): Unit = { + verifyInvalidProperty("format.fail-on-missing-field", "DDD") + } + + @Test(expected = classOf[ValidationException]) + def testMissingSchema(): Unit = { + verifyMissingProperty("format.schema-string") + } + + override def descriptor(): Descriptor = { + Json().schema("test") + } + + override def validator(): DescriptorValidator = { + new JsonValidator() + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala new file mode 100644 index 0000000..a1854ce --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.ValidationException +import org.junit.Test + +class MetadataTest extends DescriptorTestBase { + + @Test + def testMetadata(): Unit = { + val desc = Metadata() + .comment("Some additional comment") + .creationTime(123L) + .lastAccessTime(12020202L) + val expected = Seq( + "metadata.comment" -> "Some additional comment", + "metadata.creation-time" -> "123", + "metadata.last-access-time" -> "12020202" + ) + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidCreationTime(): Unit = { + verifyInvalidProperty("metadata.creation-time", "dfghj") + } + + override def descriptor(): Descriptor = { + Metadata() + .comment("Some additional comment") + .creationTime(123L) + .lastAccessTime(12020202L) + } + + override def validator(): DescriptorValidator = { + new MetadataValidator() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala new file mode 100644 index 0000000..80050fc --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner +import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner +import org.apache.flink.types.Row +import org.junit.Test + +class RowtimeTest extends DescriptorTestBase { + + @Test + def testRowtime(): Unit = { + val desc = Rowtime() + .timestampsFromField("otherField") + .watermarksPeriodicBounding(1000L) + val expected = Seq( + "rowtime.0.version" -> "1", + "rowtime.0.timestamps.type" -> "from-field", + "rowtime.0.timestamps.from" -> "otherField", + "rowtime.0.watermarks.type" -> "periodic-bounding", + "rowtime.0.watermarks.delay" -> "1000" + ) + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidWatermarkType(): Unit = { + verifyInvalidProperty("rowtime.0.watermarks.type", "xxx") + } + + @Test(expected = classOf[ValidationException]) + def testMissingWatermarkClass(): Unit = { + verifyMissingProperty("rowtime.0.watermarks.class") + } + + override def descriptor(): Descriptor = { + Rowtime() + .timestampsFromSource() + .watermarksFromStrategy(new CustomAssigner()) + } + + override def validator(): DescriptorValidator = { + new RowtimeValidator("rowtime.0.") + } +} + +object RowtimeTest { + + class CustomAssigner extends PunctuatedWatermarkAssigner() { + override def getWatermark(row: Row, timestamp: Long): Watermark = + throw new 
UnsupportedOperationException() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala new file mode 100644 index 0000000..f663a96 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.{Types, ValidationException} +import org.junit.Test + +class SchemaTest extends DescriptorTestBase { + + @Test + def testSchema(): Unit = { + val desc = Schema() + .field("myField", Types.BOOLEAN) + .field("otherField", "VARCHAR").from("csvField") + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP).rowtime( + Rowtime().timestampsFromSource().watermarksFromSource()) + val expected = Seq( + "schema.version" -> "1", + "schema.0.name" -> "myField", + "schema.0.type" -> "BOOLEAN", + "schema.1.name" -> "otherField", + "schema.1.type" -> "VARCHAR", + "schema.1.from" -> "csvField", + "schema.2.name" -> "p", + "schema.2.type" -> "TIMESTAMP", + "schema.2.proctime" -> "true", + "schema.3.name" -> "r", + "schema.3.type" -> "TIMESTAMP", + "schema.3.rowtime.0.version" -> "1", + "schema.3.rowtime.0.watermarks.type" -> "from-source", + "schema.3.rowtime.0.timestamps.type" -> "from-source" + ) + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidType(): Unit = { + verifyInvalidProperty("schema.1.type", "dfghj") + } + + @Test(expected = classOf[ValidationException]) + def testBothRowtimeAndProctime(): Unit = { + verifyInvalidProperty("schema.2.rowtime.0.version", "1") + verifyInvalidProperty("schema.2.rowtime.0.watermarks.type", "from-source") + verifyInvalidProperty("schema.2.rowtime.0.timestamps.type", "from-source") + } + + override def descriptor(): Descriptor = { + Schema() + .field("myField", Types.BOOLEAN) + .field("otherField", "VARCHAR").from("csvField") + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP) + } + + override def validator(): DescriptorValidator = { + new SchemaValidator(isStreamEnvironment = true) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala new file mode 100644 index 0000000..3b248b4 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import _root_.java.util + +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} +import org.junit.Test + +class StatisticsTest extends DescriptorTestBase { + + @Test + def testStatistics(): Unit = { + val desc = Statistics() + .rowCount(1000L) + .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6)) + .columnAvgLength("b", 42.0) + .columnNullCount("a", 300) + val expected = Seq( + "statistics.version" -> "1", + "statistics.row-count" -> "1000", + "statistics.columns.0.name" -> "a", + "statistics.columns.0.distinct-count" -> "1", + "statistics.columns.0.null-count" -> "300", + "statistics.columns.0.avg-length" -> "3.0", + "statistics.columns.0.max-length" -> "4", + "statistics.columns.0.max-value" -> "5", + "statistics.columns.0.min-value" -> "6", + "statistics.columns.1.name" -> "b", + "statistics.columns.1.avg-length" -> "42.0" + ) + verifyProperties(desc, expected) + } + + @Test + def testStatisticsTableStats(): Unit = { + val map = new util.HashMap[String, ColumnStats]() + map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6)) + val desc = Statistics() + .tableStats(TableStats(32L, map)) + val expected = Seq( + "statistics.version" -> "1", + "statistics.row-count" -> "32", + "statistics.columns.0.name" -> "a", + "statistics.columns.0.null-count" -> "2", + "statistics.columns.0.avg-length" -> "3.0", + "statistics.columns.0.max-value" -> "5", + "statistics.columns.0.min-value" -> "6" + ) + verifyProperties(desc, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowCount(): Unit = { + verifyInvalidProperty("statistics.row-count", "abx") + } + + @Test(expected = classOf[ValidationException]) + def testMissingName(): Unit = { + verifyMissingProperty("statistics.columns.0.name") + } + + override def descriptor(): Descriptor = { + Statistics() + .rowCount(1000L) + .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6)) + 
.columnAvgLength("b", 42.0) + .columnNullCount("a", 300) + } + + override def validator(): DescriptorValidator = { + new StatisticsValidator() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala new file mode 100644 index 0000000..2c9a89c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.utils.TableTestBase + +class StreamTableSourceDescriptorTest extends TableTestBase { + +// TODO enable this test once we expose the feature through the table environment +// @Test +// def testStreamTableSourceDescriptor(): Unit = { +// val util = streamTestUtil() +// val desc = util.tableEnv +// .from( +// FileSystem() +// .path("/path/to/csv")) +// .withFormat( +// Csv() +// .field("myfield", Types.STRING) +// .field("myfield2", Types.INT) +// .quoteCharacter(';') +// .fieldDelimiter("#") +// .lineDelimiter("\r\n") +// .commentPrefix("%%") +// .ignoreFirstLine() +// .ignoreParseErrors()) +// .withSchema( +// Schema() +// .field("myfield", Types.STRING) +// .field("myfield2", Types.INT) +// .field("proctime", Types.SQL_TIMESTAMP).proctime() +// .field("rowtime", Types.SQL_TIMESTAMP).rowtime( +// Rowtime().timestampsFromSource().watermarksFromSource()) +// ) +// val expected = Seq( +// "connector.type" -> "filesystem", +// "connector.path" -> "/path/to/csv", +// "format.type" -> "csv", +// "format.fields.0.name" -> "myfield", +// "format.fields.0.type" -> "VARCHAR", +// "format.fields.1.name" -> "myfield2", +// "format.fields.1.type" -> "INT", +// "format.quote-character" -> ";", +// "format.field-delimiter" -> "#", +// "format.line-delimiter" -> "\r\n", +// "format.comment-prefix" -> "%%", +// "format.ignore-first-line" -> "true", +// "format.ignore-parse-errors" -> "true", +// "schema.0.name" -> "myfield", +// "schema.0.type" -> "VARCHAR", +// "schema.1.name" -> "myfield2", +// "schema.1.type" -> "INT", +// "schema.2.name" -> "proctime", +// "schema.2.type" -> "TIMESTAMP", +// "schema.2.proctime" -> "true", +// "schema.3.name" -> "rowtime", +// "schema.3.type" -> "TIMESTAMP", +// "schema.3.rowtime.0.timestamps.type" -> "from-source", +// "schema.3.rowtime.0.watermarks.type" -> "from-source" +// ) +// verifyProperties(desc, expected) +// } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala new file mode 100644 index 0000000..5e9b5a2 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Exercises `TableSourceFactoryService.findTableSourceFactory` with a base set of string
+  * properties and with single-key variations of it.
+  */
+class TableSourceFactoryServiceTest {
+
+  /** The unmodified base properties must yield a non-null lookup result. */
+  @Test
+  def testValidProperties(): Unit = {
+    val found = lookup(properties())
+    assertTrue(found != null)
+  }
+
+  /** Setting the connector type to "FAIL" is expected to raise the no-match exception. */
+  @Test(expected = classOf[NoMatchingTableSourceException])
+  def testInvalidContext(): Unit = {
+    val config = properties()
+    config(CONNECTOR_TYPE) = "FAIL"
+    lookup(config)
+  }
+
+  /** Changing only the connector version to "2" must still yield a non-null result. */
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val config = properties()
+    config(CONNECTOR_VERSION) = "2"
+    // the table source should still be found
+    val found = lookup(config)
+    assertTrue(found != null)
+  }
+
+  /** Adding the extra key "format.path_new" is expected to raise a validation exception. */
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+    val config = properties()
+    config("format.path_new") = "/new/path"
+    lookup(config)
+  }
+
+  /** Switching the "failing" key to "true" is expected to surface as a table exception. */
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+    val config = properties()
+    config("failing") = "true"
+    lookup(config)
+  }
+
+  /** Runs the service lookup on an immutable snapshot of the given properties. */
+  private def lookup(config: mutable.Map[String, String]) =
+    TableSourceFactoryService.findTableSourceFactory(config.toMap)
+
+  /** Creates a fresh, mutable copy of the base properties shared by all tests. */
+  private def properties(): mutable.Map[String, String] =
+    mutable.Map[String, String](
+      CONNECTOR_TYPE -> "test",
+      FORMAT_TYPE -> "test",
+      CONNECTOR_VERSION -> "1",
+      FORMAT_VERSION -> "1",
+      "format.path" -> "/path/to/target",
+      "schema.0.name" -> "a",
+      "schema.1.name" -> "b",
+      "schema.2.name" -> "c",
+      "schema.0.field.0.name" -> "a",
+      "schema.0.field.1.name" -> "b",
+      "schema.0.field.2.name" -> "c",
+      "failing" -> "false")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala new file mode 100644 index 0000000..ae75f99 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.types.Row
+
+/**
+  * Table source factory used by tests. It declares a fixed context and property list and
+  * can be told to fail through the "failing" property.
+  */
+class TestTableSourceFactory extends TableSourceFactory[Row] {
+
+  /** Returns the context of this factory: connector and format type "test", both version "1". */
+  override def requiredContext(): util.Map[String, String] = {
+    val required = new util.HashMap[String, String]()
+    Seq(
+      CONNECTOR_TYPE -> "test",
+      FORMAT_TYPE -> "test",
+      CONNECTOR_VERSION -> "1",
+      FORMAT_VERSION -> "1"
+    ).foreach { case (key, value) => required.put(key, value) }
+    required
+  }
+
+  /** Returns the property keys this factory declares as supported. */
+  override def supportedProperties(): util.List[String] = {
+    // format path, schema name patterns and the switch that makes create() fail
+    new util.ArrayList[String](
+      util.Arrays.asList(
+        "format.path",
+        "schema.#.name",
+        "schema.#.field.#.name",
+        "failing"))
+  }
+
+  /**
+    * Creates a stub table source whose accessors are unsupported.
+    *
+    * @throws IllegalArgumentException if the "failing" property equals "true"
+    */
+  override def create(properties: util.Map[String, String]): TableSource[Row] = {
+    val shouldFail = "true" == properties.get("failing")
+    if (shouldFail) {
+      throw new IllegalArgumentException("Error in this factory.")
+    }
+
+    new TableSource[Row] {
+      override def getTableSchema: TableSchema = {
+        throw new UnsupportedOperationException()
+      }
+
+      override def getReturnType: TypeInformation[Row] = {
+        throw new UnsupportedOperationException()
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala new file mode 100644 index 0000000..29d647c --- /dev/null +++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.table.api.Types +import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person} +import org.junit.Assert.assertEquals +import org.junit.Test + +/** + * Tests for string-based representation of [[TypeInformation]]. 
+ */
+class TypeStringUtilsTest {
+
+  /** Round-trips the simple SQL type names and one serialized "ANY" type. */
+  @Test
+  def testPrimitiveTypes(): Unit = {
+    // (string representation, type information) pairs, checked in declaration order
+    val simpleTypes = Seq[(String, TypeInformation[_])](
+      "VARCHAR" -> Types.STRING,
+      "BOOLEAN" -> Types.BOOLEAN,
+      "TINYINT" -> Types.BYTE,
+      "SMALLINT" -> Types.SHORT,
+      "INT" -> Types.INT,
+      "BIGINT" -> Types.LONG,
+      "FLOAT" -> Types.FLOAT,
+      "DOUBLE" -> Types.DOUBLE,
+      "DECIMAL" -> Types.DECIMAL,
+      "DATE" -> Types.SQL_DATE,
+      "TIME" -> Types.SQL_TIME,
+      "TIMESTAMP" -> Types.SQL_TIMESTAMP)
+    simpleTypes.foreach { case (typeString, typeInfo) => testReadAndWrite(typeString, typeInfo) }
+
+    // unsupported type information
+    val voidTypeString =
+      "ANY(java.lang.Void, " +
+      "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
+      "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
+      "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
+      "cGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZVNlcmlhbGl6ZXI7eHIANG9yZy5hcGFjaGUu" +
+      "ZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIADmphdmEu" +
+      "bGFuZy5Wb2lkAAAAAAAAAAAAAAB4cHB1cgASW0xqYXZhLmxhbmcuQ2xhc3M7qxbXrsvNWpkCAAB4cAAAAABz" +
+      "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
+      "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
+      "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
+      "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)"
+    testReadAndWrite(voidTypeString, BasicTypeInfo.VOID_TYPE_INFO)
+  }
+
+  /** Round-trips row types (default, named and quoted field names) plus POJO and ANY types. */
+  @Test
+  def testWriteComplexTypes(): Unit = {
+    // builds a two-field (DECIMAL, TINYINT) row type with the given field names
+    def namedRow(first: String, second: String): TypeInformation[_] =
+      Types.ROW(
+        Array[String](first, second),
+        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE))
+
+    testReadAndWrite("ROW(f0 DECIMAL, f1 TINYINT)", Types.ROW(Types.DECIMAL, Types.BYTE))
+
+    testReadAndWrite("ROW(hello DECIMAL, world TINYINT)", namedRow("hello", "world"))
+
+    testReadAndWrite("ROW(\"he llo\" DECIMAL, world TINYINT)", namedRow("he llo", "world"))
+
+    testReadAndWrite("ROW(\"he \\nllo\" DECIMAL, world TINYINT)", namedRow("he \nllo", "world"))
+
+    val personType = TypeExtractor.createTypeInfo(classOf[Person])
+    testReadAndWrite(
+      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+      personType)
+
+    val nonPojoType = TypeExtractor.createTypeInfo(classOf[NonPojo])
+    testReadAndWrite(
+      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+      nonPojoType)
+  }
+
+  /**
+    * Asserts that `tpe` is written as `expected` and that `expected` is read back as `tpe`.
+    *
+    * @param expected string representation of the type
+    * @param tpe      type information matching that string
+    */
+  private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {
+    // test write to string
+    val written = TypeStringUtils.writeTypeInfo(tpe)
+    assertEquals(expected, written)
+
+    // test read from string
+    val parsed = TypeStringUtils.readTypeInfo(expected)
+    assertEquals(tpe, parsed)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index ff7c79d..f43bcc6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -35,8 +35,6 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override def sql(query: String): Table = ??? - override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ??? - override protected def getBuiltInNormRuleSet: RuleSet = ??? override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
@@ -46,4 +44,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit = ??? + + override protected def createUniqueTableName(): String = ??? + + override protected def registerTableSourceInternal(name: String, tableSource: TableSource[_]) + : Unit = ??? }
