http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
new file mode 100644
index 0000000..1aaa399
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+
/**
  * Validator for [[FormatDescriptor]].
  */
class FormatDescriptorValidator extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // the format type is mandatory and must not be empty
    properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1)
    // the format version is optional; when present it must be a non-negative integer
    properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
  }
}

/**
  * Property keys shared by all format descriptors.
  */
object FormatDescriptorValidator {

  // key of the format type property (e.g. "json")
  val FORMAT_TYPE = "format.type"
  // key of the optional format version property
  val FORMAT_VERSION = "format.version"

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
new file mode 100644
index 0000000..cc46d9c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, 
FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE}
+
/**
  * Encoding descriptor for JSON.
  */
class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {

  // whether a missing JSON field aborts the operation; None = not configured
  private var failOnMissing: Option[Boolean] = None

  // the JSON schema as a string; None = not yet set (the validator requires it)
  private var jsonSchema: Option[String] = None

  /**
    * Sets flag whether to fail if a field is missing or not.
    *
    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    *                           If set to false, a missing field is set to null.
    * @return The builder.
    */
  def failOnMissingField(failOnMissingField: Boolean): Json = {
    failOnMissing = Some(failOnMissingField)
    this
  }

  /**
    * Sets the JSON schema string with field names and the types according to the JSON schema
    * specification [[http://json-schema.org/specification.html]]. Required.
    *
    * The schema might be nested.
    *
    * @param schema JSON schema
    */
  def schema(schema: String): Json = {
    jsonSchema = Some(schema)
    this
  }

  /**
    * Internal method for format properties conversion.
    */
  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
    // we distinguish between "schema string" and "schema" to allow parsing of a
    // schema object in the future (such that the entire JSON schema can be defined
    // in a YAML file instead of one large string)
    jsonSchema.foreach(s => properties.putString(FORMAT_SCHEMA_STRING, s))
    failOnMissing.foreach(flag => properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, flag))
  }
}

/**
  * Encoding descriptor for JSON.
  */
object Json {

  /**
    * Creates a new [[Json]] descriptor.
    */
  def apply(): Json = new Json()
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
new file mode 100644
index 0000000..9f11caf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import 
org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, 
FORMAT_SCHEMA_STRING}
+
/**
  * Validator for [[Json]].
  */
class JsonValidator extends FormatDescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // first validate the generic format properties (format.type, format.version)
    super.validate(properties)
    // the JSON schema string is mandatory and must not be empty
    properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1)
    // the fail-on-missing-field flag is optional
    properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true)
  }
}

/**
  * Property keys and values for [[Json]] descriptors.
  */
object JsonValidator {

  // value of "format.type" that selects the JSON format
  val FORMAT_TYPE_VALUE = "json"
  // key of the required JSON schema string property
  val FORMAT_SCHEMA_STRING = "format.schema-string"
  // key of the optional fail-on-missing-field flag
  val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
new file mode 100644
index 0000000..211d786
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, 
METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+
/**
  * Metadata descriptor for adding additional, useful information.
  */
class Metadata extends Descriptor {

  // NOTE(review): fields are protected rather than private — presumably so that
  // specialized descriptors can read them; confirm before tightening visibility.
  protected var comment: Option[String] = None
  protected var creationTime: Option[Long] = None
  protected var lastAccessTime: Option[Long] = None

  /**
    * Sets a comment.
    *
    * @param comment the description
    */
  def comment(comment: String): Metadata = {
    this.comment = Some(comment)
    this
  }

  /**
    * Sets a creation time.
    *
    * @param time UTC milliseconds timestamp
    */
  def creationTime(time: Long): Metadata = {
    this.creationTime = Some(time)
    this
  }

  /**
    * Sets a last access time.
    *
    * @param time UTC milliseconds timestamp
    */
  def lastAccessTime(time: Long): Metadata = {
    this.lastAccessTime = Some(time)
    this
  }

  /**
    * Internal method for properties conversion.
    * Only explicitly configured values are written out.
    */
  final override def addProperties(properties: DescriptorProperties): Unit = {
    comment.foreach(properties.putString(METADATA_COMMENT, _))
    creationTime.foreach(properties.putLong(METADATA_CREATION_TIME, _))
    lastAccessTime.foreach(properties.putLong(METADATA_LAST_ACCESS_TIME, _))
  }
}

/**
  * Metadata descriptor for adding additional, useful information.
  */
object Metadata {

  /**
    * Creates a new [[Metadata]] descriptor.
    */
  def apply(): Metadata = new Metadata()
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
new file mode 100644
index 0000000..a8d580c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, 
METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION}
+
/**
  * Validator for [[Metadata]].
  */
class MetadataValidator extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // version is optional; when present it must be a non-negative integer
    properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
    // all metadata entries are optional
    properties.validateString(METADATA_COMMENT, isOptional = true)
    properties.validateLong(METADATA_CREATION_TIME, isOptional = true)
    properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true)
  }
}

/**
  * Property keys for [[Metadata]] descriptors.
  */
object MetadataValidator {

  val METADATA_VERSION = "metadata.version"
  val METADATA_COMMENT = "metadata.comment"
  val METADATA_CREATION_TIME = "metadata.creation-time"
  val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
new file mode 100644
index 0000000..a1c80f5
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, 
ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
/**
  * Rowtime descriptor for describing an event time attribute in the schema.
  */
class Rowtime extends Descriptor {

  // how the rowtime timestamp is obtained from the physical record
  private var tsExtractor: Option[TimestampExtractor] = None

  // how watermarks are generated for the rowtime attribute
  private var wmStrategy: Option[WatermarkStrategy] = None

  /**
    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    *
    * @param fieldName The field to convert into a rowtime attribute.
    */
  def timestampsFromField(fieldName: String): Rowtime = {
    tsExtractor = Some(new ExistingField(fieldName))
    this
  }

  /**
    * Sets a built-in timestamp extractor that converts the assigned timestamps from
    * a DataStream API record into the rowtime attribute and thus preserves the assigned
    * timestamps from the source.
    *
    * Note: This extractor only works in streaming environments.
    */
  def timestampsFromSource(): Rowtime = {
    tsExtractor = Some(new StreamRecordTimestamp)
    this
  }

  /**
    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    *
    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    *                  from the physical type.
    */
  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    tsExtractor = Some(extractor)
    this
  }

  /**
    * Sets a built-in watermark strategy for ascending rowtime attributes.
    *
    * Emits a watermark of the maximum observed timestamp so far minus 1.
    * Rows that have a timestamp equal to the max timestamp are not late.
    */
  def watermarksPeriodicAscending(): Rowtime = {
    wmStrategy = Some(new AscendingTimestamps)
    this
  }

  /**
    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order
    * by a bounded time interval.
    *
    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    *
    * @param delay the bounded delay (the validator requires a non-negative value)
    */
  def watermarksPeriodicBounding(delay: Long): Rowtime = {
    wmStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  /**
    * Sets a built-in watermark strategy which indicates the watermarks should be preserved
    * from the underlying DataStream API and thus preserves the assigned timestamps from
    * the source.
    */
  def watermarksFromSource(): Rowtime = {
    wmStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  /**
    * Sets a custom watermark strategy to be used for the rowtime attribute.
    */
  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    wmStrategy = Some(strategy)
    this
  }

  /**
    * Internal method for properties conversion.
    */
  final override def addProperties(properties: DescriptorProperties): Unit = {
    // assemble the per-rowtime properties as a single immutable map
    val props =
      Map(ROWTIME_VERSION -> "1") ++
        tsExtractor.map(normalizeTimestampExtractor).getOrElse(Map.empty[String, String]) ++
        wmStrategy.map(normalizeWatermarkStrategy).getOrElse(Map.empty[String, String])

    // use a list for the rowtime to support multiple rowtime attributes in the future
    properties.putIndexedVariableProperties(ROWTIME, Seq(props))
  }
}

/**
  * Rowtime descriptor for describing an event time attribute in the schema.
  */
object Rowtime {

  /**
    * Creates a new [[Rowtime]] descriptor.
    */
  def apply(): Rowtime = new Rowtime()

}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
new file mode 100644
index 0000000..74e49f1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.serialize
+import org.apache.flink.table.descriptors.RowtimeValidator._
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
/**
  * Validator for [[Rowtime]].
  *
  * @param prefix optional key prefix under which the rowtime properties are nested
  */
class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {

  override def validate(properties: DescriptorProperties): Unit = {
    // version is optional; when present it must be a non-negative integer
    properties.validateInt(prefix + ROWTIME_VERSION, isOptional = true, 0, Integer.MAX_VALUE)

    val noValidation = () => {}

    // "from-field" requires the name of the existing origin field
    val timestampExistingField = () => {
      properties.validateString(prefix + TIMESTAMPS_FROM, isOptional = false, minLen = 1)
    }

    // "custom" requires a class name and the serialized extractor instance
    val timestampCustom = () => {
      properties.validateString(prefix + TIMESTAMPS_CLASS, isOptional = false, minLen = 1)
      properties.validateString(prefix + TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
    }

    // the timestamps type is mandatory; per-value validation is dispatched via the map
    properties.validateEnum(
      prefix + TIMESTAMPS_TYPE,
      isOptional = false,
      Map(
        TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> timestampExistingField,
        TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> noValidation,
        TIMESTAMPS_TYPE_VALUE_CUSTOM -> timestampCustom
      )
    )

    // "periodic-bounding" requires a non-negative delay
    val watermarkPeriodicBounding = () => {
      properties.validateLong(prefix + WATERMARKS_DELAY, isOptional = false, min = 0)
    }

    // "custom" requires a class name and the serialized strategy instance
    val watermarkCustom = () => {
      properties.validateString(prefix + WATERMARKS_CLASS, isOptional = false, minLen = 1)
      properties.validateString(prefix + WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
    }

    // the watermarks type is mandatory; per-value validation is dispatched via the map
    properties.validateEnum(
      prefix + WATERMARKS_TYPE,
      isOptional = false,
      Map(
        WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> noValidation,
        WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> watermarkPeriodicBounding,
        WATERMARKS_TYPE_VALUE_FROM_SOURCE -> noValidation,
        WATERMARKS_TYPE_VALUE_CUSTOM -> watermarkCustom
      )
    )
  }
}
+
object RowtimeValidator {

  // common prefix of all rowtime properties
  val ROWTIME = "rowtime"

  // per rowtime properties

  val ROWTIME_VERSION = "version"
  val TIMESTAMPS_TYPE = "timestamps.type"
  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
  val TIMESTAMPS_FROM = "timestamps.from"
  val TIMESTAMPS_CLASS = "timestamps.class"
  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"

  val WATERMARKS_TYPE = "watermarks.type"
  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
  val WATERMARKS_CLASS = "watermarks.class"
  val WATERMARKS_SERIALIZED = "watermarks.serialized"
  val WATERMARKS_DELAY = "watermarks.delay"

  // utilities

  /**
    * Converts a [[TimestampExtractor]] into its property representation.
    *
    * Note: the built-in extractors must be matched before the generic
    * [[TimestampExtractor]] fallback, otherwise they would be persisted as custom extractors.
    */
  def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] =
    extractor match {
        case existing: ExistingField =>
          Map(
            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
            // assumes ExistingField declares exactly one argument field (the
            // origin field name) — confirm in its implementation
            TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0))
        case _: StreamRecordTimestamp =>
          Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
        case _: TimestampExtractor =>
          // generic fallback: persist the class name and the serialized instance
          Map(
            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM,
            TIMESTAMPS_CLASS -> extractor.getClass.getName,
            TIMESTAMPS_SERIALIZED -> serialize(extractor))
    }

  /**
    * Converts a [[WatermarkStrategy]] into its property representation.
    *
    * Note: the built-in strategies must be matched before the generic
    * [[WatermarkStrategy]] fallback, otherwise they would be persisted as custom strategies.
    */
  def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] =
    strategy match {
      case _: AscendingTimestamps =>
        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
      case bounding: BoundedOutOfOrderTimestamps =>
        Map(
          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING,
          WATERMARKS_DELAY -> bounding.delay.toString)
      case _: PreserveWatermarks =>
        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE)
      case _: WatermarkStrategy =>
        // generic fallback: persist the class name and the serialized instance
        Map(
          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM,
          WATERMARKS_CLASS -> strategy.getClass.getName,
          WATERMARKS_SERIALIZED -> serialize(strategy))
    }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
new file mode 100644
index 0000000..2f3a389
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import 
org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSchema, 
normalizeTypeInfo}
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+import scala.collection.mutable
+
/**
  * Describes a schema of a table.
  *
  * Note: Field names are matched by the exact name by default (case sensitive).
  */
class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  // the most recently added field; target of the from()/proctime()/rowtime() modifiers
  private var lastField: Option[String] = None

  /**
    * Sets the schema with field names and the types. Required.
    *
    * This method overwrites existing fields added with [[field()]].
    *
    * @param schema the table schema
    */
  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    normalizeTableSchema(schema).foreach {
      case (n, t) => field(n, t)
    }
    this
  }

  /**
    * Adds a field with the field name and the type information. Required.
    * This method can be called multiple times. The call order of this method defines
    * also the order of the fields in a row.
    *
    * @param fieldName the field name
    * @param fieldType the type information of the field
    */
  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, normalizeTypeInfo(fieldType))
    this
  }

  /**
    * Adds a field with the field name and the type string. Required.
    * This method can be called multiple times. The call order of this method defines
    * also the order of the fields in a row.
    *
    * @param fieldName the field name
    * @param fieldType the type string of the field
    * @throws ValidationException if the field name was already added
    */
  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  /**
    * Specifies the origin of the previously defined field. The origin field is defined by a
    * connector or format.
    *
    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
    *
    * Note: Field names are matched by the exact name by default (case sensitive).
    */
  def from(originFieldName: String): Schema =
    updateLastField(_ += (FROM -> originFieldName))

  /**
    * Specifies the previously defined field as a processing-time attribute.
    *
    * E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
    */
  def proctime(): Schema =
    updateLastField(_ += (PROCTIME -> PROCTIME_VALUE_TRUE))

  /**
    * Specifies the previously defined field as an event-time attribute.
    *
    * E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
    */
  def rowtime(rowtime: Rowtime): Schema =
    updateLastField { props =>
      val fieldProperties = new DescriptorProperties()
      rowtime.addProperties(fieldProperties)
      props ++= fieldProperties.asMap
    }

  // Shared logic of the from()/proctime()/rowtime() modifiers: applies an update to the
  // properties of the most recently added field, consumes the "last field" marker, and
  // fails with a single, consistent error message if no field was defined before.
  // (Previously the three methods used two slightly different message wordings.)
  private def updateLastField(update: mutable.LinkedHashMap[String, String] => Unit): Schema = {
    lastField match {
      case None =>
        throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        update(tableSchema(f))
        lastField = None
    }
    this
  }

  /**
    * Internal method for properties conversion.
    */
  final override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    properties.putInt(SCHEMA_VERSION, 1)
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        Map(NAME -> name) ++ props
      }
    )
  }
}

/**
  * Describes a schema of a table.
  */
object Schema {

  /**
    * Creates a new [[Schema]] descriptor.
    */
  def apply(): Schema = new Schema()
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
new file mode 100644
index 0000000..19c0e41
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+/**
+  * Validator for [[Schema]].
+  */
+class SchemaValidator(isStreamEnvironment: Boolean = true) extends 
DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
+
+    val names = properties.getIndexedProperty(SCHEMA, NAME)
+    val types = properties.getIndexedProperty(SCHEMA, TYPE)
+
+    if (names.isEmpty && types.isEmpty) {
+      throw new ValidationException(s"Could not find the required schema for 
property '$SCHEMA'.")
+    }
+
+    for (i <- 0 until Math.max(names.size, types.size)) {
+      properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, 
minLen = 1)
+      properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false)
+      properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen 
= 1)
+      // either proctime or rowtime
+      val proctime = s"$SCHEMA.$i.$PROCTIME"
+      val rowtime = s"$SCHEMA.$i.$ROWTIME"
+      if (properties.contains(proctime)) {
+        if (!isStreamEnvironment) {
+          throw new ValidationException(
+            s"Property '$proctime' is not allowed in a batch environment.")
+        }
+        // check proctime
+        properties.validateBoolean(proctime, isOptional = false)
+        // no rowtime
+        properties.validatePrefixExclusion(rowtime)
+      } else if (properties.hasPrefix(rowtime)) {
+        // check rowtime
+        val rowtimeValidator = new RowtimeValidator(s"$SCHEMA.$i.")
+        rowtimeValidator.validate(properties)
+        // no proctime
+        properties.validateExclusion(proctime)
+      }
+    }
+  }
+}
+
+object SchemaValidator {
+
+  val SCHEMA = "schema"
+  val SCHEMA_VERSION = "schema.version"
+
+  // per column properties
+
+  val NAME = "name"
+  val TYPE = "type"
+  val PROCTIME = "proctime"
+  val PROCTIME_VALUE_TRUE = "true"
+  val FROM = "from"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
new file mode 100644
index 0000000..3037286
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Statistics descriptor for describing table stats.
+  */
+class Statistics extends Descriptor {
+
+  private var rowCount: Option[Long] = None
+  private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, 
String]] =
+    mutable.LinkedHashMap[String, mutable.Map[String, String]]()
+
+  /**
+    * Sets the statistics from a [[TableStats]] instance.
+    *
+    * This method overwrites all existing statistics.
+    *
+    * @param tableStats the table statistics
+    */
+  def tableStats(tableStats: TableStats): Statistics = {
+    rowCount(tableStats.rowCount)
+    columnStats.clear()
+    tableStats.colStats.asScala.foreach { case (col, stats) =>
+        columnStats(col, stats)
+    }
+    this
+  }
+
+  /**
+    * Sets statistics for the overall row count. Required.
+    *
+    * @param rowCount the expected number of rows
+    */
+  def rowCount(rowCount: Long): Statistics = {
+    this.rowCount = Some(rowCount)
+    this
+  }
+
+  /**
+    * Sets statistics for a column. Overwrites all existing statistics for 
this column.
+    *
+    * @param columnName the column name
+    * @param columnStats expected statistics for the column
+    */
+  def columnStats(columnName: String, columnStats: ColumnStats): Statistics = {
+    val map = mutable.Map(normalizeColumnStats(columnStats).toSeq: _*)
+    this.columnStats.put(columnName, map)
+    this
+  }
+
+  /**
+    * Sets the number of distinct values statistic for the given column.
+    */
+  def columnDistinctCount(columnName: String, ndv: Long): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(DISTINCT_COUNT, ndv.toString)
+    this
+  }
+
+  /**
+    * Sets the number of null values statistic for the given column.
+    */
+  def columnNullCount(columnName: String, nullCount: Long): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(NULL_COUNT, nullCount.toString)
+    this
+  }
+
+  /**
+    * Sets the average length statistic for the given column.
+    */
+  def columnAvgLength(columnName: String, avgLen: Double): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(AVG_LENGTH, avgLen.toString)
+    this
+  }
+
+  /**
+    * Sets the maximum length statistic for the given column.
+    */
+  def columnMaxLength(columnName: String, maxLen: Integer): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MAX_LENGTH, maxLen.toString)
+    this
+  }
+
+  /**
+    * Sets the maximum value statistic for the given column.
+    */
+  def columnMaxValue(columnName: String, max: Number): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MAX_VALUE, max.toString)
+    this
+  }
+
+  /**
+    * Sets the minimum value statistic for the given column.
+    */
+  def columnMinValue(columnName: String, min: Number): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MIN_VALUE, min.toString)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final override def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putInt(STATISTICS_VERSION, 1)
+    rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc))
+    val namedStats = columnStats.map { case (name, stats) =>
+      // store the column name as a regular 'name' property instead of encoding it in the key
+      (stats + (NAME -> name)).toMap
+    }.toSeq
+    properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
+  }
+}
+
+/**
+  * Statistics descriptor for describing table stats.
+  */
+object Statistics {
+
+  /**
+    * Statistics descriptor for describing table stats.
+    */
+  def apply(): Statistics = new Statistics()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
new file mode 100644
index 0000000..a78e422
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats}
+import org.apache.flink.table.plan.stats.ColumnStats
+
+import scala.collection.mutable
+
+/**
+  * Validator for [[Statistics]].
+  */
+class StatisticsValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
+    properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0)
+    validateColumnStats(properties, STATISTICS_COLUMNS)
+  }
+}
+
+object StatisticsValidator {
+
+  val STATISTICS_VERSION = "statistics.version"
+  val STATISTICS_ROW_COUNT = "statistics.row-count"
+  val STATISTICS_COLUMNS = "statistics.columns"
+
+  // per column properties
+
+  val NAME = "name"
+  val DISTINCT_COUNT = "distinct-count"
+  val NULL_COUNT = "null-count"
+  val AVG_LENGTH = "avg-length"
+  val MAX_LENGTH = "max-length"
+  val MAX_VALUE = "max-value"
+  val MIN_VALUE = "min-value"
+
+  // utilities
+
+  def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = {
+    val stats = mutable.HashMap[String, String]()
+    if (columnStats.ndv != null) {
+      stats += DISTINCT_COUNT -> columnStats.ndv.toString
+    }
+    if (columnStats.nullCount != null) {
+      stats += NULL_COUNT -> columnStats.nullCount.toString
+    }
+    if (columnStats.avgLen != null) {
+      stats += AVG_LENGTH -> columnStats.avgLen.toString
+    }
+    if (columnStats.maxLen != null) {
+      stats += MAX_LENGTH -> columnStats.maxLen.toString
+    }
+    if (columnStats.max != null) {
+      stats += MAX_VALUE -> columnStats.max.toString
+    }
+    if (columnStats.min != null) {
+      stats += MIN_VALUE -> columnStats.min.toString
+    }
+    stats.toMap
+  }
+
+  def validateColumnStats(properties: DescriptorProperties, key: String): Unit 
= {
+
+    // determine the number of columns from the indexed 'name' properties
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    for (i <- 0 until columnCount) {
+      properties.validateString(s"$key.$i.$NAME", isOptional = false, minLen = 
1)
+      properties.validateLong(s"$key.$i.$DISTINCT_COUNT", isOptional = true, 
min = 0L)
+      properties.validateLong(s"$key.$i.$NULL_COUNT", isOptional = true, min = 
0L)
+      properties.validateDouble(s"$key.$i.$AVG_LENGTH", isOptional = true, min 
= 0.0)
+      properties.validateInt(s"$key.$i.$MAX_LENGTH", isOptional = true, min = 
0)
+      properties.validateDouble(s"$key.$i.$MAX_VALUE", isOptional = true, min 
= 0.0)
+      properties.validateDouble(s"$key.$i.$MIN_VALUE", isOptional = true, min 
= 0.0)
+    }
+  }
+
+  def readColumnStats(properties: DescriptorProperties, key: String): 
Map[String, ColumnStats] = {
+
+    // determine the number of columns from the indexed 'name' properties
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    val stats = for (i <- 0 until columnCount) yield {
+      val name = properties.getString(s"$key.$i.$NAME").getOrElse(
+        throw new ValidationException(s"Could not find name of property 
'$key.$i.$NAME'."))
+
+      val stats = ColumnStats(
+        properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => 
Long.box(v)).orNull,
+        properties.getLong(s"$key.$i.$NULL_COUNT").map(v => 
Long.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => 
Double.box(v)).orNull,
+        properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => 
Double.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => 
Double.box(v)).orNull
+      )
+
+      name -> stats
+    }
+
+    stats.toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
new file mode 100644
index 0000000..ae88236
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, 
TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: 
ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
returns it.
+    */
+  def toTableSource: TableSource[_] = {
+    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    source match {
+      case _: StreamTableSource[_] => source
+      case _ => throw new TableException(
+        s"Found table source '${source.getClass.getCanonicalName}' is not 
applicable " +
+          s"in a streaming environment.")
+    }
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
returns it as a table.
+    */
+  def toTable: Table = {
+    tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and 
registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def register(name: String): Unit = {
+    tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  def withSchema(schema: Schema): StreamTableSourceDescriptor = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
new file mode 100644
index 0000000..918b618
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import 
org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, 
STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor(connector: ConnectorDescriptor) extends 
Descriptor {
+
+  protected val connectorDescriptor: ConnectorDescriptor = connector
+
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): 
Unit = {
+    connectorDescriptor.addProperties(properties)
+
+    // check for a format
+    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' requires a format description.")
+    } else if (!connectorDescriptor.needsFormat() && 
formatDescriptor.isDefined) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' does not require a format 
description " +
+          s"but '${formatDescriptor.get}' found.")
+    }
+
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    */
+  protected def getTableStats: Option[TableStats] = {
+      val normalizedProps = new DescriptorProperties()
+      addProperties(normalizedProps)
+      val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => 
Long.box(v))
+      rowCount match {
+        case Some(cnt) =>
+          val columnStats = readColumnStats(normalizedProps, 
STATISTICS_COLUMNS)
+          Some(TableStats(cnt, columnStats.asJava))
+        case None =>
+          None
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index d5a5f36..da08916 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -40,7 +40,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends 
Statistic {
     *
     * @return The table statistics
     */
-  def getTableStats: TableStats = tableStats.getOrElse(null)
+  def getTableStats: TableStats = tableStats.orNull
 
   /**
     * Returns the stats of the specified the column.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index ba076b4..e0022c5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -270,7 +270,7 @@ object CsvTableSource {
     /**
       * Adds a field with the field name and the type information. Required.
       * This method can be called multiple times. The call order of this 
method defines
-      * also the order of thee fields in a row.
+      * also the order of the fields in a row.
       *
       * @param fieldName the field name
       * @param fieldType the type information of the field

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
index 68d2f55..ab984a6 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
@@ -29,7 +29,11 @@ import com.google.common.collect.ImmutableSet
 /**
   * The class defines a converter used to convert [[CsvTableSource]] to
   * or from [[ExternalCatalogTable]].
+  *
+  * @deprecated Use the more generic 
[[org.apache.flink.table.sources.TableSourceFactory]] instead.
   */
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
 @TableType(value = "csv")
 class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
new file mode 100644
index 0000000..bec4565
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, 
CONNECTOR_TYPE_VALUE}
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, 
FORMAT_VERSION}
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, 
SCHEMA_VERSION}
+import org.apache.flink.table.descriptors._
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating configured instances of [[CsvTableSource]].
+  */
+class CsvTableSourceFactory extends TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+    context.put(CONNECTOR_VERSION, "1")
+    context.put(FORMAT_VERSION, "1")
+    context.put(SCHEMA_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // connector
+    properties.add(CONNECTOR_PATH)
+    // format
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+    properties.add(FORMAT_FIELD_DELIMITER)
+    properties.add(FORMAT_LINE_DELIMITER)
+    properties.add(FORMAT_QUOTE_CHARACTER)
+    properties.add(FORMAT_COMMENT_PREFIX)
+    properties.add(FORMAT_IGNORE_FIRST_LINE)
+    properties.add(FORMAT_IGNORE_PARSE_ERRORS)
+    properties.add(CONNECTOR_PATH)
+    // schema
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+    properties
+  }
+
+  override def create(properties: util.Map[String, String]): TableSource[Row] 
= {
+    val params = new DescriptorProperties()
+    params.putProperties(properties)
+
+    // validate
+    new FileSystemValidator().validate(params)
+    new CsvValidator().validate(params)
+    new SchemaValidator().validate(params)
+
+    // build
+    val csvTableSourceBuilder = new CsvTableSource.Builder
+
+    val tableSchema = params.getTableSchema(SCHEMA).get
+    val encodingSchema = params.getTableSchema(FORMAT_FIELDS)
+
+    // the CsvTableSource needs some rework first
+    // for now the schema must be equal to the encoding
+    if (!encodingSchema.contains(tableSchema)) {
+      throw new TableException(
+        "Encodings that differ from the schema are not supported yet for 
CsvTableSources.")
+    }
+
+    params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path)
+    
params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter)
+    
params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter)
+
+    encodingSchema.foreach { schema =>
+      schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) =>
+        csvTableSourceBuilder.field(name, tpe)
+      }
+    }
+    
params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter)
+    
params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix)
+    params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreFirstLine()
+      }
+    }
+    params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreParseErrors()
+      }
+    }
+
+    csvTableSourceBuilder.build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
new file mode 100644
index 0000000..f42d765
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with Java's 
Service Provider
+  * Interfaces (SPI) for discovery. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to the 
given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META-INF/services/org.apache.flink.table.sources.TableSourceFactory" file 
of a JAR file in
+  * the current classpath to be found.
+  */
+trait TableSourceFactory[T] {
+
+  /**
+    * Specifies the context that this factory has been implemented for. The 
framework guarantees
+    * to only call the [[create()]] method of the factory if the specified set 
of properties and
+    * values are met.
+    *
+    * Typical properties might be:
+    *   - connector.type
+    *   - format.type
+    *
+    * Specified versions allow the framework to provide backwards compatible 
properties in case of
+    * string format changes:
+    *   - connector.version
+    *   - format.version
+    *
+    * An empty context means that the factory matches for all requests.
+    */
+  def requiredContext(): util.Map[String, String]
+
+  /**
+    * List of property keys that this factory can handle. This method will be 
used for validation.
+    * If a property is passed that this factory cannot handle, an exception 
will be thrown. The
+    * list must not contain the keys that are specified by the context.
+    *
+    * Example properties might be:
+    *   - format.line-delimiter
+    *   - format.ignore-parse-errors
+    *   - format.fields.#.type
+    *   - format.fields.#.name
+    *
+    * Note: Use "#" to denote an array of values where "#" represents one or 
more digits.
+    */
+  def supportedProperties(): util.List[String]
+
+  /**
+    * Creates and configures a [[TableSource]] using the given properties.
+    *
+    * @param properties normalized properties describing a table source
+    * @return the configured table source
+    */
+  def create(properties: util.Map[String, String]): TableSource[T]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
new file mode 100644
index 0000000..cb737a9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ServiceConfigurationError, ServiceLoader}
+
+import org.apache.flink.table.api.{AmbiguousTableSourceException, 
NoMatchingTableSourceException, TableException, ValidationException}
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION
+import 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION
+import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION
+import 
org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
/**
  * Service provider interface for finding a suitable table source factory for the given
  * properties and creating the corresponding [[TableSource]].
  *
  * Matching happens in two phases:
  *   1. Exactly one registered factory must have a required context that is fully
  *      contained in the given properties.
  *   2. Every remaining property key must be declared in the factory's supported
  *      properties.
  */
object TableSourceFactoryService extends Logging {

  // lazily initialized Java ServiceLoader over all TableSourceFactory implementations
  // registered via META-INF/services
  private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]])

  /**
    * Finds a matching factory for the given descriptor and creates a table source.
    *
    * @param descriptor descriptor whose properties define the requested table source
    * @return the configured table source
    */
  def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
    val properties = new DescriptorProperties()
    descriptor.addProperties(properties)
    findTableSourceFactory(properties.asMap)
  }

  /**
    * Finds a matching factory for the given normalized properties and creates a table source.
    *
    * @param properties normalized properties describing a table source
    * @return the configured table source
    * @throws NoMatchingTableSourceException if no factory's required context matches
    * @throws AmbiguousTableSourceException if more than one factory matches
    * @throws ValidationException if the matching factory does not support a given property
    * @throws TableException if a factory implementation throws or service loading fails
    */
  def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = {
    var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None
    try {
      val iter = loader.iterator()
      while (iter.hasNext) {
        val factory = iter.next()

        val requiredContextJava = try {
          factory.requiredContext()
        } catch {
          case t: Throwable =>
            throw new TableException(
              s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
              t)
        }

        // normalize property keys; a null context means the factory matches all requests
        val requiredContext = if (requiredContextJava != null) {
          requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
        } else {
          Map[String, String]()
        }

        val plainContext = mutable.Map[String, String]()
        plainContext ++= requiredContext
        // we remove the versions for now until we have the first backwards compatibility case
        // with the version we can provide mappings in case the format changes
        plainContext.remove(CONNECTOR_VERSION)
        plainContext.remove(FORMAT_VERSION)
        plainContext.remove(SCHEMA_VERSION)
        plainContext.remove(ROWTIME_VERSION)
        plainContext.remove(METADATA_VERSION)
        plainContext.remove(STATISTICS_VERSION)

        // check if the required context is fully contained in the given properties
        if (plainContext.forall(e => properties.get(e._1).contains(e._2))) {
          matchingFactory match {
            case Some(_) => throw new AmbiguousTableSourceException(properties)
            case None => matchingFactory = Some((factory, requiredContext.keys.toSeq))
          }
        }
      }
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table source factories.", e)
        throw new TableException("Could not load service provider for table source factories.", e)
    }

    val (factory, context) = matchingFactory
      .getOrElse(throw new NoMatchingTableSourceException(properties))

    val plainProperties = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace array indices (e.g. "fields.0.name" -> "fields.#.name") with a wildcard;
      // the dot must be escaped in the regex, otherwise any character followed by digits
      // (e.g. "field1") would be mangled as well
      val key = k.replaceAll("\\.\\d+", ".#")
      // ignore context properties and duplicates
      if (!context.contains(key) && !plainProperties.contains(key)) {
        plainProperties += key
      }
    }

    val supportedPropertiesJava = try {
      factory.supportedProperties()
    } catch {
      case t: Throwable =>
        throw new TableException(
          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
          t)
    }

    // a null list is treated as "no additional properties supported"
    val supportedProperties = if (supportedPropertiesJava != null) {
      supportedPropertiesJava.asScala.map(_.toLowerCase)
    } else {
      Seq[String]()
    }

    // check that every remaining property is declared as supported by the factory
    plainProperties.foreach { k =>
      if (!supportedProperties.contains(k)) {
        throw new ValidationException(
          s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
          s"property '$k'. Supported properties are: \n${supportedProperties.mkString("\n")}")
      }
    }

    // create and configure the table source with the full (unfiltered) property map
    try {
      factory.create(properties.asJava)
    } catch {
      case t: Throwable =>
        throw new TableException(
          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
          t)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
index 34c6ba5..ce57b92 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.sources.FieldComputer
 /**
  * Provides an expression to extract the timestamp for a rowtime attribute.
   */
-abstract class TimestampExtractor extends FieldComputer[Long] {
+abstract class TimestampExtractor extends FieldComputer[Long] with 
Serializable {
 
   /** Timestamp extractors compute the timestamp as Long. */
   override def getReturnType: TypeInformation[Long] = 
Types.LONG.asInstanceOf[TypeInformation[Long]]

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
index 3a947ac..9fc7c88 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.sources.wmstrategies
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A watermark assigner for ascending rowtime attributes.
+  * A watermark strategy for ascending rowtime attributes.
   *
   * Emits a watermark of the maximum observed timestamp so far minus 1.
   * Rows that have a timestamp equal to the max timestamp are not late.
   */
-class AscendingTimestamps extends PeriodicWatermarkAssigner {
+final class AscendingTimestamps extends PeriodicWatermarkAssigner {
 
   var maxTimestamp: Long = Long.MinValue + 1
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
index 957daca..8f7c235 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -21,13 +21,13 @@ package org.apache.flink.table.sources.wmstrategies
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A watermark assigner for rowtime attributes which are out-of-order by a 
bounded time interval.
+  * A watermark strategy for rowtime attributes which are out-of-order by a 
bounded time interval.
   *
   * Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
   *
   * @param delay The delay by which watermarks are behind the maximum observed 
timestamp.
   */
-class BoundedOutOfOrderTimestamps(val delay: Long) extends 
PeriodicWatermarkAssigner {
+final class BoundedOutOfOrderTimestamps(val delay: Long) extends 
PeriodicWatermarkAssigner {
 
   var maxTimestamp: Long = Long.MinValue + delay
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 4c7f4e4..dd71bd3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -62,7 +62,7 @@ abstract class PunctuatedWatermarkAssigner extends 
WatermarkStrategy {
 }
 
 /** A strategy which indicates the watermarks should be preserved from the 
underlying datastream.*/
-class PreserveWatermarks extends WatermarkStrategy
+final class PreserveWatermarks extends WatermarkStrategy
 object PreserveWatermarks {
   val INSTANCE: PreserveWatermarks = new PreserveWatermarks
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
new file mode 100644
index 0000000..253b491
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import 
org.apache.flink.table.descriptors.DescriptorProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
/**
  * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
  * string representation and back.
  */
object TypeStringUtils extends JavaTokenParsers with PackratParsers {

  /** Parser keyword; matched case-insensitively via [[keyword2Parser]]. */
  case class Keyword(key: String)

  // converts the keyword into a case-insensitive parser
  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
    ("""(?i)\Q""" + kw.key + """\E""").r
  }

  lazy val VARCHAR: Keyword = Keyword("VARCHAR")
  lazy val STRING: Keyword = Keyword("STRING")
  lazy val BOOLEAN: Keyword = Keyword("BOOLEAN")
  lazy val BYTE: Keyword = Keyword("BYTE")
  lazy val TINYINT: Keyword = Keyword("TINYINT")
  lazy val SHORT: Keyword = Keyword("SHORT")
  lazy val SMALLINT: Keyword = Keyword("SMALLINT")
  lazy val INT: Keyword = Keyword("INT")
  lazy val LONG: Keyword = Keyword("LONG")
  lazy val BIGINT: Keyword = Keyword("BIGINT")
  lazy val FLOAT: Keyword = Keyword("FLOAT")
  lazy val DOUBLE: Keyword = Keyword("DOUBLE")
  lazy val DECIMAL: Keyword = Keyword("DECIMAL")
  lazy val SQL_DATE: Keyword = Keyword("SQL_DATE")
  lazy val DATE: Keyword = Keyword("DATE")
  lazy val SQL_TIME: Keyword = Keyword("SQL_TIME")
  lazy val TIME: Keyword = Keyword("TIME")
  lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP")
  lazy val TIMESTAMP: Keyword = Keyword("TIMESTAMP")
  lazy val ROW: Keyword = Keyword("ROW")
  lazy val ANY: Keyword = Keyword("ANY")
  lazy val POJO: Keyword = Keyword("POJO")

  // fully qualified class name, e.g. "org.apache.flink.types.Row"
  lazy val qualifiedName: Parser[String] =
    """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r

  // URL-safe Base64 string (payload of a serialized type information)
  lazy val base64Url: Parser[String] =
    """[A-Za-z0-9_-]*""".r

  // atomic (non-composite) types; SQL-style and Java-style names are accepted as synonyms
  lazy val atomic: PackratParser[TypeInformation[_]] =
    (VARCHAR | STRING) ^^ { e => Types.STRING } |
    BOOLEAN ^^ { e => Types.BOOLEAN } |
    (TINYINT | BYTE) ^^ { e => Types.BYTE } |
    (SMALLINT | SHORT) ^^ { e => Types.SHORT } |
    INT ^^ { e => Types.INT } |
    (BIGINT | LONG) ^^ { e => Types.LONG } |
    FLOAT ^^ { e => Types.FLOAT } |
    DOUBLE ^^ { e => Types.DOUBLE } |
    DECIMAL ^^ { e => Types.DECIMAL } |
    // the cast widens this alternative so all alternatives share TypeInformation[_]
    (DATE | SQL_DATE) ^^ { e => Types.SQL_DATE.asInstanceOf[TypeInformation[_]] } |
    (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } |
    (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME }

  // field name enclosed in quotes; Java escape sequences are resolved
  lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s =>
    StringEscapeUtils.unescapeJava(s)
  }

  lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident

  lazy val field: PackratParser[(String, TypeInformation[_])] =
    fieldName ~ typeInfo ^^ {
      case name ~ info => (name, info)
    }

  // e.g. ROW(name VARCHAR, amount INT)
  lazy val namedRow: PackratParser[TypeInformation[_]] =
    ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
      fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
    } | failure("Named row type expected.")

  // e.g. ROW(VARCHAR, INT) without explicit field names
  lazy val unnamedRow: PackratParser[TypeInformation[_]] =
    ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
      types => Types.ROW(types: _*)
    } | failure("Unnamed row type expected.")

  // e.g. ANY(org.example.MyClass); a generic type treated as a black box
  lazy val generic: PackratParser[TypeInformation[_]] =
    ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
      typeClass =>
        val clazz = loadClass(typeClass)
        new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
    }

  // e.g. POJO(org.example.MyPojo); the class must be analyzable as a POJO
  lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
    typeClass =>
      val clazz = loadClass(typeClass)
      val info = TypeExtractor.createTypeInfo(clazz)
      if (!info.isInstanceOf[PojoTypeInfo[_]]) {
        throw new ValidationException(s"Class '$typeClass' is not a POJO type.")
      }
      info
  }

  // e.g. ANY(org.example.MyClass, <base64>); restores a serialized type information
  lazy val any: PackratParser[TypeInformation[_]] =
    ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
      case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _ =>
        val clazz = loadClass(typeClass)
        val typeInfo = deserialize(serialized)

        // sanity check: the serialized payload must describe the declared class
        if (clazz != typeInfo.getTypeClass) {
          throw new ValidationException(
            s"Class '$typeClass' does not correspond to serialized data.")
        }
        typeInfo
    }

  // note: the two-argument ANY form must be tried before the one-argument 'generic' form
  lazy val typeInfo: PackratParser[TypeInformation[_]] =
    namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")

  /**
    * Parses the given string representation into a [[TypeInformation]].
    *
    * @param typeString string produced by [[writeTypeInfo]] or written by hand
    * @return the parsed type information
    * @throws ValidationException if the string cannot be parsed
    */
  def readTypeInfo(typeString: String): TypeInformation[_] = {
    parseAll(typeInfo, typeString) match {
      case Success(parsed, _) => parsed

      case NoSuccess(msg, next) =>
        throwError(msg, next)
    }
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Converts the given [[TypeInformation]] into a string representation that
    * [[readTypeInfo]] can restore.
    *
    * @throws TableException for composite, array, map, and custom POJO types that have no
    *                        string representation yet
    */
  def writeTypeInfo(typeInfo: TypeInformation[_]): String = typeInfo match {

    case Types.STRING => VARCHAR.key
    case Types.BOOLEAN => BOOLEAN.key
    case Types.BYTE => TINYINT.key
    case Types.SHORT => SMALLINT.key
    case Types.INT => INT.key
    case Types.LONG => BIGINT.key
    case Types.FLOAT => FLOAT.key
    case Types.DOUBLE => DOUBLE.key
    case Types.DECIMAL => DECIMAL.key
    case Types.SQL_DATE => DATE.key
    case Types.SQL_TIME => TIME.key
    case Types.SQL_TIMESTAMP => TIMESTAMP.key

    case rt: RowTypeInfo =>
      val fields = rt.getFieldNames.zip(rt.getFieldTypes)
      val normalizedFields = fields.map { f =>

        // escape field name if it contains whitespace
        val name = if (!f._1.matches("\\S+")) {
          "\"" + StringEscapeUtils.escapeJava(f._1) + "\""
        } else {
          f._1
        }

        s"$name ${normalizeTypeInfo(f._2)}"
      }
      s"${ROW.key}(${normalizedFields.mkString(", ")})"

    case generic: GenericTypeInfo[_] =>
      s"${ANY.key}(${generic.getTypeClass.getName})"

    case pojo: PojoTypeInfo[_] =>
      // we only support very simple POJOs that only contain extracted fields
      // (not manually specified)
      val extractedInfo = try {
        Some(TypeExtractor.createTypeInfo(pojo.getTypeClass))
      } catch {
        case _: InvalidTypesException => None
      }
      extractedInfo match {
        case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
        case _ =>
          throw new TableException(
            "A string representation for custom POJO types is not supported yet.")
      }

    case _: CompositeType[_] =>
      throw new TableException("A string representation for composite types is not supported yet.")

    case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
         _: PrimitiveArrayTypeInfo[_] =>
      throw new TableException("A string representation for array types is not supported yet.")

    case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
      throw new TableException("A string representation for map types is not supported yet.")

    // fallback: serialize the type information itself into the string
    case any: TypeInformation[_] =>
      s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
  }

  // ----------------------------------------------------------------------------------------------

  // serializes the given object into a URL-safe Base64 string
  private def serialize(obj: Serializable): String = {
    try {
      val byteArray = InstantiationUtil.serializeObject(obj)
      Base64.encodeBase64URLSafeString(byteArray)
    } catch {
      case e: Exception =>
        throw new ValidationException(s"Unable to serialize type information '$obj' with " +
          s"class '${obj.getClass.getName}'.", e)
    }
  }

  // deserializes a Base64 string produced by serialize() using the context class loader
  private def deserialize(data: String): TypeInformation[_] = {
    val byteData = Base64.decodeBase64(data)
    InstantiationUtil
      .deserializeObject[TypeInformation[_]](byteData, Thread.currentThread.getContextClassLoader)
  }

  // rewrites the parser-combinator failure into a readable validation error
  private def throwError(msg: String, next: Input): Nothing = {
    val improvedMsg = msg.replace("string matching regex `\\z'", "End of type information")

    throw new ValidationException(
      s"""Could not parse type information at column ${next.pos.column}: $improvedMsg
        |${next.pos.longString}""".stripMargin)
  }

  // loads the class via the context class loader with a descriptive error message
  private def loadClass(qualifiedName: String): Class[_] = try {
    Class.forName(qualifiedName, true, Thread.currentThread.getContextClassLoader)
  } catch {
    case e: ClassNotFoundException =>
      throw new ValidationException("Class '" + qualifiedName + "' could not be found. " +
        "Please note that inner classes must be globally accessible and declared static.", e)
  }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
 
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..1b2506e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.TestTableSourceFactory

Reply via email to