[FLINK-4077] [tableAPI] Register Pojo DataSet/DataStream as Table with field references.
This closes #2107 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc344a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc344a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc344a4 Branch: refs/heads/master Commit: efc344a4e2ef8ea3e0b1e4da621196e9afeb75cc Parents: 298c009 Author: Fabian Hueske <fhue...@apache.org> Authored: Wed Jun 15 17:12:10 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Jun 17 00:19:26 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/TableEnvironment.scala | 19 +- .../flink/api/table/TableEnvironmentTest.scala | 289 +++++++++++++++++++ 2 files changed, 303 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 7debb65..4d1bb1d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -124,8 +124,8 @@ abstract class TableEnvironment(val config: TableConfig) { * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]] * with a [[org.apache.calcite.schema.TranslatableTable]]. * - * @param name - * @param table + * @param name Name of the table to replace. + * @param table The table that replaces the previous table. 
*/ protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = { @@ -230,7 +230,9 @@ abstract class TableEnvironment(val config: TableConfig) { * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ - protected def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { + protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): + (Array[String], Array[Int]) = + { val fieldNames: Array[String] = inputType match { case t: TupleTypeInfo[A] => t.getFieldNames case c: CaseClassTypeInfo[A] => c.getFieldNames @@ -251,7 +253,7 @@ abstract class TableEnvironment(val config: TableConfig) { * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ - protected def getFieldInfo[A]( + protected[flink] def getFieldInfo[A]( inputType: TypeInformation[A], exprs: Array[Expression]): (Array[String], Array[Int]) = { @@ -290,13 +292,20 @@ abstract class TableEnvironment(val config: TableConfig) { } case p: PojoTypeInfo[A] => exprs.map { + case (UnresolvedFieldReference(name)) => + val idx = p.getFieldIndex(name) + if (idx < 0) { + throw new TableException(s"$name is not a field of type $p") + } + (idx, name) case Alias(UnresolvedFieldReference(origName), name) => val idx = p.getFieldIndex(origName) if (idx < 0) { throw new TableException(s"$origName is not a field of type $p") } (idx, name) - case _ => throw new TableException("Alias on field reference expression expected.") + case _ => throw new TableException( + "Field reference expression or alias on field expression expected.") } case tpe => throw new TableException( s"Source of type $tpe cannot be converted into Table.") http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala new file mode 100644 index 0000000..263696b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
+import org.apache.flink.api.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.api.table.sinks.TableSink
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+class TableEnvironmentTest {
+  // Tests getFieldInfo; results are compared as Seqs so that length mismatches fail too.
+  val tEnv = new MockTableEnvironment
+
+  val tupleType = new TupleTypeInfo(
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    DOUBLE_TYPE_INFO)
+
+  val caseClassType = implicitly[TypeInformation[CClass]]
+
+  val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+
+  val atomicType = INT_TYPE_INFO
+
+  @Test
+  def testGetFieldInfoTuple(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(tupleType)
+
+    assertEquals(Seq("f0", "f1", "f2"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoCClass(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(caseClassType)
+
+    assertEquals(Seq("cf1", "cf2", "cf3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoPojo(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(pojoType)
+
+    assertEquals(Seq("pf1", "pf2", "pf3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomic(): Unit = {
+    tEnv.getFieldInfo(atomicType)
+  }
+
+  @Test
+  def testGetFieldInfoTupleNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoCClassNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoNames1(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoNames2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("pf3"),
+        new UnresolvedFieldReference("pf1"),
+        new UnresolvedFieldReference("pf2")
+      ))
+
+    assertEquals(Seq("pf3", "pf1", "pf2"), fieldInfo._1.toSeq)
+    assertEquals(Seq(2, 0, 1), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoAtomicName1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      atomicType,
+      Array(new UnresolvedFieldReference("name"))
+    )
+
+    assertEquals(Seq("name"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicName2(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f0"), "name1"),
+        new Alias(UnresolvedFieldReference("f1"), "name2"),
+        new Alias(UnresolvedFieldReference("f2"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f2"), "name1"),
+        new Alias(UnresolvedFieldReference("f0"), "name2"),
+        new Alias(UnresolvedFieldReference("f1"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(2, 0, 1), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoTupleAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf3"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(2, 0, 1), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoCClassAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf3"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(0, 1, 2), fieldInfo._2.toSeq)
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name3")
+      ))
+
+    assertEquals(Seq("name1", "name2", "name3"), fieldInfo._1.toSeq)
+    assertEquals(Seq(2, 0, 1), fieldInfo._2.toSeq)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicAlias(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new Alias(new UnresolvedFieldReference("name1"), "name2")
+      ))
+  }
+
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+  // Only the inherited getFieldInfo methods are exercised; other members are stubs.
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+  override protected def checkValidTableName(name: String): Unit = ???
+
+  override def sql(query: String): Table = ???
+}
+
+case class CClass(cf1: Int, cf2: String, cf3: Double)
+
+class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
+  def this() = this(0, "", 0.0) // no-arg constructor, assumed needed for Flink POJO extraction
+}