Repository: flink
Updated Branches:
  refs/heads/master c2585c6c8 -> 1f1788619
[FLINK-4420] [table] Introduce star(*) to select all of the columns in the table This closes #2384. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f178861 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f178861 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f178861 Branch: refs/heads/master Commit: 1f17886198eb67c8fdb53b96098ebb812029b3ba Parents: c2585c6 Author: Jark Wu <[email protected]> Authored: Thu Aug 18 13:08:01 2016 +0800 Committer: twalthr <[email protected]> Committed: Mon Aug 29 15:56:05 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 9 ++++ .../flink/api/table/TableEnvironment.scala | 10 ++++ .../table/expressions/ExpressionParser.scala | 3 +- .../api/table/expressions/fieldExpression.scala | 11 +++- .../api/table/plan/RexNodeTranslator.scala | 17 +++++++ .../api/table/plan/logical/operators.scala | 6 ++- .../org/apache/flink/api/table/table.scala | 8 +-- .../api/java/batch/table/SelectITCase.java | 23 +++++++++ .../api/scala/batch/table/SelectITCase.scala | 53 ++++++++++++++++++++ 9 files changed, 132 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 7a20e6a..8ca602d 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -501,6 +501,10 @@ This section gives a brief overview of the available operators. 
You can find mor Table in = tableEnv.fromDataSet(ds, "a, b, c"); Table result = in.select("a, c as d"); {% endhighlight %} + <p>You can use star (<code>*</code>) to act as a wild card, selecting all of the columns in the table.</p> +{% highlight java %} +Table result = in.select("*"); +{% endhighlight %} </td> </tr> @@ -723,6 +727,11 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w val in = ds.toTable(tableEnv, 'a, 'b, 'c); val result = in.select('a, 'c as 'd); {% endhighlight %} + <p>You can use star (<code>*</code>) to act as a wild card, selecting all of the columns in the table.</p> +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.select('*); +{% endhighlight %} </td> </tr> http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 8f61540..d7e650c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -265,6 +265,11 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException(s"Type $tpe lacks explicit field naming") } val fieldIndexes = fieldNames.indices.toArray + + if (fieldNames.contains("*")) { + throw new ValidationException("Field name can not be '*'.") + } + (fieldNames, fieldIndexes) } @@ -336,6 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) { } val (fieldIndexes, fieldNames) = indexedNames.unzip + + if (fieldNames.contains("*")) { + throw new ValidationException("Field name can not be '*'.") + } + (fieldNames.toArray, 
fieldIndexes.toArray) } http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index c57d43b..cb92573 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -75,6 +75,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val MINUTE: Keyword = Keyword("minute") lazy val SECOND: Keyword = Keyword("second") lazy val MILLI: Keyword = Keyword("milli") + lazy val STAR: Keyword = Keyword("*") def functionIdent: ExpressionParser.Parser[String] = not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ @@ -159,7 +160,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral - lazy val fieldReference: PackratParser[NamedExpression] = ident ^^ { + lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ { sym => UnresolvedFieldReference(sym) } http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala index 0219d38..5f20751 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala @@ -22,7 +22,8 @@ import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.UnresolvedException -import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure} +import org.apache.flink.api.table.validate.{ValidationSuccess, ExprValidationResult, +ValidationFailure} trait NamedExpression extends Expression { private[flink] def name: String @@ -91,6 +92,14 @@ case class Alias(child: Expression, name: String) UnresolvedFieldReference(name) } } + + override private[flink] def validateInput(): ExprValidationResult = { + if (name == "*") { + ValidationFailure("Alias can not accept '*' as name.") + } else { + ValidationSuccess + } + } } case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression { http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index ae8c7c3..eb40bba 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -20,6 +20,9 @@ package org.apache.flink.api.table.plan import org.apache.flink.api.table.TableEnvironment import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.plan.logical.LogicalNode + +import scala.collection.mutable.ListBuffer object 
RexNodeTranslator { @@ -68,4 +71,18 @@ object RexNodeTranslator { (e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList) } } + + /** + * Parses all input expressions to [[UnresolvedAlias]]. + * And expands star to parent's full project list. + */ + def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = { + val projectList = new ListBuffer[NamedExpression] + exprs.foreach { + case n: UnresolvedFieldReference if n.name == "*" => + projectList ++= parent.output.map(UnresolvedAlias(_)) + case e: Expression => projectList += UnresolvedAlias(e) + } + projectList + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index c33efd0..ccdab85 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend case n: Alias => // explicit name if (names.contains(n.name)) { - throw ValidationException(s"Duplicate field name $n.name.") + throw ValidationException(s"Duplicate field name ${n.name}.") } else { names.add(n.name) } case r: ResolvedFieldReference => // simple field forwarding if (names.contains(r.name)) { - throw ValidationException(s"Duplicate field name $r.name.") + throw ValidationException(s"Duplicate field name ${r.name}.") } else { names.add(r.name) } @@ -109,6 +109,8 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una failValidation("Aliasing more 
fields than we actually have") } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { failValidation("Alias only accept name expressions as arguments") + } else if (!aliasList.forall(_.asInstanceOf[UnresolvedFieldReference].name != "*")) { + failValidation("Alias can not accept '*' as name") } else { val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) val input = child.output http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index bfabd32..9d96780 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -21,7 +21,7 @@ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression, Ordering} -import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations +import org.apache.flink.api.table.plan.RexNodeTranslator._ import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink @@ -78,14 +78,14 @@ class Table( def select(fields: Expression*): Table = { val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv)) val aggregations = projectionOnAggregates.flatMap(_._2) + val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan) if (aggregations.nonEmpty) { new Table(tableEnv, - Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), + Project(projectList, Aggregate(Nil, aggregations, 
logicalPlan).validate(tableEnv)).validate(tableEnv)) } else { new Table(tableEnv, - Project( - projectionOnAggregates.map(e => UnresolvedAlias(e._1)), logicalPlan).validate(tableEnv)) + Project(projectList, logicalPlan).validate(tableEnv)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java index e48914c..581c8ed 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java @@ -127,4 +127,27 @@ public class SelectITCase extends TableProgramsTestBase { // Must fail. 
Field foo does not exist .select("a + 1 as foo, b + 2 as foo"); } + + @Test + public void testSelectStar() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + Table in = tableEnv.fromDataSet(ds, "a,b,c"); + + Table result = in + .select("*"); + + DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); + List<Row> results = resultSet.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1f178861/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala index 9aed5a7..1143afd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils +import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -134,4 +135,56 @@ class SelectITCase( .select('a, 'b as 'a).toDataSet[Row].print() } + @Test + def testSelectStar(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAliasStarException(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) + .select('_1 as '*, '_2 as 'b, '_1 as 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c) + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + + try { + CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b) + 
fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + } + }
