http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala deleted file mode 100644 index 2cbd8fa..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.parser - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.table.plan.As -import org.apache.flink.api.table.expressions._ - -import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers} - -/** - * Parser for expressions inside a String. This parses exactly the same expressions that - * would be accepted by the Scala Expression DSL. 
- *
 * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
 * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
 * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
 * defined in the above files.
 *
 * The grammar is layered; each layer is built on top of the previous one, so earlier layers
 * bind tighter: `atom`, `suffix`, `unary`, `binaryBitwise`, `product`, `term`, `comparison`,
 * `logic`, `alias`.
 */
object ExpressionParser extends JavaTokenParsers with PackratParsers {

  /** Wrapper for a keyword that is matched without regard to case. */
  case class Keyword(key: String)

  // Convert the keyword into a case insensitive Parser. The key is quoted with \Q...\E so
  // that it is matched literally.
  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
    ("""(?i)\Q""" + kw.key + """\E""").r
  }

  // KeyWord

  lazy val AS: Keyword = Keyword("as")
  lazy val COUNT: Keyword = Keyword("count")
  lazy val AVG: Keyword = Keyword("avg")
  lazy val MIN: Keyword = Keyword("min")
  lazy val MAX: Keyword = Keyword("max")
  lazy val SUM: Keyword = Keyword("sum")

  // Literals

  /**
   * Numeric literal.
   *
   * A whole number followed by `L` or `l` is a Long. The suffix is consumed by `<~` and is
   * therefore NOT part of the matched string, which is why the Long case needs its own
   * semantic action instead of inspecting the string for a trailing `L`.
   *
   * Any other number is an Int when it consists of digits only (optionally signed), a Float
   * when it ends in `f`/`F`, and a Double otherwise.
   */
  lazy val numberLiteral: PackratParser[Expression] =
    ((wholeNumber <~ ("L" | "l")) ^^ { str => Literal(str.toLong) }) |
      ((floatingPointNumber | decimalNumber | wholeNumber) ^^ { str =>
        if (str.matches("""-?\d+""")) {
          Literal(str.toInt)
        } else if (str.endsWith("f") || str.endsWith("F")) {
          Literal(str.toFloat)
        } else {
          Literal(str.toDouble)
        }
      })

  /** String literal delimited by single quotes; the quotes are stripped from the value. */
  lazy val singleQuoteStringLiteral: Parser[Expression] =
    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
      str => Literal(str.substring(1, str.length - 1))
    }

  /** String literal delimited by double quotes; the quotes are stripped from the value. */
  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
    str => Literal(str.substring(1, str.length - 1))
  }

  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
    str => Literal(str.toBoolean)
  }

  lazy val literalExpr: PackratParser[Expression] =
    numberLiteral |
      stringLiteralFlink | singleQuoteStringLiteral |
      boolLiteral

  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
    case sym => UnresolvedFieldReference(sym)
  }

  // Literals are tried before field references.
  lazy val atom: PackratParser[Expression] =
    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference

  // suffix ops
  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }

  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }

  // Aggregations accept both the suffix form (`a.sum`) and the function form (`sum(a)`).
  lazy val sum: PackratParser[Expression] =
    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
  lazy val min: PackratParser[Expression] =
    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
  lazy val max: PackratParser[Expression] =
    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
  lazy val count: PackratParser[Expression] =
    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
  lazy val avg: PackratParser[Expression] =
    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })

  lazy val cast: PackratParser[Expression] =
    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }

  // NOTE(review): this parser is not referenced by `suffix` below; renaming is reachable
  // through `alias` (`expr as name`) only - confirm whether that is intended.
  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
    case e ~ _ ~ as ~ _ => Naming(e, as.name)
  }

  lazy val substring: PackratParser[Expression] =
    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
    }

  // Without an explicit end the substring runs up to Integer.MAX_VALUE.
  lazy val substringWithoutEnd: PackratParser[Expression] =
    atom ~ ".substring(" ~ expression ~ ")" ^^ {
      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
    }

  lazy val suffix =
    isNull | isNotNull |
      abs | sum | min | max | count | avg | cast |
      substring | substringWithoutEnd | atom

  // unary ops

  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }

  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }

  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) }

  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix

  // binary bitwise opts

  lazy val binaryBitwise = unary * (
    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )

  // arithmetic

  lazy val product = binaryBitwise * (
    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )

  lazy val term = product * (
    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )

  // Comparison

  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ {
    case l ~ _ ~ r => EqualTo(l, r)
  }

  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
    case l ~ _ ~ r => NotEqualTo(l, r)
  }

  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
    case l ~ _ ~ r => GreaterThan(l, r)
  }

  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
  }

  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
    case l ~ _ ~ r => LessThan(l, r)
  }

  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
    case l ~ _ ~ r => LessThanOrEqual(l, r)
  }

  lazy val comparison: PackratParser[Expression] =
      equalTo | notEqualTo |
      greaterThan | greaterThanOrEqual |
      lessThan | lessThanOrEqual | term

  // logic

  lazy val logic = comparison * (
    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )

  // alias

  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
    case e ~ _ ~ name => Naming(e, name.name)
  } | logic

  lazy val expression: PackratParser[Expression] = alias

  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")

  /**
   * Parses a comma-separated list of expressions. The whole input must be consumed.
   *
   * @param expression the string to parse
   * @return the parsed expressions, in input order
   * @throws ExpressionException if the input cannot be parsed
   */
  def parseExpressionList(expression: String): List[Expression] = {
    parseAll(expressionList, expression) match {
      case Success(lst, _) => lst

      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)

      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
    }
  }

  /**
   * Parses a single expression. The whole input must be consumed.
   *
   * @param exprString the string to parse
   * @return the parsed expression
   * @throws ExpressionException if the input cannot be parsed
   */
  def parseExpression(exprString: String): Expression = {
    parseAll(expression, exprString) match {
      case Success(lst, _) => lst

      case fail =>
        throw new ExpressionException("Could not parse expression: " + fail.toString)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala deleted file mode 100644 index 2e09f39..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan - -import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.java.aggregation.Aggregations - -import scala.collection.mutable - -/** - * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]] - * without aggregations it is simply returned. 
- *
 * This select:
 * {{{
 *   in.select('key, 'value.avg)
 * }}}
 *
 * is transformed to this expansion:
 * {{{
 *   in
 *     .select('key, 'value, Literal(1) as 'intermediate.1)
 *     .aggregate('value.sum, 'intermediate.1.sum)
 *     .select('key, 'value / 'intermediate.1)
 * }}}
 *
 * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
 */
object ExpandAggregations {

  /**
   * Rewrites `select` into "project intermediates, aggregate, project finals".
   *
   * @param select the selection to expand
   * @return the expanded plan, or `select` itself when it holds no aggregation
   */
  def apply(select: Select): PlanNode = select match {
    case Select(input, selection) =>

      // (expression, basic aggregation) -> name of the intermediate field carrying it
      val intermediateNames = mutable.HashMap[(Expression, Aggregations), String]()
      // everything the first projection has to produce
      val projectedFields = mutable.HashSet[Expression]()
      // aggregation -> references it later combines into its final value
      val referencesPerAggregation = mutable.HashMap[Aggregation, Seq[Expression]]()

      var intermediateCounter = 0
      var resultCounter = 0

      selection.foreach { selected =>
        selected.transformPre {
          case agg: Aggregation =>
            val references =
              agg.getIntermediateFields.zip(agg.getAggregations).map { case (expr, basicAgg) =>
                resultCounter += 1
                val resultName = s"result.$resultCounter"
                // An (expression, aggregation) pair that was seen before reuses its
                // intermediate field; a new pair allocates the next one.
                val intermediateName = intermediateNames.getOrElseUpdate((expr, basicAgg), {
                  intermediateCounter += 1
                  val freshName = s"intermediate.$intermediateCounter"
                  projectedFields += Naming(expr, freshName)
                  freshName
                })
                Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
              }

            referencesPerAggregation(agg) = references
            // Hand back a NOP: the fields this aggregation needs were registered above, so
            // its children must not be added to the intermediate fields a second time.
            NopExpression()

          case field: ResolvedFieldReference =>
            if (!field.name.startsWith("intermediate")) {
              projectedFields += Naming(field, field.name)
            }
            field
        }
      }

      if (intermediateNames.isEmpty) {
        // nothing to expand
        select
      } else {
        // Grouping keys have to survive the first projection as well. The target is a set,
        // so keys that are already present are not added twice.
        input match {
          case GroupBy(_, groupingFields) =>
            groupingFields.foreach {
              case key: ResolvedFieldReference =>
                projectedFields += Naming(key, key.name)
            }
          case _ => // no grouping, nothing to add
        }

        val basicAggregations = intermediateNames.map {
          case ((_, basicAgg), fieldName) => (fieldName, basicAgg)
        }

        val finalFields = selection.map { selected =>
          selected.transformPre {
            case agg: Aggregation => agg.getFinalField(referencesPerAggregation(agg))
          }
        }

        val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
        val analyzedIntermediates = projectedFields.toSeq.map(intermediateAnalyzer.analyze)

        val finalAnalyzer =
          new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
        val analyzedFinals = finalFields.map(finalAnalyzer.analyze)

        // The two shapes differ only in whether the grouping is re-applied on top of the
        // intermediate projection.
        val aggregateInput: PlanNode = input match {
          case GroupBy(groupByInput, groupingFields) =>
            GroupBy(Select(groupByInput, analyzedIntermediates), groupingFields)
          case _ =>
            Select(input, analyzedIntermediates)
        }

        Select(Aggregate(aggregateInput, basicAggregations.toSeq), analyzedFinals)
      }

    case _ => select
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala deleted file mode 100644 index ba8aba4..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.plan - -import java.lang.reflect.Modifier - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.parser.ExpressionParser -import org.apache.flink.api.table.expressions.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference} -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{ExpressionException, Table} - -import scala.language.reflectiveCalls - -/** - * Base class for translators that transform the logical plan in a [[Table]] to an executable - * Flink plan and also for creating a [[Table]] from a DataSet or DataStream. 
- */
abstract class PlanTranslator {

  /**
   * The underlying representation. It is a structural type: the only requirement is a
   * `getType()` method returning the [[TypeInformation]] of the elements.
   */
  type Representation[A] <: { def getType(): TypeInformation[A] }

  /**
   * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e,
   * a DataSet or a DataStream.
   */
  def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]

  /**
   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
   */
  def createTable[A](
      repr: Representation[A],
      inputType: CompositeType[A],
      expressions: Array[Expression],
      resultFields: Seq[(String, TypeInformation[_])]): Table

  /**
   * Creates a [[Table]] from the given DataSet or DataStream.
   *
   * All fields of a composite input type are taken over under their existing names; the
   * deterministic-field-order check is skipped.
   */
  def createTable[A](repr: Representation[A]): Table = {

    // Built directly as Array[Expression] so that no unchecked array cast is needed.
    val fields: Array[Expression] = repr.getType() match {
      case c: CompositeType[A] =>
        c.getFieldNames.map(name => UnresolvedFieldReference(name): Expression)

      case _ => Array.empty[Expression] // createTable will throw an exception for this later
    }
    createTable(repr, fields, checkDeterministicFields = false)
  }

  /**
   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
   * fields mentioned in the field expression.
   */
  def createTable[A](repr: Representation[A], expression: String): Table = {

    val fields = ExpressionParser.parseExpressionList(expression)

    createTable(repr, fields.toArray, checkDeterministicFields = true)
  }

  /**
   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
   * fields mentioned in the fields parameter.
   *
   * When checkDeterministicFields is true check whether the fields of the underlying
   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
   * when a user renames fields and assumes a certain ordering.
   *
   * @throws ExpressionException if the input type is not composite, is a non-static member
   *                             class or has no canonical name, if the number of fields does
   *                             not match, if a field is not a plain field reference, or if
   *                             field names are ambiguous
   */
  def createTable[A](
      repr: Representation[A],
      fields: Array[Expression],
      checkDeterministicFields: Boolean = true): Table = {

    // shortcut for DataSet[Row] or DataStream[Row]
    // NOTE(review): the Table built in the first case is the value of this match, which is
    // discarded; execution continues with the checks below - confirm whether an early
    // return was intended here.
    repr.getType() match {
      case rowTypeInfo: RowTypeInfo =>
        val expressions = rowTypeInfo.getFieldNames map {
          name => (name, rowTypeInfo.getTypeAt(name))
        }
        new Table(
          Root(repr, expressions))

      case c: CompositeType[A] => // is ok

      case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type " +
        "can be transformed to a Table. These would be tuples, case classes and " +
        "POJOs. Type is: " + tpe)

    }

    val clazz = repr.getType().getTypeClass
    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers))
        || clazz.getCanonicalName() == null) {
      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
        clazz.getName + ". Only top-level classes or static member classes " +
        "are supported.")
    }

    val inputType = repr.getType().asInstanceOf[CompositeType[A]]

    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
      throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
        s"Field order of input type $inputType is not deterministic." )
    }

    if (fields.length != inputType.getFieldNames.length) {
      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
        "' and number of fields in input type " + inputType + " do not match.")
    }

    val newFieldNames = fields map {
      case UnresolvedFieldReference(name) => name
      case e =>
        throw new ExpressionException("Only field references allowed in 'as' operation, " +
          "offending expression: " + e)
    }

    if (newFieldNames.toSet.size != newFieldNames.size) {
      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
    }

    // New names are matched to the input fields by position.
    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
      case (name, index) => (name, inputType.getTypeAt(index))
    }

    val inputFields = inputType.getFieldNames
    val fieldMappings = inputFields.zip(resultFields)
    val expressions: Array[Expression] = fieldMappings map {
      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
    }

    createTable(repr, inputType, expressions, resultFields)
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
deleted file mode 100644
index 7ec34d7..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
+++ /dev/null
@@ -1,134 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.table.plan

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.trees.TreeNode

/**
 * Common parent of every node in a Table API plan. Each node reports the fields it
 * produces as (name, type) pairs.
 */
sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
  def outputFields: Seq[(String, TypeInformation[_])]
}

/**
 * Leaf of the plan: wraps a [[org.apache.flink.api.scala.DataSet]] or
 * [[org.apache.flink.streaming.api.scala.DataStream]] so that it can be used as a
 * [[org.apache.flink.api.table.Table]]. It has no children.
 */
case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
  val children = Nil

  // Only the fields are printed, not the wrapped input.
  override def toString = s"Root($outputFields)"
}

/**
 * Join of two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select" should be
 * applied after a join operation. The output is the left fields followed by the right fields.
 */
case class Join(left: PlanNode, right: PlanNode) extends PlanNode {

  val children = Seq(left, right)

  def outputFields = left.outputFields ++ right.outputFields

  override def toString = s"Join($left, $right)"
}

/**
 * Keeps only the elements that match the predicate expression; the fields are those of
 * the input.
 */
case class Filter(input: PlanNode, predicate: Expression) extends PlanNode {

  val children = Seq(input)

  def outputFields = input.outputFields

  override def toString = s"Filter($input, $predicate)"
}

/**
 * Selection, similar to an SQL SELECT statement. The expressions can select fields,
 * perform arithmetic or logic operations, and perform aggregates on fields. Every
 * expression contributes one output field, described by its name and type.
 */
case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode {

  val children = Seq(input)

  def outputFields = selection.toSeq.map(expr => (expr.name, expr.typeInfo))

  override def toString = s"Select($input, ${selection.mkString(",")})"
}

/**
 * Gives new names to fields. Use this to disambiguate fields before a join operation.
 * Names are paired with the input fields by position; the field types are kept.
 */
case class As(input: PlanNode, names: Seq[String]) extends PlanNode {

  val children = Seq(input)

  val outputFields = input.outputFields.zip(names).map {
    case ((_, fieldType), renamed) => (renamed, fieldType)
  }

  override def toString = s"As($input, ${names.mkString(",")})"
}

/**
 * Grouping operation. Keys are specified using field references. A group by operation is
 * only useful when performing a select with aggregates afterwards. The fields are those of
 * the input.
 *
 * @param input the node whose output is grouped
 * @param fields the grouping keys
 */
case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode {

  val children = Seq(input)

  def outputFields = input.outputFields

  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
}

/**
 * Internal operation. Selection operations containing aggregates are expanded to an
 * [[Aggregate]] and a simple [[Select]]. Each aggregation is a (field name, aggregation)
 * pair; the fields are those of the input.
 */
case class Aggregate(
    input: PlanNode,
    aggregations: Seq[(String, Aggregations)]) extends PlanNode {

  val children = Seq(input)

  def outputFields = input.outputFields

  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
}

/**
 * UnionAll operation, union all elements from left and right. The fields are taken from
 * the left input.
 */
case class UnionAll(left: PlanNode, right: PlanNode) extends PlanNode {
  val children = Seq(left, right)

  def outputFields = left.outputFields

  override def toString = s"Union($left, $right)"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
deleted file mode 100644
index a598483..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.table

/**
 * The operations in this package are created by calling methods on [[Table]] they
 * should not be manually created by users of the API.
- */ -package object plan http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala deleted file mode 100644 index 932f9df..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.flink.api.table.runtime

import org.apache.flink.api.table.Row
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
import org.apache.flink.api.java.aggregation.AggregationFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

/**
 * Group reduce that folds the rows of a group into a single row.
 *
 * `functions(i)` aggregates the field at `fieldPositions(i)`. The aggregated values are
 * written into the last row read from the group, and that row is emitted.
 *
 * @param fieldPositions positions of the aggregated fields, parallel to `functions`
 * @param functions the aggregation functions, parallel to `fieldPositions`
 */
@Combinable
class ExpressionAggregateFunction(
    private val fieldPositions: Seq[Int],
    private val functions: Seq[AggregationFunction[Any]])
  extends RichGroupReduceFunction[Row, Row] {

  /** Puts every aggregation function into its initial state. */
  override def open(conf: Configuration): Unit = {
    var i = 0
    val len = functions.length
    while (i < len) {
      functions(i).initializeAggregate()
      i += 1
    }
  }

  /**
   * Aggregates all rows of `in` and emits one row holding the results. Nothing is emitted
   * for an empty input.
   */
  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {

    val fieldPositions = this.fieldPositions
    val functions = this.functions

    // Last row read so far; stays null when the input is empty.
    var current: Row = null

    val values = in.iterator()
    while (values.hasNext) {
      current = values.next()

      var i = 0
      val len = functions.length
      while (i < len) {
        functions(i).aggregate(current.productElement(fieldPositions(i)))
        i += 1
      }
    }

    // Without this guard an empty input would dereference the null row below. No row was
    // aggregated in that case, so the functions are still in their initial state.
    if (current != null) {
      var i = 0
      val len = functions.length
      while (i < len) {
        current.setField(fieldPositions(i), functions(i).getAggregate)
        // reset for the next call
        functions(i).initializeAggregate()
        i += 1
      }

      out.collect(current)
    }
  }

}

/**
 * Group reduce without aggregations: emits the first row of the input and ignores the
 * rest. Nothing is emitted for an empty input.
 */
@Combinable
class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, Row] {

  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {

    val values = in.iterator()
    // Only collect when there is a row; previously null was collected for an empty input.
    if (values.hasNext) {
      out.collect(values.next())
    }
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala deleted file mode 100644 index 4e50272..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.GenerateFilter -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.configuration.Configuration - -/** - * Proxy function that takes an expression predicate. This is compiled - * upon runtime and calls to [[filter()]] are forwarded to the compiled code. 
/**
 * Filter function backed by generated code. The predicate expression is turned into a
 * [[FilterFunction]] by [[GenerateFilter]] the first time `open` runs; every call to
 * `filter` is then delegated to that generated function.
 */
class ExpressionFilterFunction[T](
    predicate: Expression,
    inputType: CompositeType[T],
    config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] {

  var compiledFilter: FilterFunction[T] = null

  override def open(c: Configuration): Unit = {
    val needsCompilation = compiledFilter == null
    if (needsCompilation) {
      // generate with the user-code class loader of the running task
      val userClassLoader = getRuntimeContext.getUserCodeClassLoader
      compiledFilter =
        new GenerateFilter[T](inputType, predicate, userClassLoader, config).generate()
    }
  }

  override def filter(in: T) = {
    compiledFilter.filter(in)
  }
}
/**
 * Join function backed by generated code. The predicate and the output field expressions are
 * turned into a [[FlatJoinFunction]] by [[GenerateJoin]] when `open` runs; every call to
 * `join` is then delegated to that generated function.
 *
 * @param predicate join predicate handed to the code generator
 * @param leftType type of the left input
 * @param rightType type of the right input
 * @param resultType type of the produced records
 * @param outputFields expressions that make up the produced record
 * @param config table configuration handed to the code generator
 */
class ExpressionJoinFunction[L, R, O](
    predicate: Expression,
    leftType: CompositeType[L],
    rightType: CompositeType[R],
    resultType: CompositeType[O],
    outputFields: Seq[Expression],
    config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] {

  var compiledJoin: FlatJoinFunction[L, R, O] = null

  override def open(c: Configuration): Unit = {
    // Generate only once, like ExpressionFilterFunction and ExpressionSelectFunction do;
    // a repeated open() must not pay for code generation again.
    if (compiledJoin == null) {
      val codegen = new GenerateJoin[L, R, O](
        leftType,
        rightType,
        resultType,
        predicate,
        outputFields,
        getRuntimeContext.getUserCodeClassLoader,
        config)
      compiledJoin = codegen.generate()
    }
  }

  override def join(left: L, right: R, out: Collector[O]): Unit = {
    compiledJoin.join(left, right, out)
  }
}
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.codegen.GenerateSelect -import org.apache.flink.configuration.Configuration - -/** - * Proxy function that takes expressions. These are compiled - * upon runtime and calls to [[map()]] are forwarded to the compiled code. 
/**
 * Map function backed by generated code. The output field expressions are turned into a
 * [[MapFunction]] by [[GenerateSelect]] the first time `open` runs; every call to `map` is
 * then delegated to that generated function.
 */
class ExpressionSelectFunction[I, O](
    inputType: CompositeType[I],
    resultType: CompositeType[O],
    outputFields: Seq[Expression],
    config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] {

  var compiledSelect: MapFunction[I, O] = null

  override def open(c: Configuration): Unit = {
    val needsCompilation = compiledSelect == null
    if (needsCompilation) {
      // generate with the user-code class loader of the running task
      val userClassLoader = getRuntimeContext.getUserCodeClassLoader
      compiledSelect =
        new GenerateSelect[I, O](inputType, resultType, outputFields, userClassLoader, config)
          .generate()
    }
  }

  def map(in: I): O = compiledSelect.map(in)
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * The functions in this package are used for transforming Table API operations to Java API operations. - */ -package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala deleted file mode 100644 index 87051cf..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.trees - -/** - * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to - * provide the chain of rules that are invoked one after another.
/**
 * Base class for tree analyzers/transformers. Subclasses supply the rule chain through
 * `rules`. `analyze` runs the rules in order: each rule is applied repeatedly until its
 * result `fastEquals` its input, and that result becomes the input of the next rule. The
 * result of the last rule is returned.
 */
abstract class Analyzer[A <: TreeNode[A]] {

  def rules: Seq[Rule[A]]

  final def analyze(expr: A): A = {
    // Applies one rule until the tree stops changing and returns the last tree produced.
    @scala.annotation.tailrec
    def toFixpoint(rule: Rule[A], tree: A): A = {
      val rewritten = rule(tree)
      if (rewritten fastEquals tree) rewritten else toFixpoint(rule, rewritten)
    }

    rules.foldLeft(expr) { (tree, rule) => toFixpoint(rule, tree) }
  }
}
/**
 * One step of an [[Analyzer]] rule chain. A rule receives a tree and returns a tree, which
 * may be the input tree itself. Inside an [[Analyzer]] a rule is applied again and again
 * until its result no longer differs from its input; that result is then handed to the
 * next rule of the chain.
 */
abstract class Rule[A <: TreeNode[A]] {

  /** Rewrites `expr` and returns the resulting tree, or `expr` itself if nothing applies. */
  def apply(expr: A): A
}
/**
 * Generic base class for trees that can be transformed and traversed.
 *
 * Concrete nodes must be [[Product]]s (typically case classes): transformations walk the
 * product elements and rebuild a node through its constructor when a child changed.
 */
abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>

  /**
   * List of child nodes that should be considered when doing transformations. Other values
   * in the Product will not be transformed, only handed through.
   */
  def children: Seq[A]

  /**
   * Tests for equality by first testing for reference equality.
   */
  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other

  /**
   * Applies `rule` to this node first and then, recursively, to the children of the result.
   * Nodes for which `rule` is not defined are kept as they are.
   */
  def transformPre(rule: PartialFunction[A, A]): A = {
    val afterTransform = rule.applyOrElse(this, identity[A])

    if (afterTransform fastEquals this) {
      this.transformChildrenPre(rule)
    } else {
      afterTransform.transformChildrenPre(rule)
    }
  }

  /** Applies [[transformPre]] to every child and rebuilds this node if a child changed. */
  def transformChildrenPre(rule: PartialFunction[A, A]): A =
    mapChildren(_.transformPre(rule))

  /**
   * Applies `rule` recursively to the children first and then to the (possibly rebuilt)
   * node itself. Nodes for which `rule` is not defined are kept as they are.
   */
  def transformPost(rule: PartialFunction[A, A]): A = {
    val afterChildren = transformChildrenPost(rule)
    if (afterChildren fastEquals this) {
      rule.applyOrElse(this, identity[A])
    } else {
      rule.applyOrElse(afterChildren, identity[A])
    }
  }

  /** Applies [[transformPost]] to every child and rebuilds this node if a child changed. */
  def transformChildrenPost(rule: PartialFunction[A, A]): A =
    mapChildren(_.transformPost(rule))

  /**
   * Shared walk behind both child transformations: runs `transformChild` on every product
   * element that is listed in `children`, hands all other elements through untouched and
   * creates a copy of this node only if at least one child actually changed.
   */
  private def mapChildren(transformChild: A => A): A = {
    var changed = false
    val newArgs = productIterator.map {
      case child: A if children.contains(child) =>
        val newChild = transformChild(child)
        if (newChild fastEquals child) {
          child
        } else {
          changed = true
          newChild
        }
      case other: AnyRef => other
      case null => null
    }.toArray
    // toArray forces evaluation of the lazy iterator, so `changed` is final below

    if (changed) makeCopy(newArgs) else this
  }

  /**
   * Returns true if `predicate` holds for this node or for any node reached by
   * [[transformPre]]. The whole tree is always visited.
   */
  def exists(predicate: A => Boolean): Boolean = {
    var found = false
    this.transformPre {
      case e: A =>
        if (predicate(e)) {
          found = true
        }
        e
    }
    found
  }

  /**
   * Creates a new copy of this expression with new children. This is used during transformation
   * if children change. This must be overridden by tree nodes that don't have the Constructor
   * arguments in the same order as the `children`.
   *
   * @param newArgs constructor arguments of the copy, in constructor order
   * @throws NoSuchElementException if the class has no public constructor with parameters
   * @throws RuntimeException if the constructor rejects `newArgs`
   */
  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
    val defaultCtor = this.getClass.getConstructors
      .find(_.getParameterTypes.nonEmpty)
      .getOrElse(throw new NoSuchElementException(
        s"No public constructor with parameters found for ${this.getClass.getName}. " +
          "Override makeCopy for this tree node."))
    try {
      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
    } catch {
      case iae: IllegalArgumentException =>
        // keep the cause and say which node and arguments were involved
        throw new RuntimeException(
          s"Could not copy tree node $this with arguments ${newArgs.mkString("[", ", ", "]")}. " +
            "Override makeCopy if the constructor arguments differ from the product elements.",
          iae)
    }
  }
}
/**
 * Logical operator that carries a [[RenamingProxyTypeInfo]] in order to rename fields of a
 * [[org.apache.flink.api.common.typeutils.CompositeType]]. It leaves no trace in the
 * translated data flow because `translateToDataFlow` simply returns its input.
 */
class RenameOperator[T](
    input: JavaDataSet[T],
    renamingTypeInformation: RenamingProxyTypeInfo[T])
  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {

  override protected def translateToDataFlow(input: Operator[T]): Operator[T] = {
    // renaming only affects type information, so the input operator is passed through
    input
  }
}
/**
 * A TypeInformation that renames the fields of an underlying CompositeType. Field names and
 * field lookup by name use the new names; everything else is delegated to the wrapped type.
 * This lets "as" Table API operations be expressed as a [[RenameOperator]] that is not
 * translated to a runtime operator.
 *
 * Construction fails with an IllegalArgumentException if the number of names differs from
 * the arity of the wrapped type or if the names are not unique.
 */
class RenamingProxyTypeInfo[T](
    val tpe: CompositeType[T],
    val fieldNames: Array[String])
  extends CompositeType[T](tpe.getTypeClass) {

  def getUnderlyingType: CompositeType[T] = tpe

  if (tpe.getArity != fieldNames.length) {
    throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
      s"number of fields in underlying type $tpe do not match.")
  }

  if (fieldNames.toSet.size != fieldNames.length) {
    throw new IllegalArgumentException(s"New field names must be unique. " +
      s"Names: ${fieldNames.mkString(",")}.")
  }

  // ---- renamed view ----------------------------------------------------------------------

  override def getFieldNames: Array[String] = fieldNames

  /** Index of `fieldName` among the new names; -2 if the name occurs more than once. */
  override def getFieldIndex(fieldName: String): Int = {
    val firstHit = fieldNames.indexOf(fieldName)
    val lastHit = fieldNames.lastIndexOf(fieldName)
    if (firstHit == lastHit) firstHit else -2
  }

  override def toString = {
    val typeName = tpe.getTypeClass.getSimpleName
    val names = fieldNames.mkString(", ")
    s"RenamingType(type: $typeName; " + s"fields: $names)"
  }

  // ---- plain delegation to the wrapped type -----------------------------------------------

  override def isBasicType: Boolean = tpe.isBasicType

  override def isTupleType: Boolean = tpe.isTupleType

  override def isKeyType: Boolean = tpe.isKeyType

  override def getArity: Int = tpe.getArity

  override def getTotalFields: Int = tpe.getTotalFields

  override def getTypeClass: Class[T] = tpe.getTypeClass

  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters

  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)

  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
    tpe.getTypeAt(fieldExpression)
  }

  // NOTE(review): the field expression is forwarded unchanged, so it is resolved by the
  // wrapped type rather than against `fieldNames` - confirm this is intended.
  override def getFlatFields(
      fieldExpression: String,
      offset: Int,
      result: util.List[FlatFieldDescriptor]): Unit = {
    tpe.getFlatFields(fieldExpression, offset, result)
  }

  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
    tpe.createSerializer(executionConfig)

  override def createComparator(
      logicalKeyFields: Array[Int],
      orders: Array[Boolean],
      logicalFieldOffset: Int,
      executionConfig: ExecutionConfig) =
    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)

  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
    throw new RuntimeException("This method should never be called because createComparator is " +
      "overwritten.")
  }

  // ---- equality: same wrapped type and same new names -------------------------------------

  override def canEqual(obj: Any): Boolean = {
    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
  }

  override def equals(obj: Any): Boolean = obj match {
    case that: RenamingProxyTypeInfo[_] if that.canEqual(this) =>
      tpe.equals(that.tpe) && fieldNames.sameElements(that.fieldNames)
    case _ => false
  }

  override def hashCode(): Int = {
    val namesHash = util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
    31 * tpe.hashCode() + namesHash
  }
}
/**
 * Serializer for [[Row]]. A row is written field by field, field `i` being handled by
 * `fieldSerializers(i)`; no arity or null markers are written.
 *
 * @param fieldSerializers one serializer per row field, in field order
 */
class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
  extends TypeSerializer[Row] {

  override def isImmutableType: Boolean = false

  override def getLength: Int = -1

  /**
   * Returns a serializer built from duplicates of the field serializers, so that the
   * duplicate does not share field serializer instances with this serializer.
   */
  override def duplicate: RowSerializer =
    new RowSerializer(fieldSerializers.map(_.duplicate()))

  override def createInstance: Row = {
    new Row(fieldSerializers.length)
  }

  /** Fails if `row` does not have exactly one field per field serializer. */
  private def checkArity(row: Row, role: String): Unit = {
    if (row.productArity != fieldSerializers.length) {
      throw new RuntimeException(
        s"Row arity of $role (${row.productArity}) does not match the number of " +
          s"field serializers (${fieldSerializers.length}).")
    }
  }

  override def copy(from: Row, reuse: Row): Row = {
    val len = fieldSerializers.length

    checkArity(from, "from")
    // reuse is written at every position below, so it must have the same arity as well
    checkArity(reuse, "reuse")

    var i = 0
    while (i < len) {
      val reuseField = reuse.productElement(i)
      val fromField = from.productElement(i).asInstanceOf[AnyRef]
      val copy = fieldSerializers(i).copy(fromField, reuseField)
      reuse.setField(i, copy)
      i += 1
    }
    reuse
  }

  override def copy(from: Row): Row = {
    val len = fieldSerializers.length

    checkArity(from, "from")

    val result = new Row(len)
    var i = 0
    while (i < len) {
      val fromField = from.productElement(i).asInstanceOf[AnyRef]
      val copy = fieldSerializers(i).copy(fromField)
      result.setField(i, copy)
      i += 1
    }
    result
  }

  override def serialize(value: Row, target: DataOutputView): Unit = {
    val len = fieldSerializers.length
    var i = 0
    while (i < len) {
      val serializer = fieldSerializers(i)
      serializer.serialize(value.productElement(i), target)
      i += 1
    }
  }

  override def deserialize(reuse: Row, source: DataInputView): Row = {
    val len = fieldSerializers.length

    checkArity(reuse, "reuse")

    var i = 0
    while (i < len) {
      val field = reuse.productElement(i).asInstanceOf[AnyRef]
      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
      i += 1
    }
    reuse
  }

  override def deserialize(source: DataInputView): Row = {
    val len = fieldSerializers.length

    val result = new Row(len)
    var i = 0
    while (i < len) {
      result.setField(i, fieldSerializers(i).deserialize(source))
      i += 1
    }
    result
  }

  /** Copies one serialized row from `source` to `target`, field by field. */
  override def copy(source: DataInputView, target: DataOutputView): Unit = {
    val len = fieldSerializers.length
    var i = 0
    while (i < len) {
      fieldSerializers(i).copy(source, target)
      i += 1
    }
  }

  override def equals(any: scala.Any): Boolean = {
    any match {
      case otherRS: RowSerializer =>
        otherRS.canEqual(this) &&
          fieldSerializers.sameElements(otherRS.fieldSerializers)
      case _ => false
    }
  }

  override def canEqual(obj: scala.Any): Boolean = {
    obj.isInstanceOf[RowSerializer]
  }

  override def hashCode(): Int = {
    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
  }
}
/**
 * TypeInformation for [[Row]], described by one type and one name per field. Field names
 * must be unique; construction fails with an IllegalArgumentException otherwise.
 */
class RowTypeInfo(
    fieldTypes: Seq[TypeInformation[_]],
    fieldNames: Seq[String])
  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {

  /** Builds the type information from the types and names of the given expressions. */
  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))

  if (fieldNames.distinct.size != fieldNames.size) {
    throw new IllegalArgumentException("Field names must be unique.")
  }

  /** Creates a [[RowSerializer]] holding one serializer per field, in field order. */
  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
    val perField = Array.tabulate[TypeSerializer[Any]](getArity) { pos =>
      this.types(pos).createSerializer(executionConfig).asInstanceOf[TypeSerializer[Any]]
    }
    new RowSerializer(perField)
  }
}
b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala deleted file mode 100644 index dda6265..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala - -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.examples.java.graph.util.PageRankData -import org.apache.flink.util.Collector - -import _root_.scala.collection.JavaConverters._ -
/**
 * A basic implementation of the Page Rank algorithm using a bulk iteration.
 *
 * This implementation requires a set of pages and a set of directed links as input and works as
 * follows.
 *
 * In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
 * page collects the partial ranks of all pages that point to it, sums them up, and applies a
 * dampening factor to the sum. The result is the new rank of the page. A new iteration is started
 * with the new ranks of all pages. The iteration is bounded by a fixed maximum number of
 * iterations and is additionally given a termination criterion made of the pages whose rank
 * still changed by more than `EPSILON`. This is the Wikipedia entry for the
 * [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
 *
 * Input files are plain text files and must be formatted as follows:
 *
 *  - Pages represented as a (long) ID separated by new-line characters.
 *    For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
 *  - Links are represented as pairs of page IDs which are separated by space characters. Links
 *    are separated by new-line characters.
 *    For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
 *    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
 *    at least one incoming and one outgoing link (a page can point to itself).
 *
 * Usage:
 * {{{
 *   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
 * }}}
 *
 * If no parameters are provided, the program is run with default data from
 * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
 *
 * This example shows how to use:
 *
 *  - Bulk Iterations
 *  - Table API expressions
 */
object PageRankTable {

  /** Factor applied to the summed incoming ranks of a page in every iteration. */
  private final val DAMPENING_FACTOR: Double = 0.85

  /** Rank difference between two iterations above which a page feeds the termination set. */
  private final val EPSILON: Double = 0.0001

  /**
   * Program entry point: parses the arguments, builds the iterative Page Rank plan and either
   * writes the ranks as CSV (file mode) or prints them (default-data mode).
   *
   * @param args either no arguments (built-in data) or exactly five: pages path, links path,
   *             output path, number of pages, number of iterations
   */
  def main(args: Array[String]): Unit = {
    // Rejected arguments: parseParameters has already printed the usage, nothing is executed.
    if (parseParameters(args)) {
      // set up execution environment
      val env = ExecutionEnvironment.getExecutionEnvironment

      // read input data; every page starts with the same rank 1 / numPages
      val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
        .as('pageId, 'rank)

      val links = getLinksDataSet(env)

      // build adjacency list from link input
      val adjacencyLists = links
        .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {

          override def reduce(
              values: _root_.java.lang.Iterable[Link],
              out: Collector[AdjacencyList]): Unit = {
            // All links of one group share the source id; it is captured while the targets
            // are collected.
            var outputId = -1L
            val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
            out.collect(new AdjacencyList(outputId, outputList.toArray))
          }

        }).as('sourceId, 'targetIds)

      // start iteration
      val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
        currentRanks =>
          val newRanks = currentRanks.toTable
            // distribute ranks to target pages
            .join(adjacencyLists).where('pageId === 'sourceId)
            .select('rank, 'targetIds).toDataSet[RankOutput]
            .flatMap {
              (in, out: Collector[(Long, Double)]) =>
                val targets = in.targetIds
                val len = targets.length
                targets foreach { t => out.collect((t, in.rank / len )) }
            }
            .as('pageId, 'rank)
            // collect ranks and sum them up
            .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
            // apply dampening factor
            .select(
              'pageId,
              ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)

          // Pages whose rank moved by more than EPSILON; handed back as the termination
          // criterion. NOTE(review): the iteration presumably stops once this set is empty -
          // confirm against the iterateWithTermination documentation.
          val termination = currentRanks.toTable
            .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
            .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)

          (newRanks, termination)
      }

      val result = finalRanks

      // emit result
      if (fileOutput) {
        result.writeAsCsv(outputPath, "\n", " ")
        // execute program
        env.execute("Expression PageRank Example")
      } else {
        // execute program and print result
        result.print()
      }
    }
  }

  // *************************************************************************
  //     USER TYPES
  // *************************************************************************

  /** A directed link from page `sourceId` to page `targetId`. */
  case class Link(sourceId: Long, targetId: Long)

  /** A page id together with its rank. */
  case class Page(pageId: Long, rank: Double)

  /** All link targets of one source page. */
  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

  /** The rank of a page together with the targets the rank is distributed to. */
  case class RankOutput(rank: Double, targetIds: Array[Long])

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  /**
   * Reads the command-line arguments into the configuration fields of this object.
   *
   *  - no arguments: prints a notice and takes the page count from [[PageRankData]]
   *  - exactly five arguments: switches to file mode and stores paths, page count and
   *    iteration count
   *  - any other count: prints the usage to standard error
   *
   * @param args the raw program arguments
   * @return `true` when the program should run, `false` when the argument count was invalid
   */
  private def parseParameters(args: Array[String]): Boolean = {
    args.length match {
      case 0 =>
        System.out.println("Executing PageRank Basic example with default parameters and " +
          "built-in default data.")
        System.out.println("  Provide parameters to read input data from files.")
        System.out.println("  See the documentation for the correct format of input files.")
        System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> " +
          "<num pages> <num iterations>")

        numPages = PageRankData.getNumberOfPages
        true

      case 5 =>
        fileOutput = true
        pagesInputPath = args(0)
        linksInputPath = args(1)
        outputPath = args(2)
        numPages = args(3).toLong
        maxIterations = args(4).toInt
        true

      case _ =>
        // Previously the `false` of this branch was discarded and the method returned `true`,
        // so the job was started in file mode with unset paths.
        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
          "pages> <num iterations>")
        false
    }
  }

  /**
   * Returns the page ids: read from `pagesInputPath` in file mode, otherwise the generated
   * sequence 1 to 15.
   */
  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
    if (fileOutput) {
      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
        .map(x => x._1)
    } else {
      // NOTE(review): 15 presumably equals PageRankData.getNumberOfPages - confirm.
      env.generateSequence(1, 15)
    }
  }

  /**
   * Returns the links: read from `linksInputPath` in file mode, otherwise built from
   * `PageRankData.EDGES`.
   */
  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
    if (fileOutput) {
      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
        includedFields = Array(0, 1))
    } else {
      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
        v2.asInstanceOf[Long])}
      env.fromCollection(edges)
    }
  }

  // Configuration filled in by parseParameters; the defaults apply to the built-in data mode.
  private var fileOutput: Boolean = false
  private var pagesInputPath: String = null
  private var linksInputPath: String = null
  private var outputPath: String = null
  private var numPages: Double = 0
  private var maxIterations: Int = 10

}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala deleted file mode 100644 index 63dddc9..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.examples.scala - -import org.apache.flink.streaming.api.scala._ - -import org.apache.flink.api.scala.table._ - -import scala.Stream._ -import scala.math._ -import scala.language.postfixOps -import scala.util.Random -
/**
 * Small streaming example for the Table API: a generated stream of car events is turned into a
 * table, filtered and projected with expressions, and converted back into a data stream.
 */
object StreamingTableFilter {

  /** One measurement of a car: its id, current speed, distance driven and a timestamp. */
  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable

  /**
   * Entry point: keeps only the events of car 0, adds 1000 to the distance, reduces the time
   * modulo 5, prints the resulting events and executes the streaming job.
   *
   * @param args either no arguments or exactly three (see [[parseParameters]])
   */
  def main(args: Array[String]): Unit = {
    // Invalid arguments: the usage has been printed, no job is built.
    if (parseParameters(args)) {
      val filteredCars = genCarStream().toTable
        .filter('carId === 0)
        .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
        .toDataStream[CarEvent]

      filteredCars.print()

      StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
    }
  }

  /**
   * Builds the source of car events from a lazily extended Scala `Stream`.
   *
   * `numOfCars` cars start at speed 50 and distance 0. Every further round derives a new event
   * per car from the previous one; each round is preceded by a one second sleep.
   */
  def genCarStream(): DataStream[CarEvent] = {

    // Next event of a car: the speed randomly rises or falls by 5 within [0, 100] and the
    // distance grows by the new speed divided by 3.6.
    def nextSpeed(previous: CarEvent): CarEvent = {
      val faster = Random.nextBoolean()
      val updatedSpeed =
        if (faster) min(100, previous.speed + 5) else max(0, previous.speed - 5)
      CarEvent(
        previous.carId,
        updatedSpeed,
        previous.distance + updatedSpeed / 3.6d,
        System.currentTimeMillis)
    }

    // Appends the follow-up round to the current one; the appended part is only evaluated
    // when the stream is consumed that far.
    def carStream(round: Stream[CarEvent]): Stream[CarEvent] = {
      Thread.sleep(1000)
      round.append(carStream(round.map(nextSpeed)))
    }

    val firstRound: Stream[CarEvent] =
      range(0, numOfCars).map(id => CarEvent(id, 50, 0, System.currentTimeMillis()))
    carStream(firstRound)
  }

  /**
   * Reads the optional program arguments `<numCars> <evictSec> <triggerMeters>`.
   *
   * @param args the raw program arguments
   * @return `true` for no arguments or exactly three arguments, `false` (after printing the
   *         usage to standard error) for any other count
   */
  def parseParameters(args: Array[String]): Boolean = {
    args.length match {
      case 0 =>
        true

      case 3 =>
        numOfCars = args(0).toInt
        evictionSec = args(1).toInt
        triggerMeters = args(2).toDouble
        true

      case _ =>
        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
        false
    }
  }

  // Settings overridable via the program arguments. Only numOfCars is read within this object.
  var numOfCars = 2
  var evictionSec = 10
  var triggerMeters = 50d

}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala deleted file mode 100644 index f527a3c..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala - -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -
/**
 * This program implements a modified version of the TPC-H query 3. The
 * example demonstrates how to name the fields of case-class data sets and use them in
 * Table API expressions.
 * The original query can be found at
 * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
 * (page 29).
 *
 * This program implements the following SQL equivalent:
 *
 * {{{
 * SELECT
 *      l_orderkey,
 *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
 *      o_orderdate,
 *      o_shippriority
 * FROM customer,
 *      orders,
 *      lineitem
 * WHERE
 *      c_mktsegment = '[SEGMENT]'
 *      AND c_custkey = o_custkey
 *      AND l_orderkey = o_orderkey
 *      AND o_orderdate < date '[DATE]'
 *      AND l_shipdate > date '[DATE]'
 * GROUP BY
 *      l_orderkey,
 *      o_orderdate,
 *      o_shippriority;
 * }}}
 *
 * Compared to the original TPC-H query this version does not sort the result by revenue
 * and orderdate.
 *
 * Input files are plain text CSV files using the pipe character ('|') as field separator
 * as generated by the TPC-H data generator which is available at
 * [[http://www.tpc.org/tpch/ http://www.tpc.org/tpch/]].
 *
 * Usage:
 * {{{
 * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
 * }}}
 *
 * This example shows how to use:
 *  - Table API expressions
 *
 */
object TPCHQuery3Table {

  /**
   * Entry point: reads the three inputs, joins and aggregates them as described in the object
   * documentation, writes the result as pipe-separated CSV and executes the job.
   *
   * @param args exactly four arguments: lineitem path, customer path, orders path, result path
   */
  def main(args: Array[String]): Unit = {
    // Invalid arguments: the usage has been printed, no job is built.
    if (parseParameters(args)) {
      // set filter date
      val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
      val date = dateFormat.parse("1995-03-12")

      // get execution environment
      val env = ExecutionEnvironment.getExecutionEnvironment

      // line items shipped after the filter date
      val lineitems = getLineitemDataSet(env)
        .filter( l => dateFormat.parse(l.shipDate).after(date) )
        .as('id, 'extdPrice, 'discount, 'shipDate)

      // customers of the AUTOMOBILE market segment
      val customers = getCustomerDataSet(env)
        .as('id, 'mktSegment)
        .filter( 'mktSegment === "AUTOMOBILE" )

      // orders placed before the filter date
      val orders = getOrdersDataSet(env)
        .filter( o => dateFormat.parse(o.orderDate).before(date) )
        .as('orderId, 'custId, 'orderDate, 'shipPrio)

      val items =
        orders.join(customers)
          .where('custId === 'id)
          .select('orderId, 'orderDate, 'shipPrio)
        .join(lineitems)
          .where('orderId === 'id)
          .select(
            'orderId,
            'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
            'orderDate,
            'shipPrio)

      val result = items
        .groupBy('orderId, 'orderDate, 'shipPrio)
        .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)

      // emit result
      result.writeAsCsv(outputPath, "\n", "|")

      // execute program
      env.execute("Scala TPCH Query 3 (Expression) Example")
    }
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************

  /** The columns read from the lineitem file. */
  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)

  /** The columns read from the customer file. */
  case class Customer(id: Long, mktSegment: String)

  /** The columns read from the orders file. */
  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  // Paths filled in by parseParameters.
  private var lineitemPath: String = null
  private var customerPath: String = null
  private var ordersPath: String = null
  private var outputPath: String = null

  /**
   * Stores the four path arguments in the fields of this object.
   *
   * @param args the raw program arguments
   * @return `true` when exactly four arguments were given, `false` (after printing an
   *         explanation and the usage to standard error) otherwise
   */
  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length == 4) {
      lineitemPath = args(0)
      customerPath = args(1)
      ordersPath = args(2)
      outputPath = args(3)
      true
    } else {
      // The usage line needs a space between the customer and orders placeholders; it used to
      // print "<customer-csv path><orders-csv path>".
      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
          " Due to legal restrictions, we can not ship generated data.\n" +
          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
                             "<orders-csv path> <result path>")
      false
    }
  }

  /** Reads columns 0, 5, 6 and 10 of the pipe-separated lineitem file. */
  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
    env.readCsvFile[Lineitem](
        lineitemPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 5, 6, 10) )
  }

  /** Reads columns 0 and 6 of the pipe-separated customer file. */
  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
    env.readCsvFile[Customer](
        customerPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 6) )
  }

  /** Reads columns 0, 1, 4 and 7 of the pipe-separated orders file. */
  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
    env.readCsvFile[Order](
        ordersPath,
        fieldDelimiter = "|",
        includedFields = Array(0, 1, 4, 7) )
  }

}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala deleted file mode 100644 index cac9590..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ - -/** - * Simple example for demonstrating the use of the Table API for a Word Count.
- */
object WordCountTable {

  /** A word together with a count for it. */
  case class WC(word: String, count: Int)

  /**
   * Entry point: sums the counts of three hard-coded words per word with the Table API and
   * prints the resulting `WC` records.
   *
   * @param args program arguments; not read
   */
  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment

    // Sample input: "hello" twice and "ciao" once, each with a count of 1.
    val words = environment.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

    // Group by word, sum the counts and convert the table back into a data set of WC.
    val totals = words.toTable
      .groupBy('word)
      .select('word, 'count.sum as 'count)
      .toDataSet[WC]

    totals.print()
  }
}