Repository: flink Updated Branches: refs/heads/release-1.5 a269f8519 -> db2c510fb
http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala index 15cf13b..85778ca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala @@ -18,16 +18,36 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.table.api.{TableSchema, Types, ValidationException} import org.junit.Test +import scala.collection.JavaConverters._ + class CsvTest extends DescriptorTestBase { - @Test - def testCsv(): Unit = { - val desc = Csv() + @Test(expected = classOf[ValidationException]) + def testInvalidType(): Unit = { + addPropertyAndVerify(descriptors().get(0), "format.fields.0.type", "WHATEVER") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidField(): Unit = { + addPropertyAndVerify(descriptors().get(0), "format.fields.10.name", "WHATEVER") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidQuoteCharacter(): Unit = { + addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq") + } + + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + val desc1 = Csv() .field("field1", "STRING") .field("field2", Types.SQL_TIMESTAMP) .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]])) @@ -35,9 +55,21 @@ class CsvTest extends DescriptorTestBase { Array[String]("test", "row"), Array[TypeInformation[_]](Types.INT, Types.STRING))) .lineDelimiter("^") - val expected = Seq( + + val desc2 = Csv() + .schema(new TableSchema( + Array[String]("test", "row"), + Array[TypeInformation[_]](Types.INT, Types.STRING))) + .quoteCharacter('#') + .ignoreFirstLine() + + util.Arrays.asList(desc1, desc2) + } + + override def properties(): util.List[util.Map[String, String]] = { + val props1 = Map( "format.type" -> "csv", - "format.version" -> "1", + "format.property-version" -> "1", "format.fields.0.name" -> "field1", "format.fields.0.type" -> "STRING", "format.fields.1.name" -> "field2", @@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase { "format.fields.3.name" -> "field4", "format.fields.3.type" -> "ROW(test INT, row VARCHAR)", "format.line-delimiter" -> "^") - verifyProperties(desc, expected) - } - @Test - def testCsvTableSchema(): Unit = { - val desc = Csv() - .schema(new TableSchema( - Array[String]("test", "row"), - Array[TypeInformation[_]](Types.INT, Types.STRING))) - .quoteCharacter('#') - .ignoreFirstLine() - val expected = Seq( + val props2 = Map( "format.type" -> "csv", - "format.version" -> "1", + "format.property-version" -> "1", "format.fields.0.name" -> "test", "format.fields.0.type" -> "INT", "format.fields.1.name" -> "row", "format.fields.1.type" -> "VARCHAR", "format.quote-character" -> "#", "format.ignore-first-line" -> "true") - verifyProperties(desc, expected) - } - @Test(expected = classOf[ValidationException]) - def testInvalidType(): Unit = { - verifyInvalidProperty("format.fields.0.type", "WHATEVER") - } - - @Test(expected = classOf[ValidationException]) - def testInvalidField(): Unit = { - verifyInvalidProperty("format.fields.10.name", "WHATEVER") - } - - @Test(expected = classOf[ValidationException]) - def testInvalidQuoteCharacter(): Unit = { - verifyInvalidProperty("format.quote-character", "qq") - } - - override def descriptor(): Descriptor = { - Csv() - .field("field1", "STRING") - .field("field2", Types.SQL_TIMESTAMP) - .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]])) - .field("field4", Types.ROW( - Array[String]("test", "row"), - Array[TypeInformation[_]](Types.INT, Types.STRING))) - .lineDelimiter("^") + util.Arrays.asList(props1.asJava, props2.asJava) } override def validator(): DescriptorValidator = { http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala index 3a59c9b..7a98b0b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala @@ -18,37 +18,84 @@ package org.apache.flink.table.descriptors +import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ abstract class DescriptorTestBase { /** - * Returns a valid descriptor. + * Returns a set of valid descriptors. + * This method is implemented in both Scala and Java. */ - def descriptor(): Descriptor + def descriptors(): java.util.List[Descriptor] /** - * Returns a validator that can validate this descriptor. + * Returns a set of properties for each valid descriptor. + * This code is implemented in both Scala and Java. + */ + def properties(): java.util.List[java.util.Map[String, String]] + + /** + * Returns a validator that can validate all valid descriptors. */ def validator(): DescriptorValidator - def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = { + @Test + def testValidation(): Unit = { + val d = descriptors().asScala + val p = properties().asScala + + Preconditions.checkArgument(d.length == p.length) + + d.zip(p).foreach { case (desc, props) => + verifyProperties(desc, props.asScala.toMap) + } + } + + def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = { val normProps = new DescriptorProperties descriptor.addProperties(normProps) - assertEquals(expected.toMap, normProps.asMap) + + // test produced properties + assertEquals(expected, normProps.asMap.asScala.toMap) + + // test validation logic + validator().validate(normProps) } - def verifyInvalidProperty(property: String, invalidValue: String): Unit = { + def addPropertyAndVerify( + descriptor: Descriptor, + property: String, + invalidValue: String): Unit = { val properties = new DescriptorProperties - descriptor().addProperties(properties) + descriptor.addProperties(properties) properties.unsafePut(property, invalidValue) validator().validate(properties) } - def verifyMissingProperty(removeProperty: String): Unit = { + def removePropertyAndVerify(descriptor: Descriptor, removeProperty: String): Unit = { val properties = new DescriptorProperties - descriptor().addProperties(properties) + descriptor.addProperties(properties) properties.unsafeRemove(removeProperty) validator().validate(properties) } } + +class TestTableSourceDescriptor(connector: ConnectorDescriptor) + extends TableSourceDescriptor { + + this.connectorDescriptor = Some(connector) + + def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = { + this.formatDescriptor = Some(format) + this + } + + def addSchema(schema: Schema): TestTableSourceDescriptor = { + this.schemaDescriptor = Some(schema) + this + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala index 3452e8d..1162694 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala @@ -18,36 +18,41 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.table.api.ValidationException import org.junit.Test -class FileSystemTest extends DescriptorTestBase { +import scala.collection.JavaConverters._ - @Test - def testFileSystem(): Unit = { - val desc = FileSystem().path("/myfile") - val expected = Seq( - "connector.type" -> "filesystem", - "connector.version" -> "1", - "connector.path" -> "/myfile") - verifyProperties(desc, expected) - } +class FileSystemTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidPath(): Unit = { - verifyInvalidProperty("connector.path", "") + addPropertyAndVerify(descriptors().get(0), "connector.path", "") } @Test(expected = classOf[ValidationException]) def testMissingPath(): Unit = { - verifyMissingProperty("connector.path") + removePropertyAndVerify(descriptors().get(0), "connector.path") } - override def descriptor(): Descriptor = { - FileSystem().path("/myfile") + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + util.Arrays.asList(FileSystem().path("/myfile")) } override def validator(): DescriptorValidator = { new FileSystemValidator() } + + override def properties(): util.List[util.Map[String, String]] = { + val desc = Map( + "connector.type" -> "filesystem", + "connector.property-version" -> "1", + "connector.path" -> "/myfile") + + util.Arrays.asList(desc.asJava) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala deleted file mode 100644 index 756ca23..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.ValidationException -import org.junit.Test - -class JsonTest extends DescriptorTestBase { - - @Test - def testJson(): Unit = { - val schema = - """ - |{ - | "title": "Person", - | "type": "object", - | "properties": { - | "firstName": { - | "type": "string" - | }, - | "lastName": { - | "type": "string" - | }, - | "age": { - | "description": "Age in years", - | "type": "integer", - | "minimum": 0 - | } - | }, - | "required": ["firstName", "lastName"] - |} - |""".stripMargin - val desc = Json() - .schema(schema) - .failOnMissingField(true) - val expected = Seq( - "format.type" -> "json", - "format.version" -> "1", - "format.schema-string" -> schema, - "format.fail-on-missing-field" -> "true") - verifyProperties(desc, expected) - } - - @Test(expected = classOf[ValidationException]) - def testInvalidMissingField(): Unit = { - verifyInvalidProperty("format.fail-on-missing-field", "DDD") - } - - @Test(expected = classOf[ValidationException]) - def testMissingSchema(): Unit = { - verifyMissingProperty("format.schema-string") - } - - override def descriptor(): Descriptor = { - Json().schema("test") - } - - override def validator(): DescriptorValidator = { - new JsonValidator() - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala index a1854ce..64965b0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala @@ -18,38 +18,42 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.table.api.ValidationException import org.junit.Test -class MetadataTest extends DescriptorTestBase { +import scala.collection.JavaConverters._ - @Test - def testMetadata(): Unit = { - val desc = Metadata() - .comment("Some additional comment") - .creationTime(123L) - .lastAccessTime(12020202L) - val expected = Seq( - "metadata.comment" -> "Some additional comment", - "metadata.creation-time" -> "123", - "metadata.last-access-time" -> "12020202" - ) - verifyProperties(desc, expected) - } +class MetadataTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidCreationTime(): Unit = { - verifyInvalidProperty("metadata.creation-time", "dfghj") + addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj") } - override def descriptor(): Descriptor = { - Metadata() + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + val desc = Metadata() .comment("Some additional comment") .creationTime(123L) .lastAccessTime(12020202L) + + util.Arrays.asList(desc) } override def validator(): DescriptorValidator = { new MetadataValidator() } + + override def properties(): util.List[util.Map[String, String]] = { + val props = Map( + "metadata.comment" -> "Some additional comment", + "metadata.creation-time" -> "123", + "metadata.last-access-time" -> "12020202" + ) + + util.Arrays.asList(props.asJava) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala index 80050fc..7968b48 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.ValidationException import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner @@ -25,41 +27,58 @@ import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner import org.apache.flink.types.Row import org.junit.Test -class RowtimeTest extends DescriptorTestBase { +import scala.collection.JavaConverters._ - @Test - def testRowtime(): Unit = { - val desc = Rowtime() - .timestampsFromField("otherField") - .watermarksPeriodicBounding(1000L) - val expected = Seq( - "rowtime.0.version" -> "1", - "rowtime.0.timestamps.type" -> "from-field", - "rowtime.0.timestamps.from" -> "otherField", - "rowtime.0.watermarks.type" -> "periodic-bounding", - "rowtime.0.watermarks.delay" -> "1000" - ) - verifyProperties(desc, expected) - } +class RowtimeTest extends DescriptorTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWatermarkType(): Unit = { - verifyInvalidProperty("rowtime.0.watermarks.type", "xxx") + addPropertyAndVerify(descriptors().get(0), "rowtime.watermarks.type", "xxx") } @Test(expected = classOf[ValidationException]) def testMissingWatermarkClass(): Unit = { - verifyMissingProperty("rowtime.0.watermarks.class") + removePropertyAndVerify(descriptors().get(1), "rowtime.watermarks.class") } - override def descriptor(): Descriptor = { - Rowtime() + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + val desc1 = Rowtime() + .timestampsFromField("otherField") + .watermarksPeriodicBounding(1000L) + + val desc2 = Rowtime() .timestampsFromSource() .watermarksFromStrategy(new CustomAssigner()) + + util.Arrays.asList(desc1, desc2) } override def validator(): DescriptorValidator = { - new RowtimeValidator("rowtime.0.") + new RowtimeValidator() + } + + override def properties(): util.List[util.Map[String, String]] = { + val props1 = Map( + "rowtime.timestamps.type" -> "from-field", + "rowtime.timestamps.from" -> "otherField", + "rowtime.watermarks.type" -> "periodic-bounded", + "rowtime.watermarks.delay" -> "1000" + ) + + val props2 = Map( + "rowtime.timestamps.type" -> "from-source", + "rowtime.watermarks.type" -> "custom", + "rowtime.watermarks.class" -> "org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner", + "rowtime.watermarks.serialized" -> ("rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaX" + + "B0b3JzLlJvd3RpbWVUZXN0JEN1c3RvbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay" + + "50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9A" + + "IAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cm" + + "F0ZWd5mB_uSxDZ8-MCAAB4cA") + ) + + util.Arrays.asList(props1.asJava, props2.asJava) } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala index f663a96..589ec4f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala @@ -18,21 +18,54 @@ package org.apache.flink.table.descriptors +import java.util + import org.apache.flink.table.api.{Types, ValidationException} import org.junit.Test +import scala.collection.JavaConverters._ + class SchemaTest extends DescriptorTestBase { - @Test - def testSchema(): Unit = { - val desc = Schema() + @Test(expected = classOf[ValidationException]) + def testInvalidType(): Unit = { + addPropertyAndVerify( + descriptors().get(0), + "schema.1.type", "dfghj") + } + + @Test(expected = classOf[ValidationException]) + def testBothRowtimeAndProctime(): Unit = { + addPropertyAndVerify( + descriptors().get(0), + "schema.2.rowtime.watermarks.type", "from-source") + } + + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + val desc1 = Schema() .field("myField", Types.BOOLEAN) .field("otherField", "VARCHAR").from("csvField") .field("p", Types.SQL_TIMESTAMP).proctime() .field("r", Types.SQL_TIMESTAMP).rowtime( Rowtime().timestampsFromSource().watermarksFromSource()) - val expected = Seq( - "schema.version" -> "1", + + val desc2 = Schema() + .field("myField", Types.BOOLEAN) + .field("otherField", "VARCHAR").from("csvField") + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP) + + util.Arrays.asList(desc1, desc2) + } + + override def validator(): DescriptorValidator = { + new SchemaValidator(isStreamEnvironment = true) + } + + override def properties(): util.List[util.Map[String, String]] = { + val props1 = Map( "schema.0.name" -> "myField", "schema.0.type" -> "BOOLEAN", "schema.1.name" -> "otherField", @@ -43,34 +76,23 @@ class SchemaTest extends DescriptorTestBase { "schema.2.proctime" -> "true", "schema.3.name" -> "r", "schema.3.type" -> "TIMESTAMP", - "schema.3.rowtime.0.version" -> "1", - "schema.3.rowtime.0.watermarks.type" -> "from-source", - "schema.3.rowtime.0.timestamps.type" -> "from-source" + "schema.3.rowtime.watermarks.type" -> "from-source", + "schema.3.rowtime.timestamps.type" -> "from-source" ) - verifyProperties(desc, expected) - } - @Test(expected = classOf[ValidationException]) - def testInvalidType(): Unit = { - verifyInvalidProperty("schema.1.type", "dfghj") - } - - @Test(expected = classOf[ValidationException]) - def testBothRowtimeAndProctime(): Unit = { - verifyInvalidProperty("schema.2.rowtime.0.version", "1") - verifyInvalidProperty("schema.2.rowtime.0.watermarks.type", "from-source") - verifyInvalidProperty("schema.2.rowtime.0.timestamps.type", "from-source") - } - - override def descriptor(): Descriptor = { - Schema() - .field("myField", Types.BOOLEAN) - .field("otherField", "VARCHAR").from("csvField") - .field("p", Types.SQL_TIMESTAMP).proctime() - .field("r", Types.SQL_TIMESTAMP) - } + val props2 = Map( + "schema.0.name" -> "myField", + "schema.0.type" -> "BOOLEAN", + "schema.1.name" -> "otherField", + "schema.1.type" -> "VARCHAR", + "schema.1.from" -> "csvField", + "schema.2.name" -> "p", + "schema.2.type" -> "TIMESTAMP", + "schema.2.proctime" -> "true", + "schema.3.name" -> "r", + "schema.3.type" -> "TIMESTAMP" + ) - override def validator(): DescriptorValidator = { - new SchemaValidator(isStreamEnvironment = true) + util.Arrays.asList(props1.asJava, props2.asJava) } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala new file mode 100644 index 0000000..ba05dff --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util.Optional + +import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp +import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Test + +import scala.collection.JavaConverters._ + +/** + * Tests for [[SchemaValidator]]. + */ +class SchemaValidatorTest { + + @Test + def testSchema(): Unit = { + val desc1 = Schema() + .field("otherField", Types.STRING).from("csvField") + .field("abcField", Types.STRING) + .field("p", Types.SQL_TIMESTAMP).proctime() + .field("r", Types.SQL_TIMESTAMP).rowtime( + Rowtime().timestampsFromSource().watermarksFromSource()) + val props = new DescriptorProperties() + desc1.addProperties(props) + + val inputSchema = TableSchema.builder() + .field("csvField", Types.STRING) + .field("abcField", Types.STRING) + .field("myField", Types.BOOLEAN) + .build() + + // test proctime + assertEquals(Optional.of("p"), SchemaValidator.deriveProctimeAttribute(props)) + + // test rowtime + val rowtime = SchemaValidator.deriveRowtimeAttributes(props).get(0) + assertEquals("r", rowtime.getAttributeName) + assertTrue(rowtime.getTimestampExtractor.isInstanceOf[StreamRecordTimestamp]) + assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[PreserveWatermarks]) + + // test field mapping + val expectedMapping = Map("otherField" -> "csvField", "abcField" -> "abcField").asJava + assertEquals( + expectedMapping, + SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema))) + + // test field format + val formatSchema = SchemaValidator.deriveFormatFields(props) + val expectedFormatSchema = TableSchema.builder() + .field("csvField", Types.STRING) // aliased + .field("abcField", Types.STRING) + .build() + assertEquals(expectedFormatSchema, formatSchema) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala index 3b248b4..2def0c3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala @@ -24,17 +24,44 @@ import org.apache.flink.table.api.ValidationException import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} import org.junit.Test +import scala.collection.JavaConverters._ + class StatisticsTest extends DescriptorTestBase { - @Test - def testStatistics(): Unit = { - val desc = Statistics() + @Test(expected = classOf[ValidationException]) + def testInvalidRowCount(): Unit = { + addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx") + } + + @Test(expected = classOf[ValidationException]) + def testMissingName(): Unit = { + removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name") + } + + // ---------------------------------------------------------------------------------------------- + + override def descriptors(): util.List[Descriptor] = { + val desc1 = Statistics() .rowCount(1000L) .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6)) .columnAvgLength("b", 42.0) .columnNullCount("a", 300) - val expected = Seq( - "statistics.version" -> "1", + + val map = new util.HashMap[String, ColumnStats]() + map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6)) + val desc2 = Statistics() + .tableStats(TableStats(32L, map)) + + util.Arrays.asList(desc1, desc2) + } + + override def validator(): DescriptorValidator = { + new StatisticsValidator() + } + + override def properties(): util.List[util.Map[String, String]] = { + val props1 = Map( + "statistics.property-version" -> "1", "statistics.row-count" -> "1000", "statistics.columns.0.name" -> "a", "statistics.columns.0.distinct-count" -> "1", @@ -46,17 +73,9 @@ class StatisticsTest extends DescriptorTestBase { "statistics.columns.1.name" -> "b", "statistics.columns.1.avg-length" -> "42.0" ) - verifyProperties(desc, expected) - } - @Test - def testStatisticsTableStats(): Unit = { - val map = new util.HashMap[String, ColumnStats]() - map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6)) - val desc = Statistics() - .tableStats(TableStats(32L, map)) - val expected = Seq( - "statistics.version" -> "1", + val props2 = Map( + "statistics.property-version" -> "1", "statistics.row-count" -> "32", "statistics.columns.0.name" -> "a", "statistics.columns.0.null-count" -> "2", @@ -64,28 +83,7 @@ class StatisticsTest extends DescriptorTestBase { "statistics.columns.0.max-value" -> "5", "statistics.columns.0.min-value" -> "6" ) - verifyProperties(desc, expected) - } - @Test(expected = classOf[ValidationException]) - def testInvalidRowCount(): Unit = { - verifyInvalidProperty("statistics.row-count", "abx") - } - - @Test(expected = classOf[ValidationException]) - def testMissingName(): Unit = { - verifyMissingProperty("statistics.columns.0.name") - } - - override def descriptor(): Descriptor = { - Statistics() - .rowCount(1000L) - .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6)) - .columnAvgLength("b", 42.0) - .columnNullCount("a", 300) - } - - override def validator(): DescriptorValidator = { - new StatisticsValidator() + util.Arrays.asList(props1.asJava, props2.asJava) } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala index 5e9b5a2..279e9a4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.sources import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException} -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} -import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION} +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION} import org.junit.Assert.assertTrue import org.junit.Test @@ -31,44 +31,44 @@ class TableSourceFactoryServiceTest { @Test def testValidProperties(): Unit = { val props = properties() - assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null) + assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null) } @Test(expected = classOf[NoMatchingTableSourceException]) def testInvalidContext(): Unit = { val props = properties() props.put(CONNECTOR_TYPE, "FAIL") - TableSourceFactoryService.findTableSourceFactory(props.toMap) + TableSourceFactoryService.findAndCreateTableSource(props.toMap) } @Test def testDifferentContextVersion(): Unit = { val props = properties() - props.put(CONNECTOR_VERSION, "2") + props.put(CONNECTOR_PROPERTY_VERSION, "2") // the table source should still be found - assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null) + assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null) } @Test(expected = classOf[ValidationException]) def testUnsupportedProperty(): Unit = { val props = properties() props.put("format.path_new", "/new/path") - TableSourceFactoryService.findTableSourceFactory(props.toMap) + TableSourceFactoryService.findAndCreateTableSource(props.toMap) } @Test(expected = classOf[TableException]) def testFailingFactory(): Unit = { val props = properties() props.put("failing", "true") - TableSourceFactoryService.findTableSourceFactory(props.toMap) + TableSourceFactoryService.findAndCreateTableSource(props.toMap) } private def properties(): mutable.Map[String, String] = { val properties = mutable.Map[String, String]() properties.put(CONNECTOR_TYPE, "test") properties.put(FORMAT_TYPE, "test") - properties.put(CONNECTOR_VERSION, "1") - properties.put(FORMAT_VERSION, "1") + properties.put(CONNECTOR_PROPERTY_VERSION, "1") + properties.put(FORMAT_PROPERTY_VERSION, "1") properties.put("format.path", "/path/to/target") properties.put("schema.0.name", "a") properties.put("schema.1.name", "b") http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala index ae75f99..ee3d637 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala @@ -22,8 +22,8 @@ import java.util import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableSchema -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} -import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION} +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION} +import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_PROPERTY_VERSION} import org.apache.flink.types.Row class TestTableSourceFactory extends TableSourceFactory[Row] { @@ -32,8 +32,8 @@ class TestTableSourceFactory extends TableSourceFactory[Row] { val context = new util.HashMap[String, String]() context.put(CONNECTOR_TYPE, "test") context.put(FORMAT_TYPE, "test") - context.put(CONNECTOR_VERSION, "1") - context.put(FORMAT_VERSION, "1") + context.put(CONNECTOR_PROPERTY_VERSION, "1") + context.put(FORMAT_PROPERTY_VERSION, "1") context }
