http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
index 15cf13b..85778ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -18,16 +18,36 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class CsvTest extends DescriptorTestBase {
 
-  @Test
-  def testCsv(): Unit = {
-    val desc = Csv()
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "format.fields.0.type", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidField(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "format.fields.10.name", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidQuoteCharacter(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "format.quote-character", "qq")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    val desc1 = Csv()
       .field("field1", "STRING")
       .field("field2", Types.SQL_TIMESTAMP)
       .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
@@ -35,9 +55,21 @@ class CsvTest extends DescriptorTestBase {
         Array[String]("test", "row"),
         Array[TypeInformation[_]](Types.INT, Types.STRING)))
       .lineDelimiter("^")
-    val expected = Seq(
+
+    val desc2 = Csv()
+      .schema(new TableSchema(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .quoteCharacter('#')
+      .ignoreFirstLine()
+
+    util.Arrays.asList(desc1, desc2)
+  }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val props1 = Map(
       "format.type" -> "csv",
-      "format.version" -> "1",
+      "format.property-version" -> "1",
       "format.fields.0.name" -> "field1",
       "format.fields.0.type" -> "STRING",
       "format.fields.1.name" -> "field2",
@@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase {
       "format.fields.3.name" -> "field4",
       "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
       "format.line-delimiter" -> "^")
-    verifyProperties(desc, expected)
-  }
 
-  @Test
-  def testCsvTableSchema(): Unit = {
-    val desc = Csv()
-      .schema(new TableSchema(
-        Array[String]("test", "row"),
-        Array[TypeInformation[_]](Types.INT, Types.STRING)))
-      .quoteCharacter('#')
-      .ignoreFirstLine()
-    val expected = Seq(
+    val props2 = Map(
       "format.type" -> "csv",
-      "format.version" -> "1",
+      "format.property-version" -> "1",
       "format.fields.0.name" -> "test",
       "format.fields.0.type" -> "INT",
       "format.fields.1.name" -> "row",
       "format.fields.1.type" -> "VARCHAR",
       "format.quote-character" -> "#",
       "format.ignore-first-line" -> "true")
-    verifyProperties(desc, expected)
-  }
 
-  @Test(expected = classOf[ValidationException])
-  def testInvalidType(): Unit = {
-    verifyInvalidProperty("format.fields.0.type", "WHATEVER")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidField(): Unit = {
-    verifyInvalidProperty("format.fields.10.name", "WHATEVER")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidQuoteCharacter(): Unit = {
-    verifyInvalidProperty("format.quote-character", "qq")
-  }
-
-  override def descriptor(): Descriptor = {
-    Csv()
-      .field("field1", "STRING")
-      .field("field2", Types.SQL_TIMESTAMP)
-      .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
-      .field("field4", Types.ROW(
-        Array[String]("test", "row"),
-        Array[TypeInformation[_]](Types.INT, Types.STRING)))
-      .lineDelimiter("^")
+    util.Arrays.asList(props1.asJava, props2.asJava)
   }
 
   override def validator(): DescriptorValidator = {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
index 3a59c9b..7a98b0b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
@@ -18,37 +18,84 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-    * Returns a valid descriptor.
+    * Returns a set of valid descriptors.
+    * This method is implemented in both Scala and Java.
     */
-  def descriptor(): Descriptor
+  def descriptors(): java.util.List[Descriptor]
 
   /**
-    * Returns a validator that can validate this descriptor.
+    * Returns a set of properties for each valid descriptor.
+    * This code is implemented in both Scala and Java.
+    */
+  def properties(): java.util.List[java.util.Map[String, String]]
+
+  /**
+    * Returns a validator that can validate all valid descriptors.
     */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+    val d = descriptors().asScala
+    val p = properties().asScala
+
+    Preconditions.checkArgument(d.length == p.length)
+
+    d.zip(p).foreach { case (desc, props) =>
+      verifyProperties(desc, props.asScala.toMap)
+    }
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
     val normProps = new DescriptorProperties
     descriptor.addProperties(normProps)
-    assertEquals(expected.toMap, normProps.asMap)
+
+    // test produced properties
+    assertEquals(expected, normProps.asMap.asScala.toMap)
+
+    // test validation logic
+    validator().validate(normProps)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def addPropertyAndVerify(
+      descriptor: Descriptor,
+      property: String,
+      invalidValue: String): Unit = {
     val properties = new DescriptorProperties
-    descriptor().addProperties(properties)
+    descriptor.addProperties(properties)
     properties.unsafePut(property, invalidValue)
     validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def removePropertyAndVerify(descriptor: Descriptor, removeProperty: String): Unit = {
     val properties = new DescriptorProperties
-    descriptor().addProperties(properties)
+    descriptor.addProperties(properties)
     properties.unsafeRemove(removeProperty)
     validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor {
+
+  this.connectorDescriptor = Some(connector)
+
+  def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = {
+    this.formatDescriptor = Some(format)
+    this
+  }
+
+  def addSchema(schema: Schema): TestTableSourceDescriptor = {
+    this.schemaDescriptor = Some(schema)
+    this
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
index 3452e8d..1162694 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -18,36 +18,41 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.table.api.ValidationException
 import org.junit.Test
 
-class FileSystemTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testFileSystem(): Unit = {
-    val desc = FileSystem().path("/myfile")
-    val expected = Seq(
-      "connector.type" -> "filesystem",
-      "connector.version" -> "1",
-      "connector.path" -> "/myfile")
-    verifyProperties(desc, expected)
-  }
+class FileSystemTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidPath(): Unit = {
-    verifyInvalidProperty("connector.path", "")
+    addPropertyAndVerify(descriptors().get(0), "connector.path", "")
   }
 
   @Test(expected = classOf[ValidationException])
   def testMissingPath(): Unit = {
-    verifyMissingProperty("connector.path")
+    removePropertyAndVerify(descriptors().get(0), "connector.path")
   }
 
-  override def descriptor(): Descriptor = {
-    FileSystem().path("/myfile")
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    util.Arrays.asList(FileSystem().path("/myfile"))
   }
 
   override def validator(): DescriptorValidator = {
     new FileSystemValidator()
   }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val desc = Map(
+      "connector.type" -> "filesystem",
+      "connector.property-version" -> "1",
+      "connector.path" -> "/myfile")
+
+    util.Arrays.asList(desc.asJava)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
deleted file mode 100644
index 756ca23..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-import org.junit.Test
-
-class JsonTest extends DescriptorTestBase {
-
-  @Test
-  def testJson(): Unit = {
-    val schema =
-      """
-        |{
-        |    "title": "Person",
-        |    "type": "object",
-        |    "properties": {
-        |        "firstName": {
-        |            "type": "string"
-        |        },
-        |        "lastName": {
-        |            "type": "string"
-        |        },
-        |        "age": {
-        |            "description": "Age in years",
-        |            "type": "integer",
-        |            "minimum": 0
-        |        }
-        |    },
-        |    "required": ["firstName", "lastName"]
-        |}
-        |""".stripMargin
-    val desc = Json()
-      .schema(schema)
-      .failOnMissingField(true)
-    val expected = Seq(
-      "format.type" -> "json",
-      "format.version" -> "1",
-      "format.schema-string" -> schema,
-      "format.fail-on-missing-field" -> "true")
-    verifyProperties(desc, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidMissingField(): Unit = {
-    verifyInvalidProperty("format.fail-on-missing-field", "DDD")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMissingSchema(): Unit = {
-    verifyMissingProperty("format.schema-string")
-  }
-
-  override def descriptor(): Descriptor = {
-    Json().schema("test")
-  }
-
-  override def validator(): DescriptorValidator = {
-    new JsonValidator()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
index a1854ce..64965b0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
@@ -18,38 +18,42 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.table.api.ValidationException
 import org.junit.Test
 
-class MetadataTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testMetadata(): Unit = {
-    val desc = Metadata()
-      .comment("Some additional comment")
-      .creationTime(123L)
-      .lastAccessTime(12020202L)
-    val expected = Seq(
-      "metadata.comment" -> "Some additional comment",
-      "metadata.creation-time" -> "123",
-      "metadata.last-access-time" -> "12020202"
-    )
-    verifyProperties(desc, expected)
-  }
+class MetadataTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidCreationTime(): Unit = {
-    verifyInvalidProperty("metadata.creation-time", "dfghj")
+    addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj")
   }
 
-  override def descriptor(): Descriptor = {
-    Metadata()
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    val desc = Metadata()
       .comment("Some additional comment")
       .creationTime(123L)
       .lastAccessTime(12020202L)
+
+    util.Arrays.asList(desc)
   }
 
   override def validator(): DescriptorValidator = {
     new MetadataValidator()
   }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val props = Map(
+      "metadata.comment" -> "Some additional comment",
+      "metadata.creation-time" -> "123",
+      "metadata.last-access-time" -> "12020202"
+    )
+
+    util.Arrays.asList(props.asJava)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 80050fc..7968b48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
@@ -25,41 +27,58 @@ import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-    val desc = Rowtime()
-      .timestampsFromField("otherField")
-      .watermarksPeriodicBounding(1000L)
-    val expected = Seq(
-      "rowtime.0.version" -> "1",
-      "rowtime.0.timestamps.type" -> "from-field",
-      "rowtime.0.timestamps.from" -> "otherField",
-      "rowtime.0.watermarks.type" -> "periodic-bounding",
-      "rowtime.0.watermarks.delay" -> "1000"
-    )
-    verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+    addPropertyAndVerify(descriptors().get(0), "rowtime.watermarks.type", "xxx")
   }
 
   @Test(expected = classOf[ValidationException])
   def testMissingWatermarkClass(): Unit = {
-    verifyMissingProperty("rowtime.0.watermarks.class")
+    removePropertyAndVerify(descriptors().get(1), "rowtime.watermarks.class")
   }
 
-  override def descriptor(): Descriptor = {
-    Rowtime()
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    val desc1 = Rowtime()
+      .timestampsFromField("otherField")
+      .watermarksPeriodicBounding(1000L)
+
+    val desc2 = Rowtime()
       .timestampsFromSource()
       .watermarksFromStrategy(new CustomAssigner())
+
+    util.Arrays.asList(desc1, desc2)
   }
 
   override def validator(): DescriptorValidator = {
-    new RowtimeValidator("rowtime.0.")
+    new RowtimeValidator()
+  }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val props1 = Map(
+      "rowtime.timestamps.type" -> "from-field",
+      "rowtime.timestamps.from" -> "otherField",
+      "rowtime.watermarks.type" -> "periodic-bounded",
+      "rowtime.watermarks.delay" -> "1000"
+    )
+
+    val props2 = Map(
+      "rowtime.timestamps.type" -> "from-source",
+      "rowtime.watermarks.type" -> "custom",
+      "rowtime.watermarks.class" -> "org.apache.flink.table.descriptors.RowtimeTest$CustomAssigner",
+      "rowtime.watermarks.serialized" -> ("rO0ABXNyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmRlc2NyaX" +
+        "B0b3JzLlJvd3RpbWVUZXN0JEN1c3RvbUFzc2lnbmVyeDcuDvfbu0kCAAB4cgBHb3JnLmFwYWNoZS5mbGluay" +
+        "50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9A" +
+        "IAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cm" +
+        "F0ZWd5mB_uSxDZ8-MCAAB4cA")
+    )
+
+    util.Arrays.asList(props1.asJava, props2.asJava)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
index f663a96..589ec4f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -18,21 +18,54 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class SchemaTest extends DescriptorTestBase {
 
-  @Test
-  def testSchema(): Unit = {
-    val desc = Schema()
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    addPropertyAndVerify(
+      descriptors().get(0),
+      "schema.1.type", "dfghj")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testBothRowtimeAndProctime(): Unit = {
+    addPropertyAndVerify(
+      descriptors().get(0),
+      "schema.2.rowtime.watermarks.type", "from-source")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    val desc1 = Schema()
       .field("myField", Types.BOOLEAN)
       .field("otherField", "VARCHAR").from("csvField")
       .field("p", Types.SQL_TIMESTAMP).proctime()
       .field("r", Types.SQL_TIMESTAMP).rowtime(
         Rowtime().timestampsFromSource().watermarksFromSource())
-    val expected = Seq(
-      "schema.version" -> "1",
+
+    val desc2 = Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP)
+
+    util.Arrays.asList(desc1, desc2)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new SchemaValidator(isStreamEnvironment = true)
+  }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val props1 = Map(
       "schema.0.name" -> "myField",
       "schema.0.type" -> "BOOLEAN",
       "schema.1.name" -> "otherField",
@@ -43,34 +76,23 @@ class SchemaTest extends DescriptorTestBase {
       "schema.2.proctime" -> "true",
       "schema.3.name" -> "r",
       "schema.3.type" -> "TIMESTAMP",
-      "schema.3.rowtime.0.version" -> "1",
-      "schema.3.rowtime.0.watermarks.type" -> "from-source",
-      "schema.3.rowtime.0.timestamps.type" -> "from-source"
+      "schema.3.rowtime.watermarks.type" -> "from-source",
+      "schema.3.rowtime.timestamps.type" -> "from-source"
     )
-    verifyProperties(desc, expected)
-  }
 
-  @Test(expected = classOf[ValidationException])
-  def testInvalidType(): Unit = {
-    verifyInvalidProperty("schema.1.type", "dfghj")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testBothRowtimeAndProctime(): Unit = {
-    verifyInvalidProperty("schema.2.rowtime.0.version", "1")
-    verifyInvalidProperty("schema.2.rowtime.0.watermarks.type", "from-source")
-    verifyInvalidProperty("schema.2.rowtime.0.timestamps.type", "from-source")
-  }
-
-  override def descriptor(): Descriptor = {
-    Schema()
-      .field("myField", Types.BOOLEAN)
-      .field("otherField", "VARCHAR").from("csvField")
-      .field("p", Types.SQL_TIMESTAMP).proctime()
-      .field("r", Types.SQL_TIMESTAMP)
-  }
+    val props2 = Map(
+      "schema.0.name" -> "myField",
+      "schema.0.type" -> "BOOLEAN",
+      "schema.1.name" -> "otherField",
+      "schema.1.type" -> "VARCHAR",
+      "schema.1.from" -> "csvField",
+      "schema.2.name" -> "p",
+      "schema.2.type" -> "TIMESTAMP",
+      "schema.2.proctime" -> "true",
+      "schema.3.name" -> "r",
+      "schema.3.type" -> "TIMESTAMP"
+    )
 
-  override def validator(): DescriptorValidator = {
-    new SchemaValidator(isStreamEnvironment = true)
+    util.Arrays.asList(props1.asJava, props2.asJava)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
new file mode 100644
index 0000000..ba05dff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util.Optional
+
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+/**
+  * Tests for [[SchemaValidator]].
+  */
+class SchemaValidatorTest {
+
+  @Test
+  def testSchema(): Unit = {
+     val desc1 = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromSource().watermarksFromSource())
+    val props = new DescriptorProperties()
+    desc1.addProperties(props)
+
+    val inputSchema = TableSchema.builder()
+      .field("csvField", Types.STRING)
+      .field("abcField", Types.STRING)
+      .field("myField", Types.BOOLEAN)
+      .build()
+
+    // test proctime
+    assertEquals(Optional.of("p"), SchemaValidator.deriveProctimeAttribute(props))
+
+    // test rowtime
+    val rowtime = SchemaValidator.deriveRowtimeAttributes(props).get(0)
+    assertEquals("r", rowtime.getAttributeName)
+    assertTrue(rowtime.getTimestampExtractor.isInstanceOf[StreamRecordTimestamp])
+    assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[PreserveWatermarks])
+
+    // test field mapping
+    val expectedMapping = Map("otherField" -> "csvField", "abcField" -> "abcField").asJava
+    assertEquals(
+      expectedMapping,
+      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+
+    // test field format
+    val formatSchema = SchemaValidator.deriveFormatFields(props)
+    val expectedFormatSchema = TableSchema.builder()
+      .field("csvField", Types.STRING) // aliased
+      .field("abcField", Types.STRING)
+      .build()
+    assertEquals(expectedFormatSchema, formatSchema)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
index 3b248b4..2def0c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
@@ -24,17 +24,44 @@ import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class StatisticsTest extends DescriptorTestBase {
 
-  @Test
-  def testStatistics(): Unit = {
-    val desc = Statistics()
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowCount(): Unit = {
+    addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def descriptors(): util.List[Descriptor] = {
+    val desc1 = Statistics()
       .rowCount(1000L)
       .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
       .columnAvgLength("b", 42.0)
       .columnNullCount("a", 300)
-    val expected = Seq(
-      "statistics.version" -> "1",
+
+    val map = new util.HashMap[String, ColumnStats]()
+    map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
+    val desc2 = Statistics()
+      .tableStats(TableStats(32L, map))
+
+    util.Arrays.asList(desc1, desc2)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new StatisticsValidator()
+  }
+
+  override def properties(): util.List[util.Map[String, String]] = {
+    val props1 = Map(
+      "statistics.property-version" -> "1",
       "statistics.row-count" -> "1000",
       "statistics.columns.0.name" -> "a",
       "statistics.columns.0.distinct-count" -> "1",
@@ -46,17 +73,9 @@ class StatisticsTest extends DescriptorTestBase {
       "statistics.columns.1.name" -> "b",
       "statistics.columns.1.avg-length" -> "42.0"
     )
-    verifyProperties(desc, expected)
-  }
 
-  @Test
-  def testStatisticsTableStats(): Unit = {
-    val map = new util.HashMap[String, ColumnStats]()
-    map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
-    val desc = Statistics()
-      .tableStats(TableStats(32L, map))
-    val expected = Seq(
-      "statistics.version" -> "1",
+    val props2 = Map(
+      "statistics.property-version" -> "1",
       "statistics.row-count" -> "32",
       "statistics.columns.0.name" -> "a",
       "statistics.columns.0.null-count" -> "2",
@@ -64,28 +83,7 @@ class StatisticsTest extends DescriptorTestBase {
       "statistics.columns.0.max-value" -> "5",
       "statistics.columns.0.min-value" -> "6"
     )
-    verifyProperties(desc, expected)
-  }
 
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowCount(): Unit = {
-    verifyInvalidProperty("statistics.row-count", "abx")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMissingName(): Unit = {
-    verifyMissingProperty("statistics.columns.0.name")
-  }
-
-  override def descriptor(): Descriptor = {
-    Statistics()
-      .rowCount(1000L)
-      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
-      .columnAvgLength("b", 42.0)
-      .columnNullCount("a", 300)
-  }
-
-  override def validator(): DescriptorValidator = {
-    new StatisticsValidator()
+    util.Arrays.asList(props1.asJava, props2.asJava)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
index 5e9b5a2..279e9a4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.table.api.{NoMatchingTableSourceException, 
TableException, ValidationException}
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_PROPERTY_VERSION}
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_PROPERTY_VERSION}
 import org.junit.Assert.assertTrue
 import org.junit.Test
 
@@ -31,44 +31,44 @@ class TableSourceFactoryServiceTest {
   @Test
   def testValidProperties(): Unit = {
     val props = properties()
-    assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) 
!= null)
+    assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) 
!= null)
   }
 
   @Test(expected = classOf[NoMatchingTableSourceException])
   def testInvalidContext(): Unit = {
     val props = properties()
     props.put(CONNECTOR_TYPE, "FAIL")
-    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
   }
 
   @Test
   def testDifferentContextVersion(): Unit = {
     val props = properties()
-    props.put(CONNECTOR_VERSION, "2")
+    props.put(CONNECTOR_PROPERTY_VERSION, "2")
     // the table source should still be found
-    assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) 
!= null)
+    assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) 
!= null)
   }
 
   @Test(expected = classOf[ValidationException])
   def testUnsupportedProperty(): Unit = {
     val props = properties()
     props.put("format.path_new", "/new/path")
-    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
   }
 
   @Test(expected = classOf[TableException])
   def testFailingFactory(): Unit = {
     val props = properties()
     props.put("failing", "true")
-    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
   }
 
   private def properties(): mutable.Map[String, String] = {
     val properties = mutable.Map[String, String]()
     properties.put(CONNECTOR_TYPE, "test")
     properties.put(FORMAT_TYPE, "test")
-    properties.put(CONNECTOR_VERSION, "1")
-    properties.put(FORMAT_VERSION, "1")
+    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
+    properties.put(FORMAT_PROPERTY_VERSION, "1")
     properties.put("format.path", "/path/to/target")
     properties.put("schema.0.name", "a")
     properties.put("schema.1.name", "b")

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
index ae75f99..ee3d637 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -22,8 +22,8 @@ import java.util
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableSchema
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
-import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_PROPERTY_VERSION}
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_PROPERTY_VERSION}
 import org.apache.flink.types.Row
 
 class TestTableSourceFactory extends TableSourceFactory[Row] {
@@ -32,8 +32,8 @@ class TestTableSourceFactory extends TableSourceFactory[Row] {
     val context = new util.HashMap[String, String]()
     context.put(CONNECTOR_TYPE, "test")
     context.put(FORMAT_TYPE, "test")
-    context.put(CONNECTOR_VERSION, "1")
-    context.put(FORMAT_VERSION, "1")
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
     context
   }
 

Reply via email to