Repository: flink Updated Branches: refs/heads/master ddba1b69f -> 6fcc1e9a8
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala index 7a98b0b..3f6426d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.descriptors +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT} import org.apache.flink.util.Preconditions import org.junit.Assert.assertEquals import org.junit.Test @@ -84,18 +85,44 @@ abstract class DescriptorTestBase { } } -class TestTableSourceDescriptor(connector: ConnectorDescriptor) - extends TableSourceDescriptor { +class TestTableDescriptor(connector: ConnectorDescriptor) + extends TableDescriptor + with SchematicDescriptor[TestTableDescriptor] + with StreamableDescriptor[TestTableDescriptor] { - this.connectorDescriptor = Some(connector) + private var formatDescriptor: Option[FormatDescriptor] = None + private var schemaDescriptor: Option[Schema] = None + private var updateMode: Option[String] = None - def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = { + override private[flink] def addProperties(properties: DescriptorProperties): Unit = { + connector.addProperties(properties) + formatDescriptor.foreach(_.addProperties(properties)) + schemaDescriptor.foreach(_.addProperties(properties)) + updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode)) + } + + override def withFormat(format: FormatDescriptor): TestTableDescriptor = { this.formatDescriptor = Some(format) this } - def addSchema(schema: Schema): TestTableSourceDescriptor = { + override def withSchema(schema: Schema): TestTableDescriptor = { this.schemaDescriptor = Some(schema) this } + + override def inAppendMode(): TestTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_APPEND) + this + } + + override def inRetractMode(): TestTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_RETRACT) + this + } + + override def inUpsertMode(): TestTableDescriptor = { + updateMode = Some(UPDATE_MODE_VALUE_UPSERT) + this + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala new file mode 100644 index 0000000..ccac317 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import org.apache.flink.table.api.Types +import org.apache.flink.table.utils.TableTestBase +import org.junit.Assert.assertEquals +import org.junit.Test + +import scala.collection.JavaConverters._ + +/** + * Tests for [[TableDescriptor]]. + */ +class TableDescriptorTest extends TableTestBase { + + @Test + def testStreamTableSourceDescriptor(): Unit = { + testTableSourceDescriptor(true) + } + + @Test + def testBatchTableSourceDescriptor(): Unit = { + testTableSourceDescriptor(false) + } + + private def testTableSourceDescriptor(isStreaming: Boolean): Unit = { + + val schema = Schema() + .field("myfield", Types.STRING) + .field("myfield2", Types.INT) + // CSV table source and sink do not support proctime yet + //if (isStreaming) { + // schema.field("proctime", Types.SQL_TIMESTAMP).proctime() + //} + + val connector = FileSystem() + .path("/path/to/csv") + + val format = Csv() + .field("myfield", Types.STRING) + .field("myfield2", Types.INT) + .fieldDelimiter("#") + + val descriptor: RegistrableDescriptor = if (isStreaming) { + streamTestUtil().tableEnv + .connect(connector) + .withFormat(format) + .withSchema(schema) + .inAppendMode() + } else { + batchTestUtil().tableEnv + .connect(connector) + .withFormat(format) + .withSchema(schema) + } + + // tests the table factory discovery and thus validates the result automatically + descriptor.registerTableSourceAndSink("MyTable") + + val expectedCommonProperties = Seq( + "connector.property-version" -> "1", + "connector.type" -> "filesystem", + "connector.path" -> "/path/to/csv", + "format.property-version" -> "1", + "format.type" -> "csv", + "format.fields.0.name" -> "myfield", + "format.fields.0.type" -> "VARCHAR", + "format.fields.1.name" -> "myfield2", + "format.fields.1.type" -> "INT", + "format.field-delimiter" -> "#", + "schema.0.name" -> "myfield", + "schema.0.type" -> "VARCHAR", + "schema.1.name" -> "myfield2", + "schema.1.type" -> "INT" + ) + + val expectedProperties = if (isStreaming) { + expectedCommonProperties ++ Seq( + //"schema.2.name" -> "proctime", + //"schema.2.type" -> "TIMESTAMP", + //"schema.2.proctime" -> "true", + "update-mode" -> "append" + ) + } else { + expectedCommonProperties + } + + val actualProperties = new DescriptorProperties(true) + descriptor.addProperties(actualProperties) + + assertEquals(expectedProperties.toMap.asJava, actualProperties.asMap) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala deleted file mode 100644 index a7dd644..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.Types -import org.apache.flink.table.utils.TableTestBase -import org.junit.Assert.assertEquals -import org.junit.Test - -import scala.collection.JavaConverters._ - -/** - * Tests for [[TableSourceDescriptor]]. - */ -class TableSourceDescriptorTest extends TableTestBase { - - @Test - def testStreamTableSourceDescriptor(): Unit = { - testTableSourceDescriptor(true) - } - - @Test - def testBatchTableSourceDescriptor(): Unit = { - testTableSourceDescriptor(false) - } - - private def testTableSourceDescriptor(isStreaming: Boolean): Unit = { - - val schema = Schema() - .field("myfield", Types.STRING) - .field("myfield2", Types.INT) - if (isStreaming) { - schema.field("proctime", Types.SQL_TIMESTAMP).proctime() - } - - val connector = FileSystem() - .path("/path/to/csv") - - val format = Csv() - .field("myfield", Types.STRING) - .field("myfield2", Types.INT) - .quoteCharacter(';') - .fieldDelimiter("#") - .lineDelimiter("\r\n") - .commentPrefix("%%") - .ignoreFirstLine() - .ignoreParseErrors() - - val descriptor = if (isStreaming) { - streamTestUtil().tableEnv - .from(connector) - .withFormat(format) - .withSchema(schema) - } else { - batchTestUtil().tableEnv - .from(connector) - .withFormat(format) - .withSchema(schema) - } - - val expectedCommonProperties = Seq( - "connector.property-version" -> "1", - "connector.type" -> "filesystem", - "connector.path" -> "/path/to/csv", - "format.property-version" -> "1", - "format.type" -> "csv", - "format.fields.0.name" -> "myfield", - "format.fields.0.type" -> "VARCHAR", - "format.fields.1.name" -> "myfield2", - "format.fields.1.type" -> "INT", - "format.quote-character" -> ";", - "format.field-delimiter" -> "#", - "format.line-delimiter" -> "\r\n", - "format.comment-prefix" -> "%%", - "format.ignore-first-line" -> "true", - "format.ignore-parse-errors" -> "true", - "schema.0.name" -> "myfield", - "schema.0.type" -> "VARCHAR", - "schema.1.name" -> "myfield2", - "schema.1.type" -> "INT" - ) - - val expectedProperties = if (isStreaming) { - expectedCommonProperties ++ Seq( - "schema.2.name" -> "proctime", - "schema.2.type" -> "TIMESTAMP", - "schema.2.proctime" -> "true" - ) - } else { - expectedCommonProperties - } - - val actualProperties = new DescriptorProperties(true) - descriptor.addProperties(actualProperties) - - assertEquals(expectedProperties.toMap.asJava, actualProperties.asMap) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index ac9894c..e62396f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -62,13 +62,13 @@ object CommonTestData { ) } - def getInMemoryTestCatalog: ExternalCatalog = { + def getInMemoryTestCatalog(isStreaming: Boolean): ExternalCatalog = { val csvRecord1 = Seq( "1#1#Hi", "2#2#Hello", "3#2#Hello world" ) - val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp") + val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp") val connDesc1 = FileSystem().path(tempFilePath1) val formatDesc1 = Csv() @@ -76,13 +76,17 @@ object CommonTestData { .field("b", Types.LONG) .field("c", Types.STRING) .fieldDelimiter("#") - .lineDelimiter("$") val schemaDesc1 = Schema() .field("a", Types.INT) .field("b", Types.LONG) .field("c", Types.STRING) - val externalCatalogTable1 = new ExternalCatalogTable( - connDesc1, Some(formatDesc1), Some(schemaDesc1), None, None) + val externalTableBuilder1 = ExternalCatalogTable.builder(connDesc1) + .withFormat(formatDesc1) + .withSchema(schemaDesc1) + + if (isStreaming) { + externalTableBuilder1.inAppendMode() + } val csvRecord2 = Seq( "1#1#0#Hallo#1", @@ -101,7 +105,7 @@ object CommonTestData { "5#14#13#JKL#2", "5#15#14#KLM#2" ) - val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp") + val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp") val connDesc2 = FileSystem().path(tempFilePath2) val formatDesc2 = Csv() @@ -111,15 +115,19 @@ object CommonTestData { .field("g", Types.STRING) .field("h", Types.LONG) .fieldDelimiter("#") - .lineDelimiter("$") val schemaDesc2 = Schema() .field("d", Types.INT) .field("e", Types.LONG) .field("f", Types.INT) .field("g", Types.STRING) .field("h", Types.LONG) - val externalCatalogTable2 = new ExternalCatalogTable( - connDesc2, Some(formatDesc2), Some(schemaDesc2), None, None) + val externalTableBuilder2 = ExternalCatalogTable.builder(connDesc2) + .withFormat(formatDesc2) + .withSchema(schemaDesc2) + + if (isStreaming) { + externalTableBuilder2.inAppendMode() + } val catalog = new InMemoryExternalCatalog("test") val db1 = new InMemoryExternalCatalog("db1") @@ -128,9 +136,9 @@ object CommonTestData { catalog.createSubCatalog("db2", db2, ignoreIfExists = false) // Register the table with both catalogs - catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false) - db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false) - db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false) + catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false) + db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false) + db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false) catalog } http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index c25f30f..87dbc91 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.utils import org.apache.calcite.tools.RuleSet import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment} -import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableSourceDescriptor} +import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource @@ -55,5 +55,5 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override def explain(table: Table): String = ??? - override def from(connectorDescriptor: ConnectorDescriptor): TableSourceDescriptor = ??? + override def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor = ??? }
