Repository: flink
Updated Branches:
  refs/heads/master a5476cdcd -> 2cb58960e


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 42f0769..050f1a1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -380,6 +380,44 @@ class TableSourceTest extends TableTestBase {
     Assert.assertEquals(source1, source2)
   }
 
+// TODO enable this test once we expose the feature through the table 
environment
+//  @Test
+//  def testCsvTableSourceDescriptor(): Unit = {
+//    val util = streamTestUtil()
+//    val source1 = util.tableEnv
+//      .from(
+//        FileSystem()
+//          .path("/path/to/csv"))
+//      .withFormat(
+//        Csv()
+//          .field("myfield", Types.STRING)
+//          .field("myfield2", Types.INT)
+//          .quoteCharacter(';')
+//          .fieldDelimiter("#")
+//          .lineDelimiter("\r\n")
+//          .commentPrefix("%%")
+//          .ignoreFirstLine()
+//          .ignoreParseErrors())
+//        .withSchema(
+//          Schema()
+//          .field("myfield", Types.STRING)
+//          .field("myfield2", Types.INT))
+//      .toTableSource
+//
+//    val source2 = new CsvTableSource(
+//      "/path/to/csv",
+//      Array("myfield", "myfield2"),
+//      Array(Types.STRING, Types.INT),
+//      "#",
+//      "\r\n",
+//      ';',
+//      true,
+//      "%%",
+//      true)
+//
+//    Assert.assertEquals(source1, source2)
+//  }
+
   @Test
   def testTimeLiteralExpressionPushdown(): Unit = {
     val (tableSource, tableName) = filterableTableSourceTimeTypes

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index f9920e7..e6d2458 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.calcite.{CalciteConfig, 
FlinkTypeFactory, FlinkTyp
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -48,7 +49,8 @@ class ExternalCatalogSchemaTest {
   def setUp(): Unit = {
     val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, 
false).plus()
     val catalog = CommonTestData.getInMemoryTestCatalog
-    ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+    ExternalCatalogSchema.registerCatalog(
+      new MockTableEnvironment, rootSchemaPlus, schemaName, catalog)
     externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
     val prop = new Properties()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
index 0744de9..cdacce1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
 import org.apache.flink.types.Row
 import org.junit.Assert.assertTrue
 import org.junit.{Before, Test}
@@ -41,7 +42,8 @@ class ExternalTableSourceUtilTest {
   def testExternalStreamTable() = {
     val schema = new TableSchema(Array("foo"), 
Array(BasicTypeInfo.INT_TYPE_INFO))
     val table = ExternalCatalogTable("mock", schema)
-    val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table)
+    val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(
+      new MockTableEnvironment, table)
     assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
new file mode 100644
index 0000000..15cf13b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
+import org.junit.Test
+
+class CsvTest extends DescriptorTestBase {
+
+  @Test
+  def testCsv(): Unit = {
+    val desc = Csv()
+      .field("field1", "STRING")
+      .field("field2", Types.SQL_TIMESTAMP)
+      .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
+      .field("field4", Types.ROW(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .lineDelimiter("^")
+    val expected = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "field1",
+      "format.fields.0.type" -> "STRING",
+      "format.fields.1.name" -> "field2",
+      "format.fields.1.type" -> "TIMESTAMP",
+      "format.fields.2.name" -> "field3",
+      "format.fields.2.type" -> "ANY(java.lang.Class)",
+      "format.fields.3.name" -> "field4",
+      "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+      "format.line-delimiter" -> "^")
+    verifyProperties(desc, expected)
+  }
+
+  @Test
+  def testCsvTableSchema(): Unit = {
+    val desc = Csv()
+      .schema(new TableSchema(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .quoteCharacter('#')
+      .ignoreFirstLine()
+    val expected = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "test",
+      "format.fields.0.type" -> "INT",
+      "format.fields.1.name" -> "row",
+      "format.fields.1.type" -> "VARCHAR",
+      "format.quote-character" -> "#",
+      "format.ignore-first-line" -> "true")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    verifyInvalidProperty("format.fields.0.type", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidField(): Unit = {
+    verifyInvalidProperty("format.fields.10.name", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidQuoteCharacter(): Unit = {
+    verifyInvalidProperty("format.quote-character", "qq")
+  }
+
+  override def descriptor(): Descriptor = {
+    Csv()
+      .field("field1", "STRING")
+      .field("field2", Types.SQL_TIMESTAMP)
+      .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
+      .field("field4", Types.ROW(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .lineDelimiter("^")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new CsvValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
new file mode 100644
index 0000000..3a59c9b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.junit.Assert.assertEquals
+
+abstract class DescriptorTestBase {
+
+  /**
+    * Returns a valid descriptor.
+    */
+  def descriptor(): Descriptor
+
+  /**
+    * Returns a validator that can validate this descriptor.
+    */
+  def validator(): DescriptorValidator
+
+  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+    val normProps = new DescriptorProperties
+    descriptor.addProperties(normProps)
+    assertEquals(expected.toMap, normProps.asMap)
+  }
+
+  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+    val properties = new DescriptorProperties
+    descriptor().addProperties(properties)
+    properties.unsafePut(property, invalidValue)
+    validator().validate(properties)
+  }
+
+  def verifyMissingProperty(removeProperty: String): Unit = {
+    val properties = new DescriptorProperties
+    descriptor().addProperties(properties)
+    properties.unsafeRemove(removeProperty)
+    validator().validate(properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
new file mode 100644
index 0000000..3452e8d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class FileSystemTest extends DescriptorTestBase {
+
+  @Test
+  def testFileSystem(): Unit = {
+    val desc = FileSystem().path("/myfile")
+    val expected = Seq(
+      "connector.type" -> "filesystem",
+      "connector.version" -> "1",
+      "connector.path" -> "/myfile")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidPath(): Unit = {
+    verifyInvalidProperty("connector.path", "")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingPath(): Unit = {
+    verifyMissingProperty("connector.path")
+  }
+
+  override def descriptor(): Descriptor = {
+    FileSystem().path("/myfile")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new FileSystemValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
new file mode 100644
index 0000000..756ca23
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class JsonTest extends DescriptorTestBase {
+
+  @Test
+  def testJson(): Unit = {
+    val schema =
+      """
+        |{
+        |    "title": "Person",
+        |    "type": "object",
+        |    "properties": {
+        |        "firstName": {
+        |            "type": "string"
+        |        },
+        |        "lastName": {
+        |            "type": "string"
+        |        },
+        |        "age": {
+        |            "description": "Age in years",
+        |            "type": "integer",
+        |            "minimum": 0
+        |        }
+        |    },
+        |    "required": ["firstName", "lastName"]
+        |}
+        |""".stripMargin
+    val desc = Json()
+      .schema(schema)
+      .failOnMissingField(true)
+    val expected = Seq(
+      "format.type" -> "json",
+      "format.version" -> "1",
+      "format.schema-string" -> schema,
+      "format.fail-on-missing-field" -> "true")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMissingField(): Unit = {
+    verifyInvalidProperty("format.fail-on-missing-field", "DDD")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingSchema(): Unit = {
+    verifyMissingProperty("format.schema-string")
+  }
+
+  override def descriptor(): Descriptor = {
+    Json().schema("test")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new JsonValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
new file mode 100644
index 0000000..a1854ce
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class MetadataTest extends DescriptorTestBase {
+
+  @Test
+  def testMetadata(): Unit = {
+    val desc = Metadata()
+      .comment("Some additional comment")
+      .creationTime(123L)
+      .lastAccessTime(12020202L)
+    val expected = Seq(
+      "metadata.comment" -> "Some additional comment",
+      "metadata.creation-time" -> "123",
+      "metadata.last-access-time" -> "12020202"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidCreationTime(): Unit = {
+    verifyInvalidProperty("metadata.creation-time", "dfghj")
+  }
+
+  override def descriptor(): Descriptor = {
+    Metadata()
+      .comment("Some additional comment")
+      .creationTime(123L)
+      .lastAccessTime(12020202L)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new MetadataValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
new file mode 100644
index 0000000..80050fc
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class RowtimeTest extends DescriptorTestBase {
+
+  @Test
+  def testRowtime(): Unit = {
+    val desc = Rowtime()
+      .timestampsFromField("otherField")
+      .watermarksPeriodicBounding(1000L)
+    val expected = Seq(
+      "rowtime.0.version" -> "1",
+      "rowtime.0.timestamps.type" -> "from-field",
+      "rowtime.0.timestamps.from" -> "otherField",
+      "rowtime.0.watermarks.type" -> "periodic-bounding",
+      "rowtime.0.watermarks.delay" -> "1000"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWatermarkType(): Unit = {
+    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingWatermarkClass(): Unit = {
+    verifyMissingProperty("rowtime.0.watermarks.class")
+  }
+
+  override def descriptor(): Descriptor = {
+    Rowtime()
+      .timestampsFromSource()
+      .watermarksFromStrategy(new CustomAssigner())
+  }
+
+  override def validator(): DescriptorValidator = {
+    new RowtimeValidator("rowtime.0.")
+  }
+}
+
+object RowtimeTest {
+
+  class CustomAssigner extends PunctuatedWatermarkAssigner() {
+    override def getWatermark(row: Row, timestamp: Long): Watermark =
+      throw new UnsupportedOperationException()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
new file mode 100644
index 0000000..f663a96
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+class SchemaTest extends DescriptorTestBase {
+
+  @Test
+  def testSchema(): Unit = {
+    val desc = Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromSource().watermarksFromSource())
+    val expected = Seq(
+      "schema.version" -> "1",
+      "schema.0.name" -> "myField",
+      "schema.0.type" -> "BOOLEAN",
+      "schema.1.name" -> "otherField",
+      "schema.1.type" -> "VARCHAR",
+      "schema.1.from" -> "csvField",
+      "schema.2.name" -> "p",
+      "schema.2.type" -> "TIMESTAMP",
+      "schema.2.proctime" -> "true",
+      "schema.3.name" -> "r",
+      "schema.3.type" -> "TIMESTAMP",
+      "schema.3.rowtime.0.version" -> "1",
+      "schema.3.rowtime.0.watermarks.type" -> "from-source",
+      "schema.3.rowtime.0.timestamps.type" -> "from-source"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    verifyInvalidProperty("schema.1.type", "dfghj")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testBothRowtimeAndProctime(): Unit = {
+    verifyInvalidProperty("schema.2.rowtime.0.version", "1")
+    verifyInvalidProperty("schema.2.rowtime.0.watermarks.type", "from-source")
+    verifyInvalidProperty("schema.2.rowtime.0.timestamps.type", "from-source")
+  }
+
+  override def descriptor(): Descriptor = {
+    Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new SchemaValidator(isStreamEnvironment = true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
new file mode 100644
index 0000000..3b248b4
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import _root_.java.util
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.junit.Test
+
+class StatisticsTest extends DescriptorTestBase {
+
+  @Test
+  def testStatistics(): Unit = {
+    val desc = Statistics()
+      .rowCount(1000L)
+      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
+      .columnAvgLength("b", 42.0)
+      .columnNullCount("a", 300)
+    val expected = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "1000",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.distinct-count" -> "1",
+      "statistics.columns.0.null-count" -> "300",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-length" -> "4",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6",
+      "statistics.columns.1.name" -> "b",
+      "statistics.columns.1.avg-length" -> "42.0"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test
+  def testStatisticsTableStats(): Unit = {
+    val map = new util.HashMap[String, ColumnStats]()
+    map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
+    val desc = Statistics()
+      .tableStats(TableStats(32L, map))
+    val expected = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "32",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.null-count" -> "2",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowCount(): Unit = {
+    verifyInvalidProperty("statistics.row-count", "abx")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    verifyMissingProperty("statistics.columns.0.name")
+  }
+
+  override def descriptor(): Descriptor = {
+    Statistics()
+      .rowCount(1000L)
+      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
+      .columnAvgLength("b", 42.0)
+      .columnNullCount("a", 300)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new StatisticsValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
new file mode 100644
index 0000000..2c9a89c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.utils.TableTestBase
+
class StreamTableSourceDescriptorTest extends TableTestBase {

// TODO enable this test once we expose the feature through the table environment
//  @Test
//  def testStreamTableSourceDescriptor(): Unit = {
//    val util = streamTestUtil()
//    val desc = util.tableEnv
//      .from(
//        FileSystem()
//          .path("/path/to/csv"))
//      .withFormat(
//        Csv()
//          .field("myfield", Types.STRING)
//          .field("myfield2", Types.INT)
//          .quoteCharacter(';')
//          .fieldDelimiter("#")
//          .lineDelimiter("\r\n")
//          .commentPrefix("%%")
//          .ignoreFirstLine()
//          .ignoreParseErrors())
//        .withSchema(
//          Schema()
//            .field("myfield", Types.STRING)
//            .field("myfield2", Types.INT)
//            .field("proctime", Types.SQL_TIMESTAMP).proctime()
//            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
//              Rowtime().timestampsFromSource().watermarksFromSource())
//        )
//    // expected flat properties produced by the connector/format/schema
//    // descriptors above
//    val expected = Seq(
//      "connector.type" -> "filesystem",
//      "connector.path" -> "/path/to/csv",
//      "format.type" -> "csv",
//      "format.fields.0.name" -> "myfield",
//      "format.fields.0.type" -> "VARCHAR",
//      "format.fields.1.name" -> "myfield2",
//      "format.fields.1.type" -> "INT",
//      "format.quote-character" -> ";",
//      "format.field-delimiter" -> "#",
//      "format.line-delimiter" -> "\r\n",
//      "format.comment-prefix" -> "%%",
//      "format.ignore-first-line" -> "true",
//      "format.ignore-parse-errors" -> "true",
//      "schema.0.name" -> "myfield",
//      "schema.0.type" -> "VARCHAR",
//      "schema.1.name" -> "myfield2",
//      "schema.1.type" -> "INT",
//      "schema.2.name" -> "proctime",
//      "schema.2.type" -> "TIMESTAMP",
//      "schema.2.proctime" -> "true",
//      "schema.3.name" -> "rowtime",
//      "schema.3.type" -> "TIMESTAMP",
//      "schema.3.rowtime.0.timestamps.type" -> "from-source",
//      "schema.3.rowtime.0.watermarks.type" -> "from-source"
//    )
//    verifyProperties(desc, expected)
//  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
new file mode 100644
index 0000000..5e9b5a2
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
import org.junit.Assert.{assertNotNull, assertTrue}
import org.junit.Test

import scala.collection.mutable
+
/**
  * Tests for [[TableSourceFactoryService]]: discovery of a matching
  * [[TableSourceFactory]] from normalized string properties, and the
  * exceptions raised for non-matching, unsupported, or failing factories.
  */
class TableSourceFactoryServiceTest {

  @Test
  def testValidProperties(): Unit = {
    val props = properties()
    // a factory matching the context properties must be discovered
    assertNotNull(TableSourceFactoryService.findTableSourceFactory(props.toMap))
  }

  @Test(expected = classOf[NoMatchingTableSourceException])
  def testInvalidContext(): Unit = {
    val props = properties()
    // no factory declares connector type "FAIL", so the lookup must fail
    props.put(CONNECTOR_TYPE, "FAIL")
    TableSourceFactoryService.findTableSourceFactory(props.toMap)
  }

  @Test
  def testDifferentContextVersion(): Unit = {
    val props = properties()
    props.put(CONNECTOR_VERSION, "2")
    // a differing connector version is not an error;
    // the table source factory should still be found
    assertNotNull(TableSourceFactoryService.findTableSourceFactory(props.toMap))
  }

  @Test(expected = classOf[ValidationException])
  def testUnsupportedProperty(): Unit = {
    val props = properties()
    // a key not declared in the factory's supportedProperties() must be rejected
    props.put("format.path_new", "/new/path")
    TableSourceFactoryService.findTableSourceFactory(props.toMap)
  }

  @Test(expected = classOf[TableException])
  def testFailingFactory(): Unit = {
    val props = properties()
    // the test factory throws while creating the source (see
    // TestTableSourceFactory); this is expected to surface as a TableException
    props.put("failing", "true")
    TableSourceFactoryService.findTableSourceFactory(props.toMap)
  }

  /** Returns a fresh, mutable property map that matches the test factory. */
  private def properties(): mutable.Map[String, String] = {
    val properties = mutable.Map[String, String]()
    properties.put(CONNECTOR_TYPE, "test")
    properties.put(FORMAT_TYPE, "test")
    properties.put(CONNECTOR_VERSION, "1")
    properties.put(FORMAT_VERSION, "1")
    properties.put("format.path", "/path/to/target")
    properties.put("schema.0.name", "a")
    properties.put("schema.1.name", "b")
    properties.put("schema.2.name", "c")
    properties.put("schema.0.field.0.name", "a")
    properties.put("schema.0.field.1.name", "b")
    properties.put("schema.0.field.2.name", "c")
    properties.put("failing", "false")
    properties
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
new file mode 100644
index 0000000..ae75f99
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+import org.apache.flink.types.Row
+
/**
  * Test [[TableSourceFactory]] that matches on connector/format type "test"
  * (version "1") and can be forced to fail via the "failing" property.
  */
class TestTableSourceFactory extends TableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    // context this factory matches against during discovery
    val context = new util.HashMap[String, String]()
    Seq(
      CONNECTOR_TYPE -> "test",
      FORMAT_TYPE -> "test",
      CONNECTOR_VERSION -> "1",
      FORMAT_VERSION -> "1"
    ).foreach { case (key, value) => context.put(key, value) }
    context
  }

  override def supportedProperties(): util.List[String] = {
    val supported = new util.ArrayList[String]()
    // format properties
    supported.add("format.path")
    // schema properties ('#' is a placeholder for a numeric index,
    // e.g. "schema.0.name")
    supported.add("schema.#.name")
    supported.add("schema.#.field.#.name")
    // test-only switch evaluated in create()
    supported.add("failing")
    supported
  }

  override def create(properties: util.Map[String, String]): TableSource[Row] = {
    // simulate a factory-internal failure when requested by the properties
    val failRequested = properties.get("failing") == "true"
    if (failRequested) {
      throw new IllegalArgumentException("Error in this factory.")
    }
    // dummy source; schema and return type are not expected to be accessed
    new TableSource[Row] {
      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()

      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
new file mode 100644
index 0000000..29d647c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
/**
  * Tests for string-based representation of [[TypeInformation]].
  *
  * Every case verifies a full round trip: the type must serialize to the
  * expected string, and the string must deserialize back to the same type.
  */
class TypeStringUtilsTest {

  @Test
  def testPrimitiveTypes(): Unit = {
    testReadAndWrite("VARCHAR", Types.STRING)
    testReadAndWrite("BOOLEAN", Types.BOOLEAN)
    testReadAndWrite("TINYINT", Types.BYTE)
    testReadAndWrite("SMALLINT", Types.SHORT)
    testReadAndWrite("INT", Types.INT)
    testReadAndWrite("BIGINT", Types.LONG)
    testReadAndWrite("FLOAT", Types.FLOAT)
    testReadAndWrite("DOUBLE", Types.DOUBLE)
    testReadAndWrite("DECIMAL", Types.DECIMAL)
    testReadAndWrite("DATE", Types.SQL_DATE)
    testReadAndWrite("TIME", Types.SQL_TIME)
    testReadAndWrite("TIMESTAMP", Types.SQL_TIMESTAMP)

    // unsupported type information
    // (types without a SQL name are represented as ANY(class, payload) where
    // the payload is an opaque encoded form of the type information; the
    // blob below was generated once and must stay stable)
    testReadAndWrite(
      "ANY(java.lang.Void, " +
        "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
        "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
        "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
        "cGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZVNlcmlhbGl6ZXI7eHIANG9yZy5hcGFjaGUu" +
        "ZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIADmphdmEu" +
        "bGFuZy5Wb2lkAAAAAAAAAAAAAAB4cHB1cgASW0xqYXZhLmxhbmcuQ2xhc3M7qxbXrsvNWpkCAAB4cAAAAABz" +
        "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
        "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
        "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
      BasicTypeInfo.VOID_TYPE_INFO)
  }

  @Test
  def testWriteComplexTypes(): Unit = {
    // row with default field names (f0, f1, ...)
    testReadAndWrite(
      "ROW(f0 DECIMAL, f1 TINYINT)",
      Types.ROW(Types.DECIMAL, Types.BYTE))

    // row with custom field names
    testReadAndWrite(
      "ROW(hello DECIMAL, world TINYINT)",
      Types.ROW(
        Array[String]("hello", "world"),
        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))

    // field names containing spaces are quoted
    testReadAndWrite(
      "ROW(\"he llo\" DECIMAL, world TINYINT)",
      Types.ROW(
        Array[String]("he llo", "world"),
        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))

    // field names containing a newline are quoted and escaped
    testReadAndWrite(
      "ROW(\"he         \\nllo\" DECIMAL, world TINYINT)",
      Types.ROW(
        Array[String]("he         \nllo", "world"),
        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))

    // POJO types are represented as POJO(fully.qualified.ClassName)
    testReadAndWrite(
      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
      TypeExtractor.createTypeInfo(classOf[Person]))

    // non-POJO classes fall back to the generic ANY(ClassName) representation
    testReadAndWrite(
      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
      TypeExtractor.createTypeInfo(classOf[NonPojo]))
  }

  // Asserts a full round trip: `tpe` writes to `expected` and `expected`
  // reads back to `tpe`.
  private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {
    // test write to string
    assertEquals(expected, TypeStringUtils.writeTypeInfo(tpe))

    // test read from string
    assertEquals(tpe, TypeStringUtils.readTypeInfo(expected))
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index ff7c79d..f43bcc6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -35,8 +35,6 @@ class MockTableEnvironment extends TableEnvironment(new 
TableConfig) {
 
   override def sql(query: String): Table = ???
 
-  override def registerTableSource(name: String, tableSource: TableSource[_]): 
Unit = ???
-
   override protected def getBuiltInNormRuleSet: RuleSet = ???
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
@@ -46,4 +44,9 @@ class MockTableEnvironment extends TableEnvironment(new 
TableConfig) {
       fieldNames: Array[String],
       fieldTypes: Array[TypeInformation[_]],
       tableSink: TableSink[_]): Unit = ???
+
+  override protected def createUniqueTableName(): String = ???
+
+  override protected def registerTableSourceInternal(name: String, 
tableSource: TableSource[_])
+    : Unit = ???
 }

Reply via email to