[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0246ce51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0246ce51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0246ce51

Branch: refs/heads/release-1.3
Commit: 0246ce51aaa12ce6ce4ac95898e014916a99ccd2
Parents: c794d6e
Author: twalthr <twal...@apache.org>
Authored: Wed May 17 11:31:33 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 18 22:06:00 2017 +0200

----------------------------------------------------------------------
 .../table/examples/scala/StreamSQLExample.scala |   2 +-
 .../examples/scala/StreamTableExample.scala     |   2 +-
 .../flink/table/api/java/package-info.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.scala | 121 ++++++++++++++++++-
 .../api/scala/StreamTableEnvironment.scala      |  55 ++++++++-
 .../table/api/scala/TableConversions.scala      |  96 +++++++++++++--
 .../apache/flink/table/api/scala/package.scala  |   2 +-
 .../table/api/java/stream/sql/SqlITCase.java    |   8 +-
 .../api/scala/stream/TableSourceITCase.scala    |   4 +-
 .../api/scala/stream/sql/OverWindowITCase.scala |  32 ++---
 .../table/api/scala/stream/sql/SqlITCase.scala  |  20 +--
 .../api/scala/stream/table/CalcITCase.scala     |  24 ++--
 .../table/GroupWindowAggregationsITCase.scala   |  10 +-
 .../scala/stream/table/OverWindowITCase.scala   |  10 +-
 .../api/scala/stream/table/UnionITCase.scala    |   8 +-
 .../datastream/DataStreamAggregateITCase.scala  |  12 +-
 .../datastream/DataStreamCalcITCase.scala       |   4 +-
 .../DataStreamUserDefinedFunctionITCase.scala   |  12 +-
 .../datastream/TimeAttributesITCase.scala       |  12 +-
 19 files changed, 339 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 2cdd8b8..665913e 100644
--- 
a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -62,7 +62,7 @@ object StreamSQLExample {
       "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
         "SELECT * FROM OrderB WHERE amount < 2")
 
-    result.toDataStream[Order].print()
+    result.toAppendStream[Order].print()
 
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
index 5c5012b..1c0ffea 100644
--- 
a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
+++ 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
@@ -55,7 +55,7 @@ object StreamTableExample {
     val result: DataStream[Order] = orderA.unionAll(orderB)
       .select('user, 'product, 'amount)
       .where('amount > 2)
-      .toDataStream[Order]
+      .toAppendStream[Order]
 
     result.print()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 2409872..3dbf50f 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -57,8 +57,9 @@
  * <p>
  * As seen above, a {@link org.apache.flink.table.api.Table} can be converted 
back to the
  * underlying API representation using
- * {@link 
org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, 
java.lang.Class)}
- * or {@link 
org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table, 
java.lang.Class)}}.
+ * {@link 
org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, 
java.lang.Class)},
+ * {@link 
org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, 
java.lang.Class)}, or
+ * {@link 
org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, 
java.lang.Class)}.
  */
 package org.apache.flink.table.api.java;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 311986c..be94df9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -144,13 +144,122 @@ class StreamTableEnvironment(
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. 
In order to
+    *            make this more explicit in the future, please use 
toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = 
toAppendStream(table, clazz)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the 
[[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. 
In order to
+    *            make this more explicit in the future, please use 
toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] =
+    toAppendStream(table, typeInfo)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. 
In order to
+    *            make this more explicit in the future, please use 
toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](
+      table: Table,
+      clazz: Class[T],
+      queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, 
clazz, queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the 
[[DataStream]].
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    * @deprecated This method only supports conversion of append-only tables. 
In order to
+    *            make this more explicit in the future, please use 
toAppendStream() instead.
+    */
+  @Deprecated
+  def toDataStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, 
typeInfo, queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the type of the resulting [[DataStream]].
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    toDataStream(table, clazz, queryConfig)
+  def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    toAppendStream(table, clazz, queryConfig)
   }
 
   /**
@@ -169,8 +278,8 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] = {
-    toDataStream(table, typeInfo, queryConfig)
+  def toAppendStream[T](table: Table, typeInfo: TypeInformation[T]): 
DataStream[T] = {
+    toAppendStream(table, typeInfo, queryConfig)
   }
 
   /**
@@ -190,7 +299,7 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](
+  def toAppendStream[T](
       table: Table,
       clazz: Class[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {
@@ -216,7 +325,7 @@ class StreamTableEnvironment(
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](
+  def toAppendStream[T](
       table: Table,
       typeInfo: TypeInformation[T],
       queryConfig: StreamQueryConfig): DataStream[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 8c6b273..bfd443a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -138,13 +138,17 @@ class StreamTableEnvironment(
     * types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
     * @param table The [[Table]] to convert.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
-    toDataStream(table, queryConfig)
-  }
+  @deprecated("This method only supports conversion of append-only tables. In 
order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = 
toAppendStream(table)
 
   /**
     * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
@@ -157,13 +161,58 @@ class StreamTableEnvironment(
     * types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
     *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
     * @param table The [[Table]] to convert.
     * @param queryConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
+  @deprecated("This method only supports conversion of append-only tables. In 
order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
   def toDataStream[T: TypeInformation](
     table: Table,
+    queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, 
queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+    toAppendStream(table, queryConfig)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation](
+    table: Table,
     queryConfig: StreamQueryConfig): DataStream[T] = {
     val returnType = createTypeInformation[T]
     asScalaStream(translate(

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 9874a9e..bd431eb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -32,7 +32,17 @@ import 
org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa
   */
 class TableConversions(table: Table) {
 
-  /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and 
[[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types 
must match.
+    *
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
     table.tableEnv match {
@@ -44,12 +54,71 @@ class TableConversions(table: Table) {
     }
   }
 
-  /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
-  def toDataStream[T: TypeInformation]: DataStream[T] = {
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In 
order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In 
order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In 
order to make this" +
+    " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): 
DataStream[T] =
+    toAppendStream(queryConfig)
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation]: DataStream[T] = {
 
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table)
+        tEnv.toAppendStream(table)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +
@@ -57,14 +126,25 @@ class TableConversions(table: Table) {
     }
   }
 
-  /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
     *
-    * @param queryConfig The configuration for the generated query.
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] 
is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as 
follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are 
mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field 
types must match.
+    *
+    * @param queryConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
     */
-  def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): 
DataStream[T] = {
+  def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): 
DataStream[T] = {
     table.tableEnv match {
       case tEnv: ScalaStreamTableEnv =>
-        tEnv.toDataStream(table, queryConfig)
+        tEnv.toAppendStream(table, queryConfig)
       case _ =>
         throw new TableException(
           "Only tables that originate from Scala DataStreams " +

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index e8a2017..9d15c14 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -87,7 +87,7 @@ package object scala extends ImplicitExpressionConversions {
 
   implicit def table2RowDataStream(table: Table): DataStream[Row] = {
     val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv]
-    tableEnv.toDataStream[Row](table)
+    tableEnv.toAppendStream[Row](table)
   }
 
   implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): 
TableFunctionConversions[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 0c0b37e..d827cd6 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -66,7 +66,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                String sqlQuery = "SELECT a,c FROM MyTableRow";
                Table result = tableEnv.sql(sqlQuery);
 
-               DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
+               DataStream<Row> resultSet = tableEnv.toAppendStream(result, 
Row.class);
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
@@ -91,7 +91,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                String sqlQuery = "SELECT * FROM MyTable";
                Table result = tableEnv.sql(sqlQuery);
 
-               DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
+               DataStream<Row> resultSet = tableEnv.toAppendStream(result, 
Row.class);
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
@@ -115,7 +115,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
                Table result = tableEnv.sql(sqlQuery);
 
-               DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
+               DataStream<Row> resultSet = tableEnv.toAppendStream(result, 
Row.class);
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
@@ -146,7 +146,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                                                        "(SELECT a, b, c FROM 
T2 WHERE a        < 3)";
                Table result = tableEnv.sql(sqlQuery);
 
-               DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
+               DataStream<Row> resultSet = tableEnv.toAppendStream(result, 
Row.class);
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 9298266..13ec2b4 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -46,7 +46,7 @@ class TableSourceITCase extends 
StreamingMultipleProgramsTestBase {
 
     tEnv.sql(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
-      .toDataStream[Row]
+      .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()
@@ -71,7 +71,7 @@ class TableSourceITCase extends 
StreamingMultipleProgramsTestBase {
     tEnv.scan("csvTable")
       .where('id > 4)
       .select('last, 'score * 2)
-      .toDataStream[Row]
+      .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index a7fe1c4..7ba5c16 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -67,7 +67,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED 
preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
   }
@@ -92,7 +92,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND 
CURRENT ROW) AS minC " +
       "FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  MIN(c) OVER (" +
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS 
minC " +
       "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -177,7 +177,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED 
preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -204,7 +204,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED 
preceding AND CURRENT ROW)" +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -234,7 +234,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -259,7 +259,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND 
CURRENT ROW)" +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -321,7 +321,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -382,7 +382,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND 
CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -450,7 +450,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING 
AND CURRENT ROW) " +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -511,7 +511,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT 
ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -572,7 +572,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -638,7 +638,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -700,7 +700,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -761,7 +761,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -836,7 +836,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index ba8c185..bdc1fcc 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -57,7 +57,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = ds.toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTableRow", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -99,7 +99,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -120,7 +120,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -141,7 +141,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -165,7 +165,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -192,7 +192,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -218,7 +218,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.get3TupleDataStream(env)
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -275,7 +275,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -305,7 +305,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s 
> 13"
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index 1114cf0..b355cf0 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -41,7 +41,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -60,7 +60,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -79,7 +79,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
       .select('a, 'b)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -99,7 +99,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
       .select('a, 'b, 'c)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -118,7 +118,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 
'd)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -151,7 +151,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     StreamITCase.testResults = mutable.MutableList()
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 
'c, 'd)
 
-    val results = ds.toDataStream[Row]
+    val results = ds.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -171,7 +171,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
 
     val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -191,7 +191,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
 
     val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -210,7 +210,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
 
     val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -233,7 +233,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -257,7 +257,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
     val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 
     val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
+    val results = filterDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
     val expected = mutable.MutableList(

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 81d3577..87a35bf 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -70,7 +70,7 @@ class GroupWindowAggregationsITCase extends 
StreamingMultipleProgramsTestBase {
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row](queryConfig)
+    val results = windowedTable.toAppendStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -112,7 +112,7 @@ class GroupWindowAggregationsITCase extends 
StreamingMultipleProgramsTestBase {
       .select('string, countFun('int), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -138,7 +138,7 @@ class GroupWindowAggregationsITCase extends 
StreamingMultipleProgramsTestBase {
       .select(countFun('string), 'int.avg,
               weightAvgFun('long, 'int), weightAvgFun('int, 'int))
 
-    val results = windowedTable.toDataStream[Row](queryConfig)
+    val results = windowedTable.toAppendStream[Row](queryConfig)
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -166,7 +166,7 @@ class GroupWindowAggregationsITCase extends 
StreamingMultipleProgramsTestBase {
       .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
               weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 
'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -202,7 +202,7 @@ class GroupWindowAggregationsITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'int2, 'int3, 'string)
       .select(weightAvgFun('long, 'int))
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index b097767..f396896 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 
'w as 'wAvg)
       .select('c, 'mycount, 'wAvg)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -123,7 +123,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         'b.min over 'w,
         weightAvgFun('b, 'a) over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     val windowedTable = table
       .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following 
CURRENT_ROW as 'w)
       .select('a, 'c.sum over 'w, 'c.min over 'w)
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -241,7 +241,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following 
CURRENT_ROW as 'w)
       .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -304,7 +304,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following 
CURRENT_RANGE as 'w)
       .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
 
-    val result = windowedTable.toDataStream[Row]
+    val result = windowedTable.toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
index f35ee76..2b496e3 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -43,7 +43,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2).select('c)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -63,7 +63,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 
2).select('c)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -82,7 +82,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase {
 
     val unionDs = ds1.unionAll(ds2)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -101,7 +101,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase 
{
 
     val unionDs = ds1.unionAll(ds2)
 
-    val results = unionDs.toDataStream[Row]
+    val results = unionDs.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index 05e1892..3ac664d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -69,7 +69,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w)
       .select('int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -104,7 +104,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -141,7 +141,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -175,7 +175,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -204,7 +204,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -232,7 +232,7 @@ class DataStreamAggregateITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
-    val results = windowedTable.toDataStream[Row]
+    val results = windowedTable.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
     val expected = Seq(

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index 1d48f2c..12d7202 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -48,7 +48,7 @@ class DataStreamCalcITCase extends 
StreamingMultipleProgramsTestBase {
       .where("RichFunc2(c)='ABC#Hello'")
       .select('c)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -71,7 +71,7 @@ class DataStreamCalcITCase extends 
StreamingMultipleProgramsTestBase {
       .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
       .select('c)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index b3d9c6f..9efe6a1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -53,7 +53,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
       .join(pojoFunc0('c))
       .where('age > 20)
       .select('c, 'name, 'age)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -70,7 +70,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
     val result = t
       .leftOuterJoin(func0('c) as('d, 'e))
       .select('c, 'd, 'e)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -90,7 +90,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
       .join(func0('c) as('d, 'e))
       .where(Func18('d, "J"))
       .select('c, 'd, 'e)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()
@@ -111,7 +111,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
       .join(tableFunc1('c) as 's)
       .select('a, 's)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -135,7 +135,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
       .join(tableFunc1(richFunc2('c)) as 's)
       .select('a, 's)
 
-    val results = result.toDataStream[Row]
+    val results = result.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -164,7 +164,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB
       .select('c, 'd, 'e, 'f, 'g)
       .join(func32('c) as ('h, 'i))
       .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
-      .toDataStream[Row]
+      .toAppendStream[Row]
 
     result.addSink(new StreamITCase.StringSink)
     env.execute()

http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 3f12218..f2d6175 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -103,7 +103,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
 
     val t = table.select('rowtime.cast(Types.STRING))
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -134,7 +134,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
       .filter('rowtime.cast(Types.LONG) > 4)
       .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 
'rowtime.ceil(TimeIntervalUnit.DAY))
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -161,7 +161,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
 
     val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -191,7 +191,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
 
     val t = table.unionAll(table).select('rowtime)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
     val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
@@ -262,7 +262,7 @@ class TimeAttributesITCase extends 
StreamingMultipleProgramsTestBase {
       .groupBy('w2)
       .select('w2.rowtime, 'w2.end, 'int.count)
 
-    val results = t.toDataStream[Row]
+    val results = t.toAppendStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 

Reply via email to