[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream. This closes #3929.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a65e5ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a65e5ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a65e5ac Branch: refs/heads/master Commit: 3a65e5acbcc29636b0ce1631815861089fc21dca Parents: d5310ed Author: twalthr <twal...@apache.org> Authored: Wed May 17 11:31:33 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu May 18 21:22:12 2017 +0200 ---------------------------------------------------------------------- .../table/examples/scala/StreamSQLExample.scala | 2 +- .../examples/scala/StreamTableExample.scala | 2 +- .../flink/table/api/java/package-info.java | 5 +- .../table/api/java/StreamTableEnvironment.scala | 121 ++++++++++++++++++- .../api/scala/StreamTableEnvironment.scala | 55 ++++++++- .../table/api/scala/TableConversions.scala | 96 +++++++++++++-- .../apache/flink/table/api/scala/package.scala | 2 +- .../table/api/java/stream/sql/SqlITCase.java | 8 +- .../api/scala/stream/TableSourceITCase.scala | 4 +- .../api/scala/stream/sql/OverWindowITCase.scala | 32 ++--- .../table/api/scala/stream/sql/SqlITCase.scala | 20 +-- .../api/scala/stream/table/CalcITCase.scala | 24 ++-- .../table/GroupWindowAggregationsITCase.scala | 10 +- .../scala/stream/table/OverWindowITCase.scala | 10 +- .../api/scala/stream/table/UnionITCase.scala | 8 +- .../datastream/DataStreamAggregateITCase.scala | 12 +- .../datastream/DataStreamCalcITCase.scala | 4 +- .../DataStreamUserDefinedFunctionITCase.scala | 12 +- .../datastream/TimeAttributesITCase.scala | 12 +- 19 files changed, 339 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala 
---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 2cdd8b8..665913e 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -62,7 +62,7 @@ object StreamSQLExample { "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") - result.toDataStream[Order].print() + result.toAppendStream[Order].print() env.execute() } http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala index 5c5012b..1c0ffea 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala @@ -55,7 +55,7 @@ object StreamTableExample { val result: DataStream[Order] = orderA.unionAll(orderB) .select('user, 'product, 'amount) .where('amount > 2) - .toDataStream[Order] + .toAppendStream[Order] result.print() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java index 2409872..3dbf50f 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java @@ -57,8 +57,9 @@ * <p> * As seen above, a {@link org.apache.flink.table.api.Table} can be converted back to the * underlying API representation using - * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)} - * or {@link org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table, java.lang.Class)}}. + * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)}, + * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, java.lang.Class)}, or + * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, java.lang.Class)}. 
*/ package org.apache.flink.table.api.java; http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index 311986c..be94df9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -144,13 +144,122 @@ class StreamTableEnvironment( * types: Fields are mapped by position, field types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the type of the resulting [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. + */ + @Deprecated + def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = toAppendStream(table, clazz) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. 
+ * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. + */ + @Deprecated + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = + toAppendStream(table, typeInfo) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. 
+ * @param clazz The class of the type of the resulting [[DataStream]]. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. + */ + @Deprecated + def toDataStream[T]( + table: Table, + clazz: Class[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, clazz, queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + * @deprecated This method only supports conversion of append-only tables. In order to + * make this more explicit in the future, please use toAppendStream() instead. 
+ */ + @Deprecated + def toDataStream[T]( + table: Table, + typeInfo: TypeInformation[T], + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, typeInfo, queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * * @param table The [[Table]] to convert. * @param clazz The class of the type of the resulting [[DataStream]]. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - toDataStream(table, clazz, queryConfig) + def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = { + toAppendStream(table, clazz, queryConfig) } /** @@ -169,8 +278,8 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { - toDataStream(table, typeInfo, queryConfig) + def toAppendStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { + toAppendStream(table, typeInfo, queryConfig) } /** @@ -190,7 +299,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. 
*/ - def toDataStream[T]( + def toAppendStream[T]( table: Table, clazz: Class[T], queryConfig: StreamQueryConfig): DataStream[T] = { @@ -216,7 +325,7 @@ class StreamTableEnvironment( * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T]( + def toAppendStream[T]( table: Table, typeInfo: TypeInformation[T], queryConfig: StreamQueryConfig): DataStream[T] = { http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 8c6b273..bfd443a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -138,13 +138,17 @@ class StreamTableEnvironment( * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * * @param table The [[Table]] to convert. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { - toDataStream(table, queryConfig) - } + @deprecated("This method only supports conversion of append-only tables. 
In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table) /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. @@ -157,13 +161,58 @@ class StreamTableEnvironment( * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * * @param table The [[Table]] to convert. * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ + @deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") def toDataStream[T: TypeInformation]( table: Table, + queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. 
+ */ + def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = { + toAppendStream(table, queryConfig) + } + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toAppendStream[T: TypeInformation]( + table: Table, queryConfig: StreamQueryConfig): DataStream[T] = { val returnType = createTypeInformation[T] asScalaStream(translate( http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 9874a9e..bd431eb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -32,7 +32,17 @@ import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTa */ class TableConversions(table: Table) { - /** Converts the [[Table]] to a [[DataSet]] of the specified type. 
*/ + /** + * Converts the given [[Table]] into a [[DataSet]] of a specified type. + * + * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. + * + * @tparam T The type of the resulting [[DataSet]]. + * @return The converted [[DataSet]]. + */ def toDataSet[T: TypeInformation]: DataSet[T] = { table.tableEnv match { @@ -44,12 +54,71 @@ class TableConversions(table: Table) { } } - /** Converts the [[Table]] to a [[DataStream]] of the specified type. */ - def toDataStream[T: TypeInformation]: DataStream[T] = { + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. 
+ * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * NOTE: This method only supports conversion of append-only tables. In order to make this + * more explicit in the future, please use [[toAppendStream()]] instead. + * If add and retract messages are required, use [[toRetractStream()]]. + * + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + @deprecated("This method only supports conversion of append-only tables. In order to make this" + + " more explicit in the future, please use toAppendStream() instead.") + def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = + toAppendStream(queryConfig) + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. 
+ */ + def toAppendStream[T: TypeInformation]: DataStream[T] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toDataStream(table) + tEnv.toAppendStream(table) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + @@ -57,14 +126,25 @@ class TableConversions(table: Table) { } } - /** Converts the [[Table]] to a [[DataStream]] of the specified type. + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * - * @param queryConfig The configuration for the generated query. + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param queryConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. 
*/ - def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = { + def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toDataStream(table, queryConfig) + tEnv.toAppendStream(table, queryConfig) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala index e8a2017..9d15c14 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala @@ -87,7 +87,7 @@ package object scala extends ImplicitExpressionConversions { implicit def table2RowDataStream(table: Table): DataStream[Row] = { val tableEnv = table.tableEnv.asInstanceOf[ScalaStreamTableEnv] - tableEnv.toDataStream[Row](table) + tableEnv.toAppendStream[Row](table) } implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = { http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java index 0c0b37e..d827cd6 100644 --- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java @@ -66,7 +66,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { String sqlQuery = "SELECT a,c FROM MyTableRow"; Table result = tableEnv.sql(sqlQuery); - DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink()); env.execute(); @@ -91,7 +91,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { String sqlQuery = "SELECT * FROM MyTable"; Table result = tableEnv.sql(sqlQuery); - DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink()); env.execute(); @@ -115,7 +115,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4"; Table result = tableEnv.sql(sqlQuery); - DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink()); env.execute(); @@ -146,7 +146,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { "(SELECT a, b, c FROM T2 WHERE a < 3)"; Table result = tableEnv.sql(sqlQuery); - DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink()); env.execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala index 9298266..13ec2b4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala @@ -46,7 +46,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.sql( "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ") - .toDataStream[Row] + .toAppendStream[Row] .addSink(new StreamITCase.StringSink) env.execute() @@ -71,7 +71,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.scan("csvTable") .where('id > 4) .select('last, 'score * 2) - .toDataStream[Row] + .toAppendStream[Row] .addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala index a7fe1c4..7ba5c16 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala @@ -67,7 +67,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] 
result.addSink(new StreamITCase.StringSink) env.execute() } @@ -92,7 +92,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + "FROM MyTable" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " MIN(c) OVER (" + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + "FROM MyTable" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -177,7 +177,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -204,7 +204,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -234,7 +234,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -259,7 +259,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + "from T1" - val result = 
tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -321,7 +321,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" + " FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -382,7 +382,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " + "FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -450,7 +450,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " + " FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -511,7 +511,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " + "FROM T1" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -572,7 +572,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -638,7 +638,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new 
StreamITCase.StringSink) env.execute() @@ -700,7 +700,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -761,7 +761,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -836,7 +836,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index ba8c185..bdc1fcc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -57,7 +57,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = ds.toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTableRow", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -99,7 +99,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -120,7 +120,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -141,7 +141,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t = StreamTestData.getSmall3TupleDataStream(env) tEnv.registerDataStream("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -165,7 +165,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -192,7 +192,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T2", t2) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -218,7 +218,7 @@ class SqlITCase extends StreamingWithStateTestBase { val t2 = StreamTestData.get3TupleDataStream(env) tEnv.registerDataStream("T2", t2, 'a, 'b, 'c) - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -243,7 +243,7 
@@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -275,7 +275,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -305,7 +305,7 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13" - val result = tEnv.sql(sqlQuery).toDataStream[Row] + val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala index 1114cf0..b355cf0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala @@ -41,7 +41,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -60,7 +60,7 @@ class CalcITCase 
extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -79,7 +79,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { .select('_1 as 'a, '_2 as 'b, '_1 as 'c) .select('a, 'b) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -99,7 +99,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .select('a, 'b, 'c) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -118,7 +118,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -151,7 +151,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd) - val results = ds.toDataStream[Row] + val results = ds.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -171,7 +171,7 @@ class CalcITCase extends 
StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter('a === 3) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -191,7 +191,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( Literal(false) ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -210,7 +210,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( Literal(true) ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -233,7 +233,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( 'a % 2 === 0 ) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -257,7 +257,7 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val filterDs = ds.filter( 'a % 2 !== 0) - val results = filterDs.toDataStream[Row] + val results = filterDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = mutable.MutableList( http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala index 81d3577..87a35bf 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala @@ -70,7 +70,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row](queryConfig) + val results = windowedTable.toAppendStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() @@ -112,7 +112,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -138,7 +138,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select(countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) - val results = windowedTable.toDataStream[Row](queryConfig) + val results = windowedTable.toAppendStream[Row](queryConfig) results.addSink(new StreamITCase.StringSink) env.execute() @@ -166,7 +166,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] 
results.addSink(new StreamITCase.StringSink) env.execute() @@ -202,7 +202,7 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'int2, 'int3, 'string) .select(weightAvgFun('long, 'int)) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala index b097767..f396896 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala @@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg) .select('c, 'mycount, 'wAvg) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -123,7 +123,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { 'b.min over 'w, weightAvgFun('b, 'a) over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -178,7 +178,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { val windowedTable = table .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w) .select('a, 'c.sum over 'w, 'c.min 
over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -241,7 +241,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -304,7 +304,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w) .select('c, 'b, 'a.count over 'w, 'a.sum over 'w) - val result = windowedTable.toDataStream[Row] + val result = windowedTable.toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala index f35ee76..2b496e3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala @@ -43,7 +43,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2).select('c) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -63,7 +63,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = 
ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -82,7 +82,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -101,7 +101,7 @@ class UnionITCase extends StreamingMultipleProgramsTestBase { val unionDs = ds1.unionAll(ds2) - val results = unionDs.toDataStream[Row] + val results = unionDs.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala index 05e1892..3ac664d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala @@ -69,7 +69,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w) .select('int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -104,7 +104,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = 
windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -141,7 +141,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -175,7 +175,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -204,7 +204,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -232,7 +232,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) - val results = windowedTable.toDataStream[Row] + val results = windowedTable.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = Seq( http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala index 1d48f2c..12d7202 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala @@ -48,7 +48,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase { .where("RichFunc2(c)='ABC#Hello'") .select('c) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -71,7 +71,7 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase { .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2") .select('c) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala index b3d9c6f..9efe6a1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala @@ -53,7 +53,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(pojoFunc0('c)) .where('age > 20) .select('c, 'name, 'age) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -70,7 +70,7 @@ class DataStreamUserDefinedFunctionITCase extends 
StreamingMultipleProgramsTestB val result = t .leftOuterJoin(func0('c) as('d, 'e)) .select('c, 'd, 'e) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -90,7 +90,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(func0('c) as('d, 'e)) .where(Func18('d, "J")) .select('c, 'd, 'e) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() @@ -111,7 +111,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(tableFunc1('c) as 's) .select('a, 's) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -135,7 +135,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .join(tableFunc1(richFunc2('c)) as 's) .select('a, 's) - val results = result.toDataStream[Row] + val results = result.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -164,7 +164,7 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB .select('c, 'd, 'e, 'f, 'g) .join(func32('c) as ('h, 'i)) .select('c, 'd, 'f, 'h, 'e, 'g, 'i) - .toDataStream[Row] + .toAppendStream[Row] result.addSink(new StreamITCase.StringSink) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index 3f12218..f2d6175 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -103,7 +103,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.select('rowtime.cast(Types.STRING)) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -134,7 +134,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { .filter('rowtime.cast(Types.LONG) > 4) .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY)) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -161,7 +161,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -191,7 +191,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = table.unionAll(table).select('rowtime) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " + "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)") - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() @@ -262,7 +262,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { .groupBy('w2) .select('w2.rowtime, 'w2.end, 'int.count) - val results = t.toDataStream[Row] + val results = t.toAppendStream[Row] results.addSink(new StreamITCase.StringSink) env.execute()