[FLINK-7657] [table] Add all basic types to RexProgramExtractor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce1cb8fd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce1cb8fd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce1cb8fd Branch: refs/heads/release-1.4 Commit: ce1cb8fd6d667713be9b5f9ec8f1c394b9ca4644 Parents: bb04187 Author: twalthr <[email protected]> Authored: Mon Nov 13 14:09:45 2017 +0100 Committer: twalthr <[email protected]> Committed: Mon Nov 13 14:22:27 2017 +0100 ---------------------------------------------------------------------- .../flink/table/expressions/literals.scala | 9 +++-- .../table/plan/util/RexProgramExtractor.scala | 33 +++++++++++++---- .../flink/table/api/TableSourceTest.scala | 25 ++++++------- .../flink/table/plan/RexProgramTestBase.scala | 1 - .../runtime/batch/table/TableSourceITCase.scala | 2 +- .../table/utils/TestFilterableTableSource.scala | 39 ++++++++++---------- .../flink/table/utils/testTableSources.scala | 6 +-- 7 files changed, 63 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala index d797cc4..e6905ef 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala @@ -108,10 +108,11 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre } /** - * Convert a date value to a calendar. 
Calcite fromCalendarField functions use the Calendar.get - * methods, so the raw values of the individual fields are preserved when converted to the - * string formats. - * @return Get the Calendar value + * Convert a Date value to a Calendar. Calcite's fromCalendarField functions use the + * Calendar.get methods, so the raw values of the individual fields are preserved when + * converted to the String formats. + * + * @return get the Calendar value */ private def valueAsCalendar: Calendar = { val date = value.asInstanceOf[java.util.Date] http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala index d11a43d..9c06135 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala @@ -165,25 +165,42 @@ class RexNodeToExpressionConverter( val literalType = FlinkTypeFactory.toTypeInfo(literal.getType) val literalValue = literalType match { - // Chrono use cases. 
+ case _@SqlTimeTypeInfo.DATE => val rexValue = literal.getValueAs(classOf[DateString]) Date.valueOf(rexValue.toString) + case _@SqlTimeTypeInfo.TIME => val rexValue = literal.getValueAs(classOf[TimeString]) Time.valueOf(rexValue.toString(0)) + case _@SqlTimeTypeInfo.TIMESTAMP => val rexValue = literal.getValueAs(classOf[TimestampString]) Timestamp.valueOf(rexValue.toString(3)) + case _@BasicTypeInfo.BYTE_TYPE_INFO => + // convert from BigDecimal to Byte + literal.getValueAs(classOf[java.lang.Byte]) + + case _@BasicTypeInfo.SHORT_TYPE_INFO => + // convert from BigDecimal to Short + literal.getValueAs(classOf[java.lang.Short]) + case _@BasicTypeInfo.INT_TYPE_INFO => - /* - Force integer conversion. RelDataType is INTEGER and SqlTypeName is DECIMAL, - meaning that it will assume that we are using a BigDecimal - and refuse to convert to Integer. - */ - val rexValue = literal.getValueAs(classOf[java.math.BigDecimal]) - rexValue.intValue() + // convert from BigDecimal to Integer + literal.getValueAs(classOf[java.lang.Integer]) + + case _@BasicTypeInfo.LONG_TYPE_INFO => + // convert from BigDecimal to Long + literal.getValueAs(classOf[java.lang.Long]) + + case _@BasicTypeInfo.FLOAT_TYPE_INFO => + // convert from BigDecimal to Float + literal.getValueAs(classOf[java.lang.Float]) + + case _@BasicTypeInfo.DOUBLE_TYPE_INFO => + // convert from BigDecimal to Double + literal.getValueAs(classOf[java.lang.Double]) case _ => literal.getValue } http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala index dc84c19..42f0769 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala @@ -18,19 +18,18 @@ package org.apache.flink.table.api +import _root_.java.sql.{Date, Time, Timestamp} + import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.scala._ -import org.apache.flink.table.expressions.{BinaryComparison, Expression, Literal} import org.apache.flink.table.expressions.utils._ import org.apache.flink.table.runtime.utils.CommonTestData import org.apache.flink.table.sources.{CsvTableSource, TableSource} import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{TableTestBase, TestFilterableTableSource} -import org.junit.{Assert, Test} -import _root_.java.sql.{Date, Time, Timestamp} - -import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.types.Row +import org.junit.{Assert, Test} class TableSourceTest extends TableTestBase { @@ -225,7 +224,8 @@ class TableSourceTest extends TableTestBase { val result = tableEnv .scan(tableName) .select('price, 'id, 'amount) - .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted + .where("amount > 2 && id < 1.2 && " + + "(amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted val expected = unaryNode( "DataSetCalc", @@ -234,7 +234,7 @@ class TableSourceTest extends TableTestBase { Array("price", "id", "amount"), "'amount > 2"), term("select", "price", "id", "amount"), - term("where", "OR(<(amount, 32), >(CAST(amount), 10))") + term("where", "AND(<(id, 1.2E0), OR(<(amount, 32), >(CAST(amount), 10)))") ) util.verifyTable(result, expected) } @@ -403,13 +403,10 @@ class TableSourceTest extends TableTestBase { "'tv > 14:25:02.toTime && " + "'dv > 2017-02-03.toDate && " + "'tsv > 2017-02-03 14:25:02.0.toTimestamp" - val expected = unaryNode( - "DataSetCalc", - batchFilterableSourceTableNode( 
- tableName, - Array("id", "dv", "tv", "tsv"), - expectedFilter), - term("select", "id") + val expected = batchFilterableSourceTableNode( + tableName, + Array("id"), + expectedFilter ) util.verifyTable(result, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala index 728694f..05870ca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala @@ -80,5 +80,4 @@ abstract class RexProgramTestBase { protected def makeTypes(fieldTypes: SqlTypeName*): java.util.List[RelDataType] = { fieldTypes.toList.map(typeFactory.createSqlType).asJava } - } http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala index f0fe896..eb73f1b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala @@ -528,7 +528,7 @@ class TableSourceITCase( new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) val results = tEnv.scan("T") - .select('ptime > 
0) + .select('ptime.cast(Types.LONG) > 0) .select(1.count) .collect() http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala index ae2b1d6..da8220d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala @@ -35,6 +35,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable object TestFilterableTableSource { + /** * @return The default filterable table source. */ @@ -44,15 +45,17 @@ object TestFilterableTableSource { /** * A filterable data source with custom data. - * @param rowTypeInfo The type of the data. - * Its expected that both types and field names are provided + * @param rowTypeInfo The type of the data. Its expected that both types and field + * names are provided. * @param rows The data as a sequence of rows. * @param filterableFields The fields that are allowed to be filtered on. * @return The table source. 
*/ - def apply(rowTypeInfo: RowTypeInfo, - rows: Seq[Row], - filterableFields: Set[String]): TestFilterableTableSource = { + def apply( + rowTypeInfo: RowTypeInfo, + rows: Seq[Row], + filterableFields: Set[String]) + : TestFilterableTableSource = { new TestFilterableTableSource(rowTypeInfo, rows, filterableFields) } @@ -64,24 +67,20 @@ object TestFilterableTableSource { new RowTypeInfo(fieldTypes, fieldNames) } - private lazy val defaultRows: Seq[Row] = { for { cnt <- 0 until 33 } yield { Row.of( s"Record_$cnt", - cnt.toLong.asInstanceOf[Object], - cnt.toInt.asInstanceOf[Object], - cnt.toDouble.asInstanceOf[Object]) + cnt.toLong.asInstanceOf[AnyRef], + cnt.toInt.asInstanceOf[AnyRef], + cnt.toDouble.asInstanceOf[AnyRef]) } } } - /** - * - * * A data source that implements some very basic filtering in-memory in order to test * expression push-down logic. * @@ -91,11 +90,12 @@ object TestFilterableTableSource { * @param filterPredicates The predicates that should be used to filter. * @param filterPushedDown Whether predicates have been pushed down yet. 
*/ -class TestFilterableTableSource(rowTypeInfo: RowTypeInfo, - data: Seq[Row], - filterableFields: Set[String] = Set(), - filterPredicates: Seq[Expression] = Seq(), - val filterPushedDown: Boolean = false) +class TestFilterableTableSource( + rowTypeInfo: RowTypeInfo, + data: Seq[Row], + filterableFields: Set[String] = Set(), + filterPredicates: Seq[Expression] = Seq(), + val filterPushedDown: Boolean = false) extends BatchTableSource[Row] with StreamTableSource[Row] with FilterableTableSource[Row] { @@ -195,8 +195,9 @@ class TestFilterableTableSource(rowTypeInfo: RowTypeInfo, } } - private def extractValues(expr: BinaryComparison, - row: Row): (Comparable[Any], Comparable[Any]) = { + private def extractValues(expr: BinaryComparison, row: Row) + : (Comparable[Any], Comparable[Any]) = { + (expr.left, expr.right) match { case (l: ResolvedFieldReference, r: Literal) => val idx = rowTypeInfo.getFieldIndex(l.name) http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala index c2eba32..a546919 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala @@ -22,21 +22,17 @@ import java.util import java.util.{Collections, List => JList} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.streaming.api.datastream.DataStream import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.Types.{DOUBLE, INT, LONG, STRING} import org.apache.flink.table.api.TableSchema -import org.apache.flink.table.expressions._ import org.apache.flink.table.sources._ import org.apache.flink.table.sources.tsextractors.ExistingField import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions import scala.collection.JavaConverters._ -import scala.collection.mutable class TestTableSourceWithTime[T]( tableSchema: TableSchema,
