Repository: flink
Updated Branches:
  refs/heads/release-1.2 386bdd299 -> 6aa38ee22
[FLINK-5418] [table] Estimated row size does not support nested types

This closes #3073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6aa38ee2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6aa38ee2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6aa38ee2

Branch: refs/heads/release-1.2
Commit: 6aa38ee22cdee9e2ab2ad819dc9d2a91a4b315fe
Parents: 386bdd2
Author: twalthr <[email protected]>
Authored: Fri Jan 6 13:59:38 2017 +0100
Committer: twalthr <[email protected]>
Committed: Wed Jan 11 15:04:20 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/nodes/FlinkRel.scala      | 47 +++++++++++---------
 .../api/scala/batch/sql/SetOperatorsTest.scala | 17 +++++++
 2 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6aa38ee2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 9b844be..a7765d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
@@ -105,28 +103,33 @@ trait FlinkRel {
 
   }
 
-
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
     val fieldList = rowType.getFieldList
 
-    fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) =>
-      t._1 match {
-        case SqlTypeName.TINYINT => s + 1
-        case SqlTypeName.SMALLINT => s + 2
-        case SqlTypeName.INTEGER => s + 4
-        case SqlTypeName.BIGINT => s + 8
-        case SqlTypeName.BOOLEAN => s + 1
-        case SqlTypeName.FLOAT => s + 4
-        case SqlTypeName.DOUBLE => s + 8
-        case SqlTypeName.VARCHAR => s + 12
-        case SqlTypeName.CHAR => s + 1
-        case SqlTypeName.DECIMAL => s + 12
-        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
-        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
-        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
-        case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int]
-        case _ => throw TableException(s"Unsupported data type encountered: $t")
-      }
+    fieldList.map(_.getType).foldLeft(0.0) { (s, t) =>
+      s + estimateDataTypeSize(t)
     }
   }
+
+  private[flink] def estimateDataTypeSize(t: RelDataType): Double = t.getSqlTypeName match {
+    case SqlTypeName.TINYINT => 1
+    case SqlTypeName.SMALLINT => 2
+    case SqlTypeName.INTEGER => 4
+    case SqlTypeName.BIGINT => 8
+    case SqlTypeName.BOOLEAN => 1
+    case SqlTypeName.FLOAT => 4
+    case SqlTypeName.DOUBLE => 8
+    case SqlTypeName.VARCHAR => 12
+    case SqlTypeName.CHAR => 1
+    case SqlTypeName.DECIMAL => 12
+    case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => 8
+    case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => 4
+    case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => 12
+    case SqlTypeName.ROW => estimateRowSize(t)
+    case SqlTypeName.ARRAY =>
+      // 16 is an arbitrary estimate
+      estimateDataTypeSize(t.getComponentType) * 16
+    case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+    case _ => throw TableException(s"Unsupported data type encountered: $t")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6aa38ee2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index 6c07c6e..be98a89 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -27,6 +27,23 @@ import org.junit.Test
 class SetOperatorsTest extends TableTestBase {
 
   @Test
+  def testMinusWithNestedTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+
+    val expected = binaryNode(
+      "DataSetMinus",
+      batchTableNode(0),
+      batchTableNode(0),
+      term("minus", "a", "b", "c")
+    )
+
+    val result = t.minus(t)
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
   def testExists(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
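
Note (not part of the commit): the recursion added by estimateDataTypeSize can be illustrated with a small standalone sketch. The names below (SizeSketch, SqlType, RowT, ArrayT, estimate) are invented for illustration and do not exist in Flink or Calcite; only the per-type byte estimates and the handling of ROW and ARRAY mirror the logic in the patch.

    // Minimal sketch of the size-estimation recursion, using a toy type ADT
    // instead of Calcite's RelDataType. Assumption: the hypothetical SqlType
    // hierarchy stands in for SqlTypeName / RelDataType.
    object SizeSketch {

      sealed trait SqlType
      case object BigIntT extends SqlType                  // 8 bytes, like SqlTypeName.BIGINT
      case object IntT extends SqlType                     // 4 bytes, like SqlTypeName.INTEGER
      case object VarCharT extends SqlType                 // 12 bytes, like SqlTypeName.VARCHAR
      case object BooleanT extends SqlType                 // 1 byte, like SqlTypeName.BOOLEAN
      case class RowT(fields: Seq[SqlType]) extends SqlType
      case class ArrayT(component: SqlType) extends SqlType

      // Mirrors estimateDataTypeSize: scalars have fixed estimates, ROW recurses
      // into its fields, ARRAY assumes an arbitrary cardinality of 16 elements.
      def estimate(t: SqlType): Double = t match {
        case BigIntT           => 8
        case IntT              => 4
        case VarCharT          => 12
        case BooleanT          => 1
        case RowT(fields)      => fields.map(estimate).sum
        case ArrayT(component) => estimate(component) * 16
      }

      def main(args: Array[String]): Unit = {
        // Shape of the table in testMinusWithNestedTypes: (Long, (Int, String), Array[Boolean])
        val row = RowT(Seq(BigIntT, RowT(Seq(IntT, VarCharT)), ArrayT(BooleanT)))
        // 8 + (4 + 12) + 1 * 16 = 40.0; before this commit the nested ROW and ARRAY
        // fields hit the "Unsupported data type encountered" TableException instead.
        println(estimate(row))
      }
    }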
