Repository: flink Updated Branches: refs/heads/master 5dab9345c -> 4d27f8f2d
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala index 7ff2340..16f1608 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.math.BigDecimal import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row abstract class SumAggregate[T: Numeric] extends Aggregate[T] { @@ -32,9 +32,9 @@ abstract class SumAggregate[T: Numeric] } override def merge(partial1: Row, buffer: Row): Unit = { - val partialValue = partial1.productElement(sumIndex).asInstanceOf[T] + val partialValue = partial1.getField(sumIndex).asInstanceOf[T] if (partialValue != null) { - val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T] + val bufferValue = buffer.getField(sumIndex).asInstanceOf[T] if (bufferValue != null) { buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue)) } else { @@ -44,7 +44,7 @@ abstract class SumAggregate[T: Numeric] } override def evaluate(buffer: Row): T = { - buffer.productElement(sumIndex).asInstanceOf[T] + buffer.getField(sumIndex).asInstanceOf[T] } override def prepare(value: Any, partial: Row): Unit = { @@ -98,9 +98,9 @@ class DecimalSumAggregate extends Aggregate[BigDecimal] { } override def merge(partial1: Row, buffer: Row): Unit = { - val partialValue = partial1.productElement(sumIndex).asInstanceOf[BigDecimal] + val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal] if (partialValue != null) { - val bufferValue = buffer.productElement(sumIndex).asInstanceOf[BigDecimal] + val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal] if (bufferValue != null) { buffer.setField(sumIndex, partialValue.add(bufferValue)) } else { @@ -110,7 +110,7 @@ class DecimalSumAggregate extends Aggregate[BigDecimal] { } override def evaluate(buffer: Row): BigDecimal = { - buffer.productElement(sumIndex).asInstanceOf[BigDecimal] + buffer.getField(sumIndex).asInstanceOf[BigDecimal] } override def prepare(value: Any, partial: Row): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 9f1c23b..417c1f1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector @@ -35,7 +35,7 @@ class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffse override def collect(record: Row): Unit = { - val lastFieldPos = record.productArity - 1 + val lastFieldPos = record.getArity - 1 if (windowStartOffset.isDefined) { record.setField( http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala index 7567ba8..5038d9b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala @@ -21,8 +21,8 @@ package org.apache.flink.api.table.sinks import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.types.Row +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.datastream.DataStream /** @@ -53,7 +53,7 @@ class CsvTableSink( } override def getOutputType: TypeInformation[Row] = { - new RowTypeInfo(getFieldTypes) + new RowTypeInfo(getFieldTypes: _*) } } @@ -68,15 +68,15 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { val builder = new StringBuilder // write first value - val v = row.productElement(0) + val v = row.getField(0) if (v != null) { builder.append(v.toString) } // write following values - for (i <- 1 until row.productArity) { + for (i <- 1 until row.getArity) { builder.append(fieldDelim) - val v = row.productElement(i) + val v = row.getField(i) if (v != null) { builder.append(v.toString) } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala index 9cf4397..b60575a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala @@ -21,9 +21,10 @@ package org.apache.flink.api.table.sources import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.CsvInputFormat import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.api.table.{Row, TableException} -import org.apache.flink.api.table.runtime.io.RowCsvInputFormat -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.TableException +import org.apache.flink.types.Row +import org.apache.flink.api.java.io.RowCsvInputFormat +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -71,7 +72,7 @@ class CsvTableSource( throw TableException("Number of field names and field types must be equal.") } - private val returnType = new RowTypeInfo(fieldTypes) + private val returnType = new RowTypeInfo(fieldTypes: _*) /** * Returns the data of the table as a [[DataSet]] of [[Row]]. http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala index a162d9f..a81577c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala @@ -25,14 +25,15 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException} +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} +import org.apache.flink.types.Row import scala.collection.JavaConversions._ object TypeConverter { - val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] + val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]] /** * Determines the return type of Flink operators based on the logical fields, the expected @@ -115,7 +116,7 @@ object TypeConverter { // Row is expected, create the arity for it case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] => - new RowTypeInfo(logicalFieldTypes) + new RowTypeInfo(logicalFieldTypes: _*) // no physical type // determine type based on logical fields and configuration parameters @@ -123,7 +124,7 @@ object TypeConverter { // no need for efficient types -> use Row // we cannot use efficient types if row arity > tuple arity or nullable if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) { - new RowTypeInfo(logicalFieldTypes) + new RowTypeInfo(logicalFieldTypes: _*) } // use efficient type tuple or atomic type else { http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java index 294cba2..75d964b 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java @@ -36,7 +36,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; import org.apache.flink.api.table.CalciteConfig; import org.apache.flink.api.table.CalciteConfigBuilder; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.api.table.TableException; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java index b634d51..7538808 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableSourceITCase.java @@ -25,11 +25,11 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.scala.batch.GeneratingInputFormat; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.api.table.sources.BatchTableSource; -import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java index 1364cbd..89b0d50 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java index 02f6e0b..0856a70 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.api.table.ValidationException; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java index 6fc8173..1d5c189 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java @@ -23,7 +23,7 @@ import java.util.Collection; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java index 333953b..1139837 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java index 9676608..014c127 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java @@ -25,7 +25,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.api.table.ValidationException; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java index 10ae5d9..53a1a7d 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.table.StreamTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.scala.stream.utils.StreamITCase; -import org.apache.flink.api.table.Row; +import org.apache.flink.types.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala index 42b9de0..ddea3ba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala @@ -19,16 +19,17 @@ package org.apache.flink.api.scala.batch import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv} import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row import org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -104,7 +105,7 @@ class TestProjectableTableSource( override def getFieldsNames: Array[String] = fieldNames /** Returns the [[TypeInformation]] for the return type. */ - override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes) + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*) /** Returns the number of fields of the table. */ override def getNumberOfFields: Int = fieldNames.length http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala index 4c07615..b7c8bc0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala @@ -25,7 +25,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala index 08bee72..b5c8ada 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala @@ -28,10 +28,11 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.sources.{BatchTableSource, CsvTableSource} -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.apache.flink.test.util.TestBaseUtils import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -153,7 +154,7 @@ class TestBatchTableSource extends BatchTableSource[Row] { override def getFieldsNames: Array[String] = Array("name", "id", "amount") /** Returns the [[TypeInformation]] for the return type. */ - override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes) + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*) /** Returns the number of fields of the table. */ override def getNumberOfFields: Int = 3 http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala index 35bb7dc..d5d46ba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.table.{TableException, Row, TableEnvironment} +import org.apache.flink.api.table.{TableException, TableEnvironment} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala index 155833b..5037469 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala @@ -29,7 +29,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.functions.ScalarFunction -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala index 68f63c3..074f70b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{ValidationException, TableEnvironment, TableException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala index 7cdb746..42bd6e8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala index f345984..b94cd00 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala @@ -25,7 +25,8 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala._ -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{TableEnvironment, TableException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -46,6 +47,7 @@ class SortITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC" + implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long])) @@ -55,7 +57,10 @@ class SortITCase( val expected = sortExpectedly(tupleDataSetStrings) val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -68,7 +73,7 @@ class SortITCase( val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS" implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - -x.productElement(0).asInstanceOf[Int]) + - x.productElement(0).asInstanceOf[Int] ) val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) @@ -76,7 +81,10 @@ class SortITCase( val expected = sortExpectedly(tupleDataSetStrings, 2, 21) val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results. + filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -89,7 +97,7 @@ class SortITCase( val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY" implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + x.productElement(0).asInstanceOf[Int] ) val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) @@ -97,7 +105,10 @@ class SortITCase( val expected = sortExpectedly(tupleDataSetStrings, 2, 7) val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -107,10 +118,10 @@ class SortITCase( val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5" + val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5" implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) ) val ds = CollectionDataSets.get3TupleDataSet(env) tEnv.registerDataSet("MyTable", ds) @@ -118,7 +129,10 @@ class SortITCase( val expected = sortExpectedly(tupleDataSetStrings, 0, 5) val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala index a770a6e..d41f3e0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala index 16c8ece..3f4e1e5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala index c3758a4..b011462 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala @@ -27,7 +27,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala index 67cac14..195027d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala index 283ba10..0d32cb4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala index d4a1d8d..b3cc054 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala @@ -24,7 +24,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.scala.{ExecutionEnvironment, _} -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -53,12 +54,15 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.desc) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - - x.productElement(0).asInstanceOf[Int]) + - x.productElement(0).asInstanceOf[Int] ) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -71,12 +75,15 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.asc) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + x.productElement(0).asInstanceOf[Int] ) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -87,14 +94,17 @@ class SortITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds = CollectionDataSets.get3TupleDataSet(env) - val t = ds.toTable(tEnv).orderBy('_1.asc, '_2.desc) + val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - (x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long])) + (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) ) val expected = sortExpectedly(tupleDataSetStrings) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -107,12 +117,15 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + x.productElement(0).asInstanceOf[Int] ) val expected = sortExpectedly(tupleDataSetStrings, 3, 21) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -123,14 +136,17 @@ class SortITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds = CollectionDataSets.get3TupleDataSet(env) - val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5) + val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + - x.productElement(0).asInstanceOf[Int] ) val expected = sortExpectedly(tupleDataSetStrings, 3, 8) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } @@ -143,12 +159,15 @@ class SortITCase( val ds = CollectionDataSets.get3TupleDataSet(env) val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5) implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => - x.productElement(0).asInstanceOf[Int]) + x.productElement(0).asInstanceOf[Int] ) val expected = sortExpectedly(tupleDataSetStrings, 0, 5) val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() - val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _) + val result = results + .filterNot(_.isEmpty) + .sortBy(_.head)(Ordering.by(f=> f.toString)) + .reduceLeft(_ ++ _) TestBaseUtils.compareOrderedResultAsText(result.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala index 7e170d4..285a181 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala @@ -21,10 +21,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv} import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _} -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.table.utils.TableTestUtil._ import org.apache.flink.api.table.utils.{PojoTableFunc, TableFunc2, _} -import org.apache.flink.api.table.{Row, TableEnvironment, Types} +import org.apache.flink.api.table.{TableEnvironment, Types} +import org.apache.flink.types.Row import org.junit.Test import org.mockito.Mockito._ @@ -35,7 +36,7 @@ class UserDefinedTableFunctionTest extends TableTestBase { // mock val ds = mock(classOf[DataSet[Row]]) val jDs = mock(classOf[JDataSet[Row]]) - val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING)) + val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*) when(ds.javaSet).thenReturn(jDs) when(jDs.getType).thenReturn(typeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala index c14ad97..1c93112 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala @@ -25,8 +25,9 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.utils.StreamITCase import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.sources.{CsvTableSource, StreamTableSource} -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment import org.apache.flink.streaming.api.functions.source.SourceFunction @@ -163,7 +164,7 @@ class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] override def getFieldsNames: Array[String] = Array("name", "id", "amount") /** Returns the [[TypeInformation]] for the return type. */ - override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes) + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*) /** Returns the number of fields of the table. */ override def getNumberOfFields: Int = 3 http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala index 5b278c1..c4ca964 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala @@ -21,7 +21,8 @@ package org.apache.flink.api.scala.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase} import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala index 0753484..d398556 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala @@ -22,7 +22,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark import org.apache.flink.api.scala.stream.utils.StreamITCase import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, _} +import org.apache.flink.api.table._ +import org.apache.flink.types.Row import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala index 578ad30..3eee4d4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala @@ -22,7 +22,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{TableEnvironment, TableException} +import org.apache.flink.types.Row import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala index 131974e..5096b53 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala @@ -21,7 +21,8 @@ package org.apache.flink.api.scala.stream.table import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala index b45ae8e..305f1db 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala @@ -17,12 +17,12 @@ */ package org.apache.flink.api.scala.stream.table -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table._ +import org.apache.flink.types.Row import org.apache.flink.api.table.expressions.utils._ -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{TableEnvironment, TableException, Types, ValidationException} import org.apache.flink.api.table.utils.TableTestUtil._ import org.apache.flink.api.table.utils._ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} @@ -39,7 +39,7 @@ class UserDefinedTableFunctionTest extends TableTestBase { // mock val ds = mock(classOf[DataStream[Row]]) val jDs = mock(classOf[JDataStream[Row]]) - val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING)) + val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*) when(ds.javaStream).thenReturn(jDs) when(jDs.getType).thenReturn(typeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala index 4860005..4fd3cd7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala.stream.utils import java.util.Collections -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.junit.Assert._ import scala.collection.mutable import org.apache.flink.streaming.api.functions.sink.RichSinkFunction http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala index 034ce0b..2ba76ad 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ArrayTypeTest.scala @@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInforma import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types, ValidationException} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Types, ValidationException} +import org.apache.flink.types.Row import org.junit.Test class ArrayTypeTest extends ExpressionTestBase { @@ -342,7 +343,7 @@ class ArrayTypeTest extends ExpressionTestBase { } override def typeInfo: TypeInformation[Any] = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.INT, Types.INT, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, @@ -354,6 +355,6 @@ class ArrayTypeTest extends ExpressionTestBase { PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, ObjectArrayTypeInfo.getInfoFor(Types.INT), ObjectArrayTypeInfo.getInfoFor(Types.INT) - )).asInstanceOf[TypeInformation[Any]] + ).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala index 3121c58..879f68d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala @@ -22,10 +22,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, Types, ValidationException} +import org.apache.flink.api.table.{Types, ValidationException} +import org.apache.flink.types.Row import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo} import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.junit.Test @@ -154,7 +155,7 @@ class CompositeAccessTest extends ExpressionTestBase { } def typeInfo = { - new RowTypeInfo(Seq( + new RowTypeInfo( createTypeInformation[MyCaseClass], createTypeInformation[MyCaseClass2], createTypeInformation[(String, String)], @@ -163,7 +164,7 @@ class CompositeAccessTest extends ExpressionTestBase { Types.INT, createTypeInformation[MyCaseClass2], createTypeInformation[Tuple1[Boolean]] - )).asInstanceOf[TypeInformation[Any]] + ).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala index 20a8af8..a986365 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/DecimalTypeTest.scala @@ -20,9 +20,10 @@ package org.apache.flink.api.table.expressions import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.table.Types +import org.apache.flink.types.Row import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.junit.Test class DecimalTypeTest extends ExpressionTestBase { @@ -298,13 +299,13 @@ class DecimalTypeTest extends ExpressionTestBase { } def typeInfo = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.DECIMAL, Types.DECIMAL, Types.INT, Types.DOUBLE, Types.DECIMAL, - Types.DECIMAL)).asInstanceOf[TypeInformation[Any]] + Types.DECIMAL).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala index de48849..0b39d4d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/NonDeterministicTests.scala @@ -20,9 +20,9 @@ package org.apache.flink.api.table.expressions import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.junit.{Ignore, Test} /** @@ -85,5 +85,5 @@ class NonDeterministicTests extends ExpressionTestBase { override def testData: Any = new Row(0) override def typeInfo: TypeInformation[Any] = - new RowTypeInfo(Seq[TypeInformation[_]]()).asInstanceOf[TypeInformation[Any]] + new RowTypeInfo().asInstanceOf[TypeInformation[Any]] } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index 1d2a1b7..3ef02a9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -23,8 +23,9 @@ import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types, ValidationException} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Types, ValidationException} +import org.apache.flink.types.Row import org.junit.Test class ScalarFunctionsTest extends ExpressionTestBase { @@ -1134,7 +1135,7 @@ class ScalarFunctionsTest extends ExpressionTestBase { } def typeInfo = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.STRING, Types.BOOLEAN, Types.BYTE, @@ -1159,7 +1160,7 @@ class ScalarFunctionsTest extends ExpressionTestBase { Types.BOOLEAN, Types.DECIMAL, Types.STRING, - Types.STRING)).asInstanceOf[TypeInformation[Any]] + Types.STRING).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala index 7ad2212..86f884f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala @@ -21,8 +21,9 @@ package org.apache.flink.api.table.expressions import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types, ValidationException} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Types, ValidationException} +import org.apache.flink.types.Row import org.junit.Test class ScalarOperatorsTest extends ExpressionTestBase { @@ -201,7 +202,7 @@ class ScalarOperatorsTest extends ExpressionTestBase { } def typeInfo = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.BYTE, Types.SHORT, Types.INT, @@ -213,7 +214,7 @@ class ScalarOperatorsTest extends ExpressionTestBase { Types.INT, Types.INT, Types.STRING - )).asInstanceOf[TypeInformation[Any]] + ).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala index 52dc848..56f40ea 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala @@ -19,9 +19,9 @@ package org.apache.flink.api.table.expressions import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.junit.{Ignore, Test} /** @@ -166,5 +166,5 @@ class SqlExpressionTest extends ExpressionTestBase { override def testData: Any = new Row(0) override def typeInfo: TypeInformation[Any] = - new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] + new RowTypeInfo().asInstanceOf[TypeInformation[Any]] } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala index 0547552..bd771ba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/TemporalTypesTest.scala @@ -23,8 +23,9 @@ import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.Types +import org.apache.flink.types.Row import org.junit.Test class TemporalTypesTest extends ExpressionTestBase { @@ -556,7 +557,7 @@ class TemporalTypesTest extends ExpressionTestBase { } def typeInfo = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.DATE, Types.TIME, Types.TIMESTAMP, @@ -567,6 +568,6 @@ class TemporalTypesTest extends ExpressionTestBase { Types.INT, Types.LONG, Types.INTERVAL_MONTHS, - Types.INTERVAL_MILLIS)).asInstanceOf[TypeInformation[Any]] + Types.INTERVAL_MILLIS).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala index ffe3cd3..567cca1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/UserDefinedScalarFunctionTest.scala @@ -25,8 +25,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils._ import org.apache.flink.api.table.functions.ScalarFunction -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.table.Types +import org.apache.flink.types.Row import org.junit.Test class UserDefinedScalarFunctionTest extends ExpressionTestBase { @@ -195,7 +196,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { } override def typeInfo: TypeInformation[Any] = { - new RowTypeInfo(Seq( + new RowTypeInfo( Types.INT, Types.STRING, Types.BOOLEAN, @@ -205,7 +206,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { Types.TIMESTAMP, Types.INTERVAL_MONTHS, Types.INTERVAL_MILLIS - )).asInstanceOf[TypeInformation[Any]] + ).asInstanceOf[TypeInformation[Any]] } override def functions: Map[String, ScalarFunction] = Map( http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala index 958fd25..3156ba8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala @@ -33,7 +33,8 @@ import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} import org.apache.flink.api.table.functions.ScalarFunction import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} import org.apache.flink.api.table.plan.rules.FlinkRuleSets -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.{After, Before} import org.mockito.Mockito._ @@ -96,7 +97,7 @@ abstract class ExpressionTestBase { val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq // generate code - val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO)) + val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*) val genExpr = generator.generateResultExpression( resultType, resultType.getFieldNames, @@ -124,7 +125,7 @@ abstract class ExpressionTestBase { .zipWithIndex .foreach { case ((expr, expected), index) => - val actual = result.productElement(index) + val actual = result.getField(index) assertEquals( s"Wrong result for: $expr", expected, http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala index 54911a5..4e33a61 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTestBase.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate import java.math.BigDecimal -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.junit.Test import org.junit.Assert.assertEquals http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala index 32559f1..993347f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/dataset/DataSetCorrelateITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.expressions.utils._ import org.apache.flink.api.table.utils._ -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit.Test http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala index 70b0359..21fe157 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala @@ -21,7 +21,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.stream.utils.StreamITCase import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.utils.TableFunc0 -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.types.Row import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala index 3da3857..4291b29 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala @@ -21,9 +21,9 @@ import java.lang.Boolean import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.Tuple3 -import org.apache.flink.api.table.Row +import org.apache.flink.types.Row import org.apache.flink.api.table.functions.TableFunction -import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo case class SimpleUser(name: String, age: Int) @@ -66,8 +66,8 @@ class TableFunc2 extends TableFunction[Row] { } override def getResultType: TypeInformation[Row] = { - new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO)) + new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO) } }
