http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala deleted file mode 100644 index b706e6d..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.FlatJoinFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.{Expression, NopExpression} -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a binary operation. 
- */ -class GenerateJoin[L, R, O]( - leftTypeInfo: CompositeType[L], - rightTypeInfo: CompositeType[R], - resultTypeInfo: CompositeType[O], - predicate: Expression, - outputFields: Seq[Expression], - cl: ClassLoader, - config: TableConfig) - extends GenerateResultAssembler[FlatJoinFunction[L, R, O]]( - Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)), - cl = cl, - config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - - override protected def generateInternal(): FlatJoinFunction[L, R, O] = { - - val leftTpe = typeTermForTypeInfo(leftTypeInfo) - val rightTpe = typeTermForTypeInfo(rightTypeInfo) - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - - - val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);") - - val generatedName = freshName("GeneratedJoin") - - - val code = predicate match { - case n: NopExpression => - // Janino does not support generics, that's why we need - // manual casting here - if (nullCheck) { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - $resultCode - } - } - """ - } else { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - $resultCode - } - } - """ - } - - case _ => - 
val pred = generateExpression(predicate) - // Janino does not support generics, that's why we need - // manual casting here - if (nullCheck) { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - ${pred.code} - - if (${pred.nullTerm} && ${pred.resultTerm}) { - $resultCode - } - } - } - """ - } else { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - ${pred.code} - - if (${pred.resultTerm}) { - $resultCode - } - } - } - """ - } - } - - LOG.debug(s"""Generated join:\n$code""") - compiler.cook(new StringReader(code)) - val clazz = compiler.getClassLoader().loadClass(generatedName) - val constructor = clazz.getConstructor(classOf[TableConfig]) - constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]] - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala deleted file mode 100644 index 3916410..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.typeinfo.RowTypeInfo - -/** - * Base class for unary and binary result assembler code generators. 
- */ -abstract class GenerateResultAssembler[R]( - inputs: Seq[(String, CompositeType[_])], - cl: ClassLoader, - config: TableConfig) - extends ExpressionCodeGenerator[R](inputs, cl = cl, config) { - - def reuseCode[A](resultTypeInfo: CompositeType[A]) = { - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - resultTypeInfo match { - case pj: PojoTypeInfo[_] => - super.reuseMemberCode() + - s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();" - - case row: RowTypeInfo => - super.reuseMemberCode() + - s"org.apache.flink.api.table.Row out =" + - s" new org.apache.flink.api.table.Row(${row.getArity});" - - case _ => "" - } - } - - def createResult[T]( - resultTypeInfo: CompositeType[T], - outputFields: Seq[Expression], - result: String => String): String = { - - val resultType = typeTermForTypeInfo(resultTypeInfo) - - val fieldsCode = outputFields.map(generateExpression) - - val block = resultTypeInfo match { - case ri: RowTypeInfo => - val resultSetters: String = fieldsCode.zipWithIndex map { - case (fieldCode, i) => - s""" - |${fieldCode.code} - |out.setField($i, ${fieldCode.resultTerm}); - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case pj: PojoTypeInfo[_] => - val resultSetters: String = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = expr.name - s""" - |${fieldCode.code} - |out.$fieldName = ${fieldCode.resultTerm}; - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case tup: TupleTypeInfo[_] => - val resultSetters: String = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = expr.name - s""" - |${fieldCode.code} - |out.$fieldName = ${fieldCode.resultTerm}; - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case cc: CaseClassTypeInfo[_] => - val fields: String = fieldsCode.map(_.code).mkString("\n") - val ctorParams: String 
= fieldsCode.map(_.resultTerm).mkString(",") - - s""" - |$fields - |return new $resultType($ctorParams); - """.stripMargin - } - - block - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala deleted file mode 100644 index a75d15b..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.Expression -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a select operation. - */ -class GenerateSelect[I, O]( - inputTypeInfo: CompositeType[I], - resultTypeInfo: CompositeType[O], - outputFields: Seq[Expression], - cl: ClassLoader, - config: TableConfig) - extends GenerateResultAssembler[MapFunction[I, O]]( - Seq(("in0", inputTypeInfo)), - cl = cl, - config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - override protected def generateInternal(): MapFunction[I, O] = { - - val inputTpe = typeTermForTypeInfo(inputTypeInfo) - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - - val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;") - - val generatedName = freshName("GeneratedSelect") - - // Janino does not support generics, that's why we need - // manual casting here - val code = - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - @Override - public Object map(Object _in0) { - $inputTpe in0 = ($inputTpe) _in0; - $resultCode - } - } - """ - - LOG.debug(s"""Generated select:\n$code""") - compiler.cook(new StringReader(code)) - val clazz = compiler.getClassLoader().loadClass(generatedName) - val constructor = clazz.getConstructor(classOf[TableConfig]) - constructor.newInstance(config).asInstanceOf[MapFunction[I, O]] - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala deleted file mode 100644 index 1319f21..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.codegen - -class IndentStringContext(sc: StringContext) { - def j(args: Any*):String = { - val sb = new StringBuilder() - for ((s, a) <- sc.parts zip args) { - sb append s - - val ind = getindent(s) - if (ind.size > 0) { - sb append a.toString().replaceAll("\n", "\n" + ind) - } else { - sb append a.toString() - } - } - if (sc.parts.size > args.size) { - sb append sc.parts.last - } - - sb.toString() - } - - // get white indent after the last new line, if any - def getindent(str: String): String = { - val lastnl = str.lastIndexOf("\n") - if (lastnl == -1) "" - else { - val ind = str.substring(lastnl + 1) - if (ind.trim.isEmpty) ind // ind is all whitespace. Use this - else "" - } - } -} - -object Indenter { - implicit def toISC(sc: StringContext) = new IndentStringContext(sc) -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala deleted file mode 100644 index b69ac1c..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -package object codegen { - // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might - // have several parallel expression operators in one TaskManager, therefore we need to guard - // these operations. - object ReflectionLock -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java deleted file mode 100644 index 9152260..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.explain; - -import java.util.List; - -public class Node { - private int id; - private String type; - private String pact; - private String contents; - private int parallelism; - private String driver_strategy; - private List<Predecessors> predecessors; - private List<Global_properties> global_properties; - private List<LocalProperty> local_properties; - private List<Estimates> estimates; - private List<Costs> costs; - private List<Compiler_hints> compiler_hints; - - public int getId() { - return id; - } - public String getType() { - return type; - } - public String getPact() { - return pact; - } - public String getContents() { - return contents; - } - public int getParallelism() { - return parallelism; - } - public String getDriver_strategy() { - return driver_strategy; - } - public List<Predecessors> getPredecessors() { - return predecessors; - } - public List<Global_properties> getGlobal_properties() { - return global_properties; - } - public List<LocalProperty> getLocal_properties() { - return local_properties; - } - public List<Estimates> getEstimates() { - return estimates; - } - public List<Costs> getCosts() { - return costs; - } - public List<Compiler_hints> getCompiler_hints() { - return compiler_hints; - } -} - -class Predecessors { - private String ship_strategy; - private String exchange_mode; - - public String getShip_strategy() { - return ship_strategy; - } - public String getExchange_mode() { - return exchange_mode; - } -} - -class Global_properties { - private String name; - private String 
value; - - public String getValue() { - return value; - } - public String getName() { - return name; - } -} - -class LocalProperty { - private String name; - private String value; - - public String getValue() { - return value; - } - public String getName() { - return name; - } -} - -class Estimates { - private String name; - private String value; - - public String getValue() { - return value; - } - public String getName() { - return name; - } -} - -class Costs { - private String name; - private String value; - - public String getValue() { - return value; - } - public String getName() { - return name; - } -} - -class Compiler_hints { - private String name; - private String value; - - public String getValue() { - return value; - } - public String getName() { - return name; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java deleted file mode 100644 index 31a7cd68..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.explain; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.DeserializationFeature; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.LinkedHashMap; -import java.util.List; - -public class PlanJsonParser { - - public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - - //not every node is same, ignore the unknown field - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - PlanTree tree = objectMapper.readValue(t, PlanTree.class); - LinkedHashMap<String, Integer> map = new LinkedHashMap<>(); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - int tabCount = 0; - - for (int index = 0; index < tree.getNodes().size(); index++) { - Node tempNode = tree.getNodes().get(index); - - //input with operation such as join or union is coordinate, keep the same indent - if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) { - tabCount = map.get(tempNode.getPact()); - } - else { - map.put(tempNode.getPact(), tabCount); - } - - printTab(tabCount, pw); - pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n"); - - printTab(tabCount + 1, pw); - String content = tempNode.getContents(); - - //drop the hashcode of object instance - int dele = tempNode.getContents().indexOf("@"); - if (dele > -1) content = tempNode.getContents().substring(0, dele); - - //replace 
with certain content if node is dataSource to pass - //unit tests, because java and scala use different api to - //get input element - if (tempNode.getPact().equals("Data Source")) - content = "collect elements with CollectionInputFormat"; - pw.print("content : " + content + "\n"); - - List<Predecessors> predecessors = tempNode.getPredecessors(); - if (predecessors != null) { - printTab(tabCount + 1, pw); - pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n"); - - printTab(tabCount + 1, pw); - pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n"); - } - - if (tempNode.getDriver_strategy() != null) { - printTab(tabCount + 1, pw); - pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n"); - } - - printTab(tabCount + 1, pw); - pw.print(tempNode.getGlobal_properties().get(0).getName() + " : " - + tempNode.getGlobal_properties().get(0).getValue() + "\n"); - - if (extended) { - List<Global_properties> globalProperties = tempNode.getGlobal_properties(); - for (int i = 1; i < globalProperties.size(); i++) { - printTab(tabCount + 1, pw); - pw.print(globalProperties.get(i).getName() + " : " - + globalProperties.get(i).getValue() + "\n"); - } - - List<LocalProperty> localProperties = tempNode.getLocal_properties(); - for (int i = 0; i < localProperties.size(); i++) { - printTab(tabCount + 1, pw); - pw.print(localProperties.get(i).getName() + " : " - + localProperties.get(i).getValue() + "\n"); - } - - List<Estimates> estimates = tempNode.getEstimates(); - for (int i = 0; i < estimates.size(); i++) { - printTab(tabCount + 1, pw); - pw.print(estimates.get(i).getName() + " : " - + estimates.get(i).getValue() + "\n"); - } - - List<Costs> costs = tempNode.getCosts(); - for (int i = 0; i < costs.size(); i++) { - printTab(tabCount + 1, pw); - pw.print(costs.get(i).getName() + " : " - + costs.get(i).getValue() + "\n"); - } - - List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints(); - for (int i = 0; i < 
compilerHintses.size(); i++) { - printTab(tabCount + 1, pw); - pw.print(compilerHintses.get(i).getName() + " : " - + compilerHintses.get(i).getValue() + "\n"); - } - } - tabCount++; - pw.print("\n"); - } - pw.close(); - return sw.toString(); - } - - private static void printTab(int tabCount, PrintWriter pw) { - for (int i = 0; i < tabCount; i++) - pw.print("\t"); - } -} - -class PlanTree { - private List<Node> nodes; - - public List<Node> getNodes() { - return nodes; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala deleted file mode 100644 index 900ed8a..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.expressions - -import java.util.concurrent.atomic.AtomicInteger - -import scala.language.postfixOps - -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation} -import org.apache.flink.api.table.trees.TreeNode - - -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") - def typeInfo: TypeInformation[_] -} - -abstract class BinaryExpression() extends Expression { self: Product => - def left: Expression - def right: Expression - def children = Seq(left, right) -} - -abstract class UnaryExpression() extends Expression { self: Product => - def child: Expression - def children = Seq(child) -} - -abstract class LeafExpression() extends Expression { self: Product => - val children = Nil -} - -case class NopExpression() extends LeafExpression { - val typeInfo = new NothingTypeInfo() - override val name = Expression.freshName("nop") - -} - -object Expression { - def freshName(prefix: String): String = { - s"$prefix-${freshNameCounter.getAndIncrement}" - } - - val freshNameCounter = new AtomicInteger -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala deleted file mode 100644 index 08e319d..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.java.aggregation.Aggregations - - -abstract sealed class Aggregation extends UnaryExpression { self: Product => - def typeInfo = { - child.typeInfo match { - case BasicTypeInfo.LONG_TYPE_INFO => // ok - case BasicTypeInfo.INT_TYPE_INFO => - case BasicTypeInfo.DOUBLE_TYPE_INFO => - case BasicTypeInfo.FLOAT_TYPE_INFO => - case BasicTypeInfo.BYTE_TYPE_INFO => - case BasicTypeInfo.SHORT_TYPE_INFO => - case _ => - throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " + - s"aggregation $this. 
Only numeric data types supported.") - } - child.typeInfo - } - - override def toString = s"Aggregate($child)" - - def getIntermediateFields: Seq[Expression] - def getFinalField(inputs: Seq[Expression]): Expression - def getAggregations: Seq[Aggregations] -} - -case class Sum(child: Expression) extends Aggregation { - override def toString = s"($child).sum" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) -} - -case class Min(child: Expression) extends Aggregation { - override def toString = s"($child).min" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MIN) - -} - -case class Max(child: Expression) extends Aggregation { - override def toString = s"($child).max" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MAX) -} - -case class Count(child: Expression) extends Aggregation { - override def typeInfo = { - child.typeInfo match { - case _ => // we can count anything... :D - } - BasicTypeInfo.INT_TYPE_INFO - } - - override def toString = s"($child).count" - - override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1))) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) - -} - -case class Avg(child: Expression) extends Aggregation { - override def toString = s"($child).avg" - - override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) - // This is just sweet. Use our own AST representation and let the code generator do - // our dirty work. 
- override def getFinalField(inputs: Seq[Expression]): Expression = - Div(inputs(0), inputs(1)) - override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala deleted file mode 100644 index 797de55..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.common.typeutils.CompositeType - -import scala.collection.mutable - -/** - * Equi-join field extractor for Join Predicates and CoGroup predicates. 
The result is a modified - * expression without the equi-join predicates together with indices of the join fields - * from both the left and right input. - */ -object ExtractEquiJoinFields { - def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = { - - val joinFieldsLeft = mutable.MutableList[Int]() - val joinFieldsRight = mutable.MutableList[Int]() - - val equiJoinExprs = mutable.MutableList[EqualTo]() - // First get all `===` expressions that are not below an `Or` - predicate.transformPre { - case or@Or(_, _) => NopExpression() - case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) => - if (leftType.hasField(le.name) && rightType.hasField(re.name)) { - joinFieldsLeft += leftType.getFieldIndex(le.name) - joinFieldsRight += rightType.getFieldIndex(re.name) - } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) { - joinFieldsLeft += leftType.getFieldIndex(re.name) - joinFieldsRight += rightType.getFieldIndex(le.name) - } else { - // not an equi-join predicate - } - equiJoinExprs += eq - eq - } - - // then remove the equi join expressions from the predicate - val resultExpr = predicate.transformPost { - // For OR, we can eliminate the OR since the equi join - // predicate is evaluated before the expression is evaluated - case or@Or(NopExpression(), _) => NopExpression() - case or@Or(_, NopExpression()) => NopExpression() - // For AND we replace it with the other expression, since the - // equi join predicate will always be true - case and@And(NopExpression(), other) => other - case and@And(other, NopExpression()) => other - case eq : EqualTo if equiJoinExprs.contains(eq) => - NopExpression() - } - - (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala deleted file mode 100644 index 6c7ecb2..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table._ -import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression} -import org.apache.flink.api.common.typeinfo.TypeInformation - -import scala.collection.mutable - -import org.apache.flink.api.table.trees.{Rule, Analyzer} - - -/** - * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions. 
- */ -class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) - extends Analyzer[Expression] { - - def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression) - - object CheckGroupExpression extends Rule[Expression] { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - expr match { - case f: ResolvedFieldReference => // this is OK - case other => - throw new ExpressionException( - s"""Invalid grouping expression "$expr". Only field references are allowed.""") - } - expr - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala deleted file mode 100644 index 0fdcab6..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo} -import org.apache.flink.api.table.trees.Rule - -/** - * [[Rule]] that adds casts in arithmetic operations. - */ -class InsertAutoCasts extends Rule[Expression] { - - def apply(expr: Expression) = { - val result = expr.transformPost { - - case plus@Plus(o1, o2) => - // Plus is special case since we can cast anything to String for String concat - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - Plus(Cast(o1, o2.typeInfo), o2) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - Plus(o1, Cast(o2, o1.typeInfo)) - } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { - Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO)) - } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { - Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2) - } else { - plus - } - } else { - plus - } - - case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] || - ba.isInstanceOf[BinaryComparison] => - val o1 = ba.left - val o2 = ba.right - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) - } else { - ba - } - } else { - ba - } - - case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] 
=> - val o1 = ba.left - val o2 = ba.right - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] && - o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) - } else { - ba - } - } else { - ba - } - } - - result - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala deleted file mode 100644 index e9236f7..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.trees.Analyzer - -/** - * Analyzer for predicates, i.e. filter operations and where clauses of joins. - */ -class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) - extends Analyzer[Expression] { - def rules = Seq( - new ResolveFieldReferences(inputFields), - new InsertAutoCasts, - new TypeCheck, - new VerifyNoAggregates, - new VerifyBoolean) -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala deleted file mode 100644 index db7ea6c..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.expressions.{ResolvedFieldReference, -UnresolvedFieldReference, Expression} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table._ - -import scala.collection.mutable - -import org.apache.flink.api.table.trees.Rule - -/** - * Rule that resolves field references. This rule verifies that field references point to existing - * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field - * [[TypeInformation]] in addition to the field name. 
- */ -class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) - extends Rule[Expression] { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPost { - case fe@UnresolvedFieldReference(fieldName) => - inputFields.find { _._1 == fieldName } match { - case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe) - - case None => - errors += - s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}" - fe - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala deleted file mode 100644 index 625fdbf..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.trees.Analyzer - -/** - * This analyzes selection expressions. - */ -class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) - extends Analyzer[Expression] { - - def rules = Seq( - new ResolveFieldReferences(inputFields), - new VerifyNoNestedAggregates, - new InsertAutoCasts, - new TypeCheck) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala deleted file mode 100644 index b724561..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.trees.Rule -import org.apache.flink.api.table.{_} - -import scala.collection.mutable - -/** - * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once. - * Expressions are expected to perform type verification in this method. - */ -class TypeCheck extends Rule[Expression] { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case expr: Expression=> { - // simply get the typeInfo from the expression. 
this will perform type analysis - try { - expr.typeInfo - } catch { - case e: ExpressionException => - errors += e.getMessage - } - expr - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala deleted file mode 100644 index e75dd20..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.expressions.{NopExpression, Expression} -import org.apache.flink.api.table.trees.Rule -import org.apache.flink.api.table.{_} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo - -import scala.collection.mutable - -/** - * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required - * for filter/join predicates. - */ -class VerifyBoolean extends Rule[Expression] { - - def apply(expr: Expression) = { - if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.") - } - - expr - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala deleted file mode 100644 index 09dbf88..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.table.expressions.{Aggregation, Expression} - -import scala.collection.mutable - -import org.apache.flink.api.table.trees.Rule - -/** - * Rule that verifies that an expression does not contain aggregate operations. Right now, join - * predicates and filter predicates cannot contain aggregates. - */ -class VerifyNoAggregates extends Rule[Expression] { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case agg: Aggregation=> { - errors += - s"""Aggregations are not allowed in join/filter predicates.""" - agg - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala deleted file mode 100644 index 07acf1e..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala +++ 
/dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.table.expressions.{Expression, Aggregation} - -import scala.collection.mutable - -import org.apache.flink.api.table.trees.Rule - -/** - * Rule that verifies that an expression does not contain aggregate operations - * as children of aggregate operations. 
- */ -class VerifyNoNestedAggregates extends Rule[Expression] { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case agg: Aggregation=> { - if (agg.child.exists(_.isInstanceOf[Aggregation])) { - errors += s"""Found nested aggregation inside "$agg".""" - } - agg - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala deleted file mode 100644 index e866ea0..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} - -abstract class BinaryArithmetic extends BinaryExpression { self: Product => - def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } -} - -case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { - override def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } - - override def toString = s"($left + $right)" -} - -case class UnaryMinus(child: Expression) extends UnaryExpression { - def typeInfo = { - if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""") - } - child.typeInfo - } - - override def toString = s"-($child)" -} - -case class Minus(left: Expression, right: Expression) extends 
BinaryArithmetic { - override def toString = s"($left - $right)" -} - -case class Div(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left / $right)" -} - -case class Mul(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Mod(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Abs(child: Expression) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"abs($child)" -} - -abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product => - def typeInfo: TypeInformation[_] = { - if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - left.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } -} - -case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left & $right)" -} - -case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left | $right)" -} - - -case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left ^ $right)" -} - -case class BitwiseNot(child: Expression) extends UnaryExpression { - def typeInfo: TypeInformation[_] = { - if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - 
s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""") - } - if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - child.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } - - override def toString = s"~($child)" -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala deleted file mode 100644 index 9fae862..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.ExpressionException - -/** - * Expression node that casts the result of `child` to the type `tpe`. - * - * A cast to String is always accepted. Any other cast requires that both `tpe` and the type of - * `child` are basic types; otherwise resolving `typeInfo` throws an [[ExpressionException]]. - */ -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { - def typeInfo = { - val targetIsString = BasicTypeInfo.STRING_TYPE_INFO == tpe - if (targetIsString || (tpe.isBasicType && child.typeInfo.isBasicType)) { - tpe - } else { - throw new ExpressionException( - s"Invalid cast: $this. Casts are only valid betwixt primitive types.") - } - } - - override def toString = s"${child}.cast(${tpe})" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala deleted file mode 100644 index 687ea7a..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo} - -/** - * Base class of the comparisons between two operands. - * - * Resolving `typeInfo` throws an [[ExpressionException]] unless both operands are numeric and - * have the same type; the result type is BOOLEAN. - */ -abstract class BinaryComparison extends BinaryExpression { self: Product => - def typeInfo = { - requireNumeric(left) - requireNumeric(right) - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - /** Rejects an operand whose type is not a [[NumericTypeInfo]]. */ - private def requireNumeric(operand: Expression): Unit = { - if (!operand.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException(s"Non-numeric operand ${operand} in $this") - } - } -} - -/** - * Equality test, printed as `left === right`. It overrides the numeric check of the base class - * and only requires that both operands have the same type. - */ -case class EqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo == right.typeInfo) { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } else { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - } - - override def toString = s"${left} === ${right}" -} - -/** - * Inequality test, printed as `left !== right`. It overrides the numeric check of the base class - * and only requires that both operands have the same type. - */ -case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo == right.typeInfo) { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } else { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - } - - override def toString = s"${left} !== ${right}" -} - -/** Greater-than comparison of two numeric operands, printed as `left > right`. */ -case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"${left} > ${right}" -} - -/** Greater-than-or-equal comparison of two numeric operands, printed as `left >= right`. */ -case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"${left} >= ${right}" -} - -/** Less-than comparison of two numeric operands, printed as `left < right`. */ -case class LessThan(left: Expression, right: Expression) extends BinaryComparison { - override def
toString = s"${left} < ${right}" -} - -/** Less-than-or-equal comparison of two numeric operands, printed as `left <= right`. */ -case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"${left} <= ${right}" -} - -/** Null test, printed as `(child).isNull`; the result type is BOOLEAN for any child. */ -case class IsNull(child: Expression) extends UnaryExpression { - def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO - - override def toString = s"(${child}).isNull" -} - -/** Non-null test, printed as `(child).isNotNull`; the result type is BOOLEAN for any child. */ -case class IsNotNull(child: Expression) extends UnaryExpression { - def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO - - override def toString = s"(${child}).isNotNull" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala deleted file mode 100644 index a649aed..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * Reference to a field by name only. It carries no type, so asking for `typeInfo` always - * throws an [[ExpressionException]]. - */ -case class UnresolvedFieldReference(override val name: String) extends LeafExpression { - def typeInfo = { - throw new ExpressionException(s"Unresolved field reference: ${this}") - } - - // Printed as a double quote followed by the field name; there is no closing quote. - override def toString = '"'.toString + name -} - -/** - * Reference to the field `name` whose type `tpe` is known. - */ -case class ResolvedFieldReference(override val name: String, tpe: TypeInformation[_]) - extends LeafExpression { - def typeInfo = tpe - - override def toString = "'" + name -} - -/** - * Gives the result of `child` the name `name`; the type is the type of `child`. - */ -case class Naming(child: Expression, override val name: String) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"${child} as '${name}" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala deleted file mode 100644 index f909cab..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions - -import java.util.Date -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.scala.table.ImplicitExpressionOperations - -/** - * Factory that derives the type information of a literal from the runtime class of its value. - */ -object Literal { - /** - * Wraps `l` in a [[Literal]] with the matching basic type. - * - * Supported values are Int, Long, Double, Float, String, Boolean and [[java.util.Date]]. - * - * @param l the constant value - * @return the literal expression for `l` - * @throws org.apache.flink.api.table.ExpressionException if `l` is null or of any other type - */ - def apply(l: Any): Literal = l match { - case int: Int => Literal(int, BasicTypeInfo.INT_TYPE_INFO) - case long: Long => Literal(long, BasicTypeInfo.LONG_TYPE_INFO) - case double: Double => Literal(double, BasicTypeInfo.DOUBLE_TYPE_INFO) - case float: Float => Literal(float, BasicTypeInfo.FLOAT_TYPE_INFO) - case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) - case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) - case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO) - case unsupported => - // Fail with a descriptive message instead of a bare scala.MatchError. - val valueClass = if (unsupported == null) "null" else unsupported.getClass.getName - throw new org.apache.flink.api.table.ExpressionException( - s"Unsupported literal $unsupported of type $valueClass. Supported types are Int, Long, " + - "Double, Float, String, Boolean and java.util.Date.") - } -} - -/** - * Constant `value` together with its type information `tpe`. The expression operations mixed - * in through `ImplicitExpressionOperations` work on the literal itself, see `expr`. - */ -case class Literal(value: Any, tpe: TypeInformation[_]) - extends LeafExpression with ImplicitExpressionOperations { - def expr = this - def typeInfo = tpe - - override def toString = s"$value" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala deleted file mode 100644 index eaf0463..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -/** - * Base class of the boolean predicates over two operands. - * - * Resolving `typeInfo` throws an [[ExpressionException]] unless both operands are of type - * BOOLEAN; the result type is BOOLEAN. - */ -abstract class BinaryPredicate extends BinaryExpression { self: Product => - def typeInfo = { - if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO || - right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } -} -/** - * Logical negation, printed as `!(child)`. - * - * Resolving `typeInfo` throws an [[ExpressionException]] unless `child` is of type BOOLEAN. - * The `name` is obtained from `Expression.freshName` for "not-" followed by the child's name. - */ -case class Not(child: Expression) extends UnaryExpression { - def typeInfo = { - if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override val name = Expression.freshName("not-" + child.name) - - override def toString = s"!($child)" -} -/** - * Logical conjunction, printed as `left && right`. The `name` is obtained from - * `Expression.freshName` for the operand names joined by "-and-". - */ -case class And(left: Expression, right: Expression) extends BinaryPredicate { - override def toString = s"$left && $right" - - override val name = Expression.freshName(left.name + "-and-" + right.name) -} -/** - * Logical disjunction, printed as `left || right`. The `name` is obtained from - * `Expression.freshName` for the operand names joined by "-or-". - */ -case class Or(left: Expression, right: Expression) extends BinaryPredicate { - override def
toString = s"$left || $right" - - override val name = Expression.freshName(left.name + "-or-" + right.name) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala deleted file mode 100644 index c5c8c94..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * This package contains the base class of AST nodes and all the expression language AST classes. - * Expression trees should not be manually constructed by users. They are implicitly constructed - * from the implicit DSL conversions in - * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]].
For the Java API, - * expression trees should be generated from a string parser that parses expressions and creates - * AST nodes. - */ -package object expressions http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala deleted file mode 100644 index a39d601..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} - -/** - * Substring of `str`, printed as `(str).substring(beginIndex, endIndex)`. - * - * Resolving `typeInfo` throws an [[ExpressionException]] unless `str` is of type String and both - * indices have an integer type; the result type is String. - */ -case class Substring(str: Expression, beginIndex: Expression, endIndex: Expression) - extends Expression { - def typeInfo = { - if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { - throw new ExpressionException( - s"""Operand must be of type String in $this, is ${str.typeInfo}.""") - } - requireIntegerIndex("Begin", beginIndex) - requireIntegerIndex("End", endIndex) - - BasicTypeInfo.STRING_TYPE_INFO - } - - /** Rejects an index expression whose type is not an [[IntegerTypeInfo]]. */ - private def requireIntegerIndex(label: String, index: Expression): Unit = { - if (!index.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""$label index must be an integer type in $this, is ${index.typeInfo}.""") - } - } - - override def children: Seq[Expression] = str :: beginIndex :: endIndex :: Nil - override def toString = s"(${str}).substring(${beginIndex}, ${endIndex})" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala deleted file mode 100644 index bdcb22c..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api - -/** - * == Table API == - * - * This package contains the generic part of the Table API. It can be used with Flink Streaming - * and Flink Batch, from Scala as well as from Java. - * - * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from - * a DataSet or DataStream. Relational operations can then be performed on this table. A table - * can also be converted back to a DataSet or DataStream. - * - * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain - * the language-specific part of the API. Refer to these packages for documentation on how - * the Table API can be used in Java and Scala. - */ -package object table