http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
new file mode 100644
index 0000000..a1d8589
--- /dev/null
+++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
+
+  override def toString = "\"" + name
+}
+
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_]) extends LeafExpression {
+  def typeInfo = tpe
+
+  override def toString = s"'$name"
+}
+
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"$child as '$name"
+}
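The three case classes above are the AST nodes for field access and field renaming. As a rough sketch of how they get constructed in practice (this snippet is not part of the commit; it assumes the implicit DSL conversions added further down in this commit, in expressionDsl.scala, are in scope): a Scala symbol is lifted to an UnresolvedFieldReference, and "as" wraps the child expression in a Naming node. UnresolvedFieldReference carries no type information (its typeInfo throws), so the translators later replace it with a typed ResolvedFieldReference once the input type is known.

    // hypothetical usage sketch, assuming the package object exposes the implicit conversions
    import org.apache.flink.api.scala.expressions._
    import org.apache.flink.api.expressions.tree._

    // 'name is lifted to UnresolvedFieldReference("name") by symbol2FieldExpression
    val field: Expression = 'name

    // "as" (from ImplicitExpressionOperations) produces
    // Naming(UnresolvedFieldReference("name"), "userName")
    val renamed: Expression = 'name as 'userName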
http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala new file mode 100644 index 0000000..03949ee --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.tree + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.scala.expressions.ImplicitExpressionOperations + +object Literal { + def apply(l: Any): Literal = l match { + case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) + case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) + case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO) + case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) + case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) + case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) + } +} + +case class Literal(value: Any, tpe: TypeInformation[_]) + extends LeafExpression with ImplicitExpressionOperations { + def expr = this + def typeInfo = tpe + + override def toString = s"$value" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala new file mode 100644 index 0000000..8f0a068 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.tree + +import org.apache.flink.api.expressions.ExpressionException +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +abstract class BinaryPredicate extends BinaryExpression { + def typeInfo = { + if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO || + right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { + throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } +} + +case class Not(child: Expression) extends UnaryExpression { + def typeInfo = { + if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { + throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override val name = Expression.freshName("not-" + child.name) + + override def toString = s"!($child)" +} + +case class And(left: Expression, right: Expression) extends BinaryPredicate { + override def toString = s"$left && $right" + + override val name = Expression.freshName(left.name + "-and-" + right.name) +} + +case class Or(left: Expression, right: Expression) extends BinaryPredicate { + override def toString = s"$left || $right" + + override val name = Expression.freshName(left.name + "-or-" + right.name) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala new file mode 100644 index 0000000..04c29f7 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions + +/** + * This package contains the base class of AST nodes and all the expression language AST classes. + * Expression trees should not be manually constructed by users. 
They are implicitly constructed + * from the implicit DSL conversions in + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API, + * expression trees should be generated from a string parser that parses expressions and creates + * AST nodes. + */ +package object tree http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala new file mode 100644 index 0000000..175d445 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.expressions.tree + +import org.apache.flink.api.expressions.ExpressionException +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} + +case class Substring( + str: Expression, + beginIndex: Expression, + endIndex: Expression) extends Expression { + def typeInfo = { + if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { + throw new ExpressionException( + s"""Operand must be of type String in $this, is ${str.typeInfo}.""") + } + if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") + } + if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""") + } + + BasicTypeInfo.STRING_TYPE_INFO + } + + override def children: Seq[Expression] = Seq(str, beginIndex, endIndex) + override def toString = s"($str).substring($beginIndex, $endIndex)" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala new file mode 100644 index 0000000..38c908d --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.typeinfo + +import org.apache.flink.api.common.operators.Operator +import org.apache.flink.api.java.operators.SingleInputOperator +import org.apache.flink.api.java.{DataSet => JavaDataSet} + +/** + * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some + * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this + * disappears since the translation methods simply returns the input. 
+ */ +class RenameOperator[T]( + input: JavaDataSet[T], + renamingTypeInformation: RenamingProxyTypeInfo[T]) + extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) { + + override protected def translateToDataFlow( + input: Operator[T]): Operator[T] = input +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala new file mode 100644 index 0000000..0263f8a --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.typeinfo + +import java.util + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor +import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer} + +/** + * A TypeInformation that is used to rename fields of an underlying CompositeType. This + * allows the system to translate "as" expression operations to a [[RenameOperator]] + * that does not get translated to a runtime operator. + */ +class RenamingProxyTypeInfo[T]( + tpe: CompositeType[T], + fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) { + + def getUnderlyingType: CompositeType[T] = tpe + + if (tpe.getArity != fieldNames.length) { + throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " + + s"number of fields in underlying type $tpe do not match.") + } + + if (fieldNames.toSet.size != fieldNames.length) { + throw new IllegalArgumentException(s"New field names must be unique. 
" + + s"Names: ${fieldNames.mkString(",")}.") + } + + override def getFieldIndex(fieldName: String): Int = { + val result = fieldNames.indexOf(fieldName) + if (result != fieldNames.lastIndexOf(fieldName)) { + -2 + } else { + result + } + } + override def getFieldNames: Array[String] = fieldNames + + override def isBasicType: Boolean = tpe.isBasicType + + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = + tpe.createSerializer(executionConfig) + + override def getArity: Int = tpe.getArity + + override def isKeyType: Boolean = tpe.isKeyType + + override def getTypeClass: Class[T] = tpe.getTypeClass + + override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters + + override def isTupleType: Boolean = tpe.isTupleType + + override def toString = { + s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " + + s"fields: ${fieldNames.mkString(", ")})" + } + + override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos) + + override def getTotalFields: Int = tpe.getTotalFields + + override def createComparator( + logicalKeyFields: Array[Int], + orders: Array[Boolean], + logicalFieldOffset: Int, + executionConfig: ExecutionConfig) = + tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig) + + // These are never called since we override create comparator + override protected def initializeNewComparator(localKeyCount: Int): Unit = + throw new RuntimeException("Cannot happen.") + + override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = + throw new RuntimeException("Cannot happen.") + + override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit = + throw new RuntimeException("Cannot happen.") + + override def getFlatFields( + fieldExpression: String, + offset: Int, + result: util.List[FlatFieldDescriptor]): Unit = { + tpe.getFlatFields(fieldExpression, offset, result) + } + + override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = { + tpe.getTypeAt(fieldExpression) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala new file mode 100644 index 0000000..006c0c9 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.typeinfo + +import org.apache.flink.api.expressions.Row +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.core.memory.{DataOutputView, DataInputView} +; + +/** + * Serializer for [[Row]]. + */ +class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]]) + extends TypeSerializer[Row] { + + override def isImmutableType: Boolean = false + + override def getLength: Int = -1 + + override def duplicate = this + + override def createInstance: Row = { + new Row(fieldSerializers.length) + } + + override def copy(from: Row, reuse: Row): Row = { + val len = fieldSerializers.length + + if (from.productArity != len) { + throw new RuntimeException("Row arity of reuse and from do not match.") + } + var i = 0 + while (i < len) { + val reuseField = reuse.productElement(i) + val fromField = from.productElement(i).asInstanceOf[AnyRef] + val copy = fieldSerializers(i).copy(fromField, reuseField) + reuse.setField(i, copy) + i += 1 + } + reuse + } + + override def copy(from: Row): Row = { + val len = fieldSerializers.length + + if (from.productArity != len) { + throw new RuntimeException("Row arity of reuse and from do not match.") + } + val result = new Row(len) + var i = 0 + while (i < len) { + val fromField = from.productElement(i).asInstanceOf[AnyRef] + val copy = fieldSerializers(i).copy(fromField) + result.setField(i, copy) + i += 1 + } + result + } + + override def serialize(value: Row, target: DataOutputView) { + val len = fieldSerializers.length + var i = 0 + while (i < len) { + val serializer = fieldSerializers(i) + serializer.serialize(value.productElement(i), target) + i += 1 + } + } + + override def deserialize(reuse: Row, source: DataInputView): Row = { + val len = fieldSerializers.length + + if (reuse.productArity != len) { + throw new RuntimeException("Row arity of reuse and fields do not match.") + } + + var i = 0 + while (i < len) { + val field = reuse.productElement(i).asInstanceOf[AnyRef] + reuse.setField(i, fieldSerializers(i).deserialize(field, source)) + i += 1 + } + reuse + } + + override def deserialize(source: DataInputView): Row = { + val len = fieldSerializers.length + + val result = new Row(len) + var i = 0 + while (i < len) { + result.setField(i, fieldSerializers(i).deserialize(source)) + i += 1 + } + result + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + val len = fieldSerializers.length + var i = 0 + while (i < len) { + fieldSerializers(i).copy(source, target) + i += 1 + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala new file mode 100644 index 0000000..92e9bc8 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.typeinfo + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.expressions.Row +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.expressions.tree.Expression +import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo} + +/** + * TypeInformation for [[Row]]. + */ +class RowTypeInfo( + fieldTypes: Seq[TypeInformation[_]], + fieldNames: Seq[String]) + extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) { + + def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name)) + + if (fieldNames.toSet.size != fieldNames.size) { + throw new IllegalArgumentException("Field names must be unique.") + } + + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = { + val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = this.types(i).createSerializer(executionConfig) + .asInstanceOf[TypeSerializer[Any]] + } + + new RowSerializer(fieldSerializers) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala new file mode 100644 index 0000000..567d19c --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.expressions + +import org.apache.flink.api.expressions._ +import org.apache.flink.api.expressions.tree.{UnresolvedFieldReference, Expression} +import org.apache.flink.api.common.typeutils.CompositeType + +import org.apache.flink.api.scala._ + +/** + * Methods for converting a [[DataSet]] to an [[ExpressionOperation]]. A [[DataSet]] is + * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.expressions]]. + */ +class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { + + /** + * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting + * expression operation can be specified like this: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val expr = in.as('a, 'b) + * }}} + * + * This results in an expression operation that has field `a` of type `String` and field `b` + * of type `Int`. + */ + def as(fields: Expression*): ExpressionOperation[ScalaBatchTranslator] = { + new ScalaBatchTranslator().createExpressionOperation(set, fields.toArray) + } + + /** + * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting + * expression operation will be taken from the field names of the input type: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val expr = in.toExpression + * }}} + * + * This results in an expression operation that has field `_1` of type `String` and field `_2` + * of type `Int`. + */ + def toExpression: ExpressionOperation[ScalaBatchTranslator] = { + val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) + as(resultFields: _*) + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala new file mode 100644 index 0000000..49dbce7 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.expressions + +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.expressions._ +import org.apache.flink.api.expressions.tree.{Expression, UnresolvedFieldReference} +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.DataStream + +class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) { + + /** + * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting + * expression operation can be specified like this: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val expr = in.as('a, 'b) + * }}} + * + * This results in an expression operation that has field `a` of type `String` and field `b` + * of type `Int`. + */ + + def as(fields: Expression*): ExpressionOperation[ScalaStreamingTranslator] = { + new ScalaStreamingTranslator().createExpressionOperation(set, fields.toArray) + } + + /** + * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting + * expression operation will be taken from the field names of the input type: + * + * {{{ + * val in: DataSet[(String, Int)] = ... + * val expr = in.toExpression + * }}} + * + * This results in an expression operation that has field `_1` of type `String` and field `_2` + * of type `Int`. + */ + + def toExpression: ExpressionOperation[ScalaStreamingTranslator] = { + val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) + as(resultFields: _*) + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala new file mode 100644 index 0000000..037efd4 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.expressions + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields +import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction} +import org.apache.flink.api.expressions.tree._ +import org.apache.flink.api.expressions.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} +import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row} +import org.apache.flink.api.java.aggregation.AggregationFunction +import org.apache.flink.api.java.operators.JoinOperator.EquiJoin +import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} +import org.apache.flink.api.java.{DataSet => JavaDataSet} + +/** + * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[JavaDataSet]]s and + * translating them back to Scala [[JavaDataSet]]s. + */ +class JavaBatchTranslator extends OperationTranslator { + + type Representation[A] = JavaDataSet[A] + + def createExpressionOperation[A]( + repr: JavaDataSet[A], + fields: Array[Expression]): ExpressionOperation[JavaBatchTranslator] = { + + // shortcut for DataSet[Row] + repr.getType match { + case rowTypeInfo: RowTypeInfo => + val expressions = rowTypeInfo.getFieldNames map { + name => (name, rowTypeInfo.getTypeAt(name)) + } + new ExpressionOperation( + Root(repr.asInstanceOf[JavaDataSet[Row]], expressions), this) + case _ => + } + + val clazz = repr.getType.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create expression Operation from DataSet of type " + + clazz.getName + ". Only top-level classes or static members classes " + + " are supported.") + } + + if (!repr.getType.isInstanceOf[CompositeType[_]]) { + throw new ExpressionException("Only DataSets of composite type can be transformed to an" + + " Expression Operation. 
These would be tuples, case classes and POJOs.") + } + + val inputType = repr.getType.asInstanceOf[CompositeType[A]] + + if (fields.length != inputType.getFieldNames.length) { + throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + + "' and number of fields in input type " + inputType + " do not match.") + } + + val newFieldNames = fields map { + case UnresolvedFieldReference(name) => name + case e => + throw new ExpressionException("Only field expressions allowed in 'as' operation, " + + " offending expression: " + e) + } + + if (newFieldNames.toSet.size != newFieldNames.size) { + throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}") + } + + val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map { + case (name, index) => (name, inputType.getTypeAt(index)) + } + + val inputFields = inputType.getFieldNames + val fieldMappings = inputFields.zip(resultFields) + val expressions: Array[Expression] = fieldMappings map { + case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName) + } + + val rowDataSet = createSelect(expressions, repr, inputType) + + new ExpressionOperation(Root(rowDataSet, resultFields), new JavaBatchTranslator) + } + + override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { + + if (tpe.getTypeClass == classOf[Row]) { + // shortcut for DataSet[Row] + return translateInternal(op).asInstanceOf[JavaDataSet[A]] + } + + val clazz = tpe.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create DataSet of type " + + clazz.getName + ". Only top-level classes or static member classes are supported.") + } + + + if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { + throw new ExpressionException( + "Expression operations can only be converted to composite types, type is: " + + implicitly[TypeInformation[A]] + + ". 
Composite types would be tuples, case classes and POJOs.") + } + + val resultSet = translateInternal(op) + + val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + + val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + + val resultNames = resultType.getFieldNames + val outputNames = outputType.getFieldNames.toSeq + + if (resultNames.toSet != outputNames.toSet) { + throw new ExpressionException(s"Expression result type $resultType does not have the same" + + s"fields as output type $outputType") + } + + for (f <- outputNames) { + val in = resultType.getTypeAt(resultType.getFieldIndex(f)) + val out = outputType.getTypeAt(outputType.getFieldIndex(f)) + if (!in.equals(out)) { + throw new ExpressionException(s"Types for field $f differ on input $resultType and " + + s"output $outputType.") + } + } + + val outputFields = outputNames map { + f => ResolvedFieldReference(f, resultType.getTypeAt(f)) + } + + val function = new ExpressionSelectFunction( + resultSet.getType.asInstanceOf[RowTypeInfo], + outputType, + outputFields) + + val opName = s"select(${outputFields.mkString(",")})" + val operator = new MapOperator(resultSet, outputType, function, opName) + + operator + } + + private def translateInternal(op: Operation): JavaDataSet[Row] = { + op match { + case Root(dataSet: JavaDataSet[Row], resultFields) => + dataSet + + case As(input, newNames) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray) + new RenameOperator(translatedInput, proxyType) + + case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + selection, + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + } else { + translateInternal(expandedInput) + } + + case Filter(Join(leftInput, rightInput), predicate) => + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ + rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + + case Join(leftInput, rightInput) => + throw new ExpressionException("Join without filter condition encountered. " + + "Did you forget to add .where(...) 
?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + // no expansions took place + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val keyIndices = groupExpressions map { + case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) + case e => throw new ExpressionException(s"Expression $e is not a valid key expression.") + } + + val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) + + val grouping = new UnsortedGrouping(translatedInput, keys) + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + grouping, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + case agg@Aggregate(input, aggregations) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + + val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { + case (fieldName, fun) => + fun.getFactory.createAggregationFunction[Any]( + inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) + } + + val aggIndices = aggregations map { + case (fieldName, _) => + inType.getFieldIndex(fieldName) + } + + val result = new GroupReduceOperator( + translatedInput, + inType, + new ExpressionAggregateFunction(aggIndices, aggFunctions), + "Expression Aggregation: " + agg) + + result + + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: JavaDataSet[I], + inputType: CompositeType[I]): JavaDataSet[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + val operator = new MapOperator(input, resultType, function, opName) + + operator + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: JavaDataSet[L], + rightInput: JavaDataSet[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): JavaDataSet[Row] = { + + val resultType = new RowTypeInfo(fields) + + val (reducedPredicate, leftFields, rightFields) = + ExtractEquiJoinFields(leftType, rightType, predicate) + + val leftKey = new ExpressionKeys[L](leftFields, leftType) + val rightKey = new ExpressionKeys[R](rightFields, rightType) + + val joiner = new ExpressionJoinFunction[L, 
R, Row]( + reducedPredicate, + leftType, + rightType, + resultType, + fields) + + new EquiJoin[L, R, Row]( + leftInput, + rightInput, + leftKey, + rightKey, + joiner, + resultType, + joinHint, + predicate.toString) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala new file mode 100644 index 0000000..095823e --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.expressions + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} +import org.apache.flink.api.expressions.tree._ +import org.apache.flink.api.expressions.typeinfo.RowTypeInfo +import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.invokable.operator.MapInvokable + +/** + * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[DataStream]]s and + * translating them back to Java [[DataStream]]s. + * + * This is very limited right now. Only select and filter are implemented. Also, the expression + * operations must be extended to allow windowing operations. 
+ */ + +class JavaStreamingTranslator extends OperationTranslator { + + type Representation[A] = DataStream[A] + + def createExpressionOperation[A]( + repr: DataStream[A], + fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] = { + + // shortcut for DataSet[Row] + repr.getType match { + case rowTypeInfo: RowTypeInfo => + val expressions = rowTypeInfo.getFieldNames map { + name => (name, rowTypeInfo.getTypeAt(name)) + } + new ExpressionOperation( + Root(repr.asInstanceOf[DataStream[Row]], expressions), this) + case _ => + } + + val clazz = repr.getType.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create expression Operation from DataSet of type " + + clazz.getName + ". Only top-level classes or static members classes " + + " are supported.") + } + + if (!repr.getType.isInstanceOf[CompositeType[_]]) { + throw new ExpressionException("Only DataSets of composite type can be transformed to an" + + " Expression Operation. These would be tuples, case classes and POJOs.") + } + + val inputType = repr.getType.asInstanceOf[CompositeType[A]] + + if (fields.length != inputType.getFieldNames.length) { + throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + + "' and number of fields in input type " + inputType + " do not match.") + } + + val newFieldNames = fields map { + case UnresolvedFieldReference(name) => name + case e => + throw new ExpressionException("Only field expressions allowed in 'as' operation, " + + " offending expression: " + e) + } + + if (newFieldNames.toSet.size != newFieldNames.size) { + throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}") + } + + val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map { + case (name, index) => (name, inputType.getTypeAt(index)) + } + + val inputFields = inputType.getFieldNames + val fieldMappings = inputFields.zip(resultFields) + val expressions: Array[Expression] = fieldMappings map { + case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName) + } + + val rowDataSet = createSelect(expressions, repr, inputType) + + new ExpressionOperation(Root(rowDataSet, resultFields), new JavaStreamingTranslator) + } + + override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = { + + if (tpe.getTypeClass == classOf[Row]) { + // shortcut for DataSet[Row] + return translateInternal(op).asInstanceOf[DataStream[A]] + } + + val clazz = tpe.getTypeClass + if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { + throw new ExpressionException("Cannot create DataStream of type " + + clazz.getName + ". Only top-level classes or static member classes are supported.") + } + + if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { + throw new ExpressionException( + "Expression operations can only be converted to composite types, type is: " + + implicitly[TypeInformation[A]] + + ". 
Composite types would be tuples, case classes and POJOs.") + + } + + val resultSet = translateInternal(op) + + val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + + val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + + val resultNames = resultType.getFieldNames + val outputNames = outputType.getFieldNames.toSeq + + if (resultNames.toSet != outputNames.toSet) { + throw new ExpressionException(s"Expression result type $resultType does not have the same" + + s"fields as output type $outputType") + } + + for (f <- outputNames) { + val in = resultType.getTypeAt(resultType.getFieldIndex(f)) + val out = outputType.getTypeAt(outputType.getFieldIndex(f)) + if (!in.equals(out)) { + throw new ExpressionException(s"Types for field $f differ on input $resultType and " + + s"output $outputType.") + } + } + + val outputFields = outputNames map { + f => ResolvedFieldReference(f, resultType.getTypeAt(f)) + } + + val function = new ExpressionSelectFunction( + resultSet.getType.asInstanceOf[RowTypeInfo], + outputType, + outputFields) + + val opName = s"select(${outputFields.mkString(",")})" + + resultSet.transform(opName, outputType, new MapInvokable[Row, A](function)) + } + + private def translateInternal(op: Operation): DataStream[Row] = { + op match { + case Root(dataSet: DataStream[Row], resultFields) => + dataSet + + case As(input, newNames) => + throw new ExpressionException("As operation for Streams not yet implemented.") + + case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + selection, + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + } else { + translateInternal(expandedInput) + } + + case Filter(Join(leftInput, rightInput), predicate) => + val translatedLeftInput = translateInternal(leftInput) + val translatedRightInput = translateInternal(rightInput) + val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] + val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] + + createJoin( + predicate, + leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ + rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), + translatedLeftInput, + translatedRightInput, + leftInType, + rightInType, + JoinHint.OPTIMIZER_CHOOSES) + + case Join(leftInput, rightInput) => + throw new ExpressionException("Join without filter condition encountered. " + + "Did you forget to add .where(...) 
?") + + case sel@Select(input, selection) => + + val expandedInput = ExpandAggregations(sel) + + if (expandedInput.eq(sel)) { + // no expansions took place + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val inputFields = inType.getFieldNames + createSelect( + selection, + translatedInput, + inType) + } else { + translateInternal(expandedInput) + } + + case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case agg@Aggregate(input, aggregations) => + throw new ExpressionException("Aggregate operation for Streams not yet implemented.") + + case Filter(input, predicate) => + val translatedInput = translateInternal(input) + val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] + val filter = new ExpressionFilterFunction[Row](predicate, inType) + translatedInput.filter(filter) + } + } + + private def createSelect[I]( + fields: Seq[Expression], + input: DataStream[I], + inputType: CompositeType[I]): DataStream[Row] = { + + fields foreach { + f => + if (f.exists(_.isInstanceOf[Aggregation])) { + throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") + } + + } + + val resultType = new RowTypeInfo(fields) + + val function = new ExpressionSelectFunction(inputType, resultType, fields) + + val opName = s"select(${fields.mkString(",")})" + + input.transform(opName, resultType, new MapInvokable[I, Row](function)) + } + + private def createJoin[L, R]( + predicate: Expression, + fields: Seq[Expression], + leftInput: DataStream[L], + rightInput: DataStream[R], + leftType: CompositeType[L], + rightType: CompositeType[R], + joinHint: JoinHint): DataStream[Row] = { + + throw new ExpressionException("Join operation for Streams not yet implemented.") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala new file mode 100644 index 0000000..724c8a7 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.expressions + + +import org.apache.flink.api.expressions.tree.Expression +import org.apache.flink.api.scala.wrap +import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.ExpressionOperation +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.DataSet + +import scala.reflect.ClassTag + + +/** + * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataSet]]s and + * translating them back to Scala [[DataSet]]s. + */ +class ScalaBatchTranslator extends OperationTranslator { + + private val javaTranslator = new JavaBatchTranslator + + override type Representation[A] = DataSet[A] + + def createExpressionOperation[A]( + repr: DataSet[A], + fields: Array[Expression]): ExpressionOperation[ScalaBatchTranslator] = { + + val result = javaTranslator.createExpressionOperation(repr.javaSet, fields) + + new ExpressionOperation[ScalaBatchTranslator](result.operation, this) + } + + override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = { + // fake it till you make it ... + wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]]) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala new file mode 100644 index 0000000..7db483f --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.expressions + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.tree.Expression +import org.apache.flink.api.expressions.{ExpressionOperation, Row} +import org.apache.flink.api.scala.DataSet +import org.apache.flink.streaming.api.scala.DataStream + +import org.apache.flink.streaming.api.scala.javaToScalaStream + +/** + * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataStream]]s and + * translating them back to Scala [[DataStream]]s. + * + * This is very limited right now. Only select and filter are implemented. Also, the expression + * operations must be extended to allow windowing operations. 
+ */ +class ScalaStreamingTranslator extends OperationTranslator { + + private val javaTranslator = new JavaStreamingTranslator + + override type Representation[A] = DataStream[A] + + def createExpressionOperation[A]( + repr: DataStream[A], + fields: Array[Expression]): ExpressionOperation[ScalaStreamingTranslator] = { + + val result = javaTranslator.createExpressionOperation(repr.getJavaStream, fields) + + new ExpressionOperation[ScalaStreamingTranslator](result.operation, this) + } + + override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = { + // fake it till you make it ... + javaToScalaStream(javaTranslator.translate(op)) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala new file mode 100644 index 0000000..ef25b5b --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.expressions + +import org.apache.flink.api.expressions.tree._ +import org.apache.flink.api.common.typeinfo.TypeInformation + +import scala.language.implicitConversions + +/** + * These are all the operations that can be used to construct an [[Expression]] AST for expression + * operations. + */ +trait ImplicitExpressionOperations { + def expr: Expression + + def && (other: Expression) = And(expr, other) + def || (other: Expression) = Or(expr, other) + + def > (other: Expression) = GreaterThan(expr, other) + def >= (other: Expression) = GreaterThanOrEqual(expr, other) + def < (other: Expression) = LessThan(expr, other) + def <= (other: Expression) = LessThanOrEqual(expr, other) + + def === (other: Expression) = EqualTo(expr, other) + def !== (other: Expression) = NotEqualTo(expr, other) + + def unary_! 
= Not(expr) + def unary_- = UnaryMinus(expr) + + def isNull = IsNull(expr) + def isNotNull = IsNotNull(expr) + + def + (other: Expression) = Plus(expr, other) + def - (other: Expression) = Minus(expr, other) + def / (other: Expression) = Div(expr, other) + def * (other: Expression) = Mul(expr, other) + def % (other: Expression) = Mod(expr, other) + + def & (other: Expression) = BitwiseAnd(expr, other) + def | (other: Expression) = BitwiseOr(expr, other) + def ^ (other: Expression) = BitwiseXor(expr, other) + def unary_~ = BitwiseNot(expr) + + def abs = Abs(expr) + + def sum = Sum(expr) + def min = Min(expr) + def max = Max(expr) + def count = Count(expr) + def avg = Avg(expr) + + def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = { + Substring(expr, beginIndex, endIndex) + } + + def cast(toType: TypeInformation[_]) = Cast(expr, toType) + + def as(name: Symbol) = Naming(expr, name.name) +} + +/** + * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]] + * to [[ImplicitExpressionOperations]]. + */ +trait ImplicitExpressionConversions { + implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations { + def expr = e + } + + implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations { + def expr = UnresolvedFieldReference(s.name) + } + + implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations { + def expr = Literal(i) + } + + implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations { + def expr = Literal(f) + } + + implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations { + def expr = Literal(d) + } + + implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations { + def expr = Literal(str) + } + + implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations { + def expr = Literal(bool) + } + + implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) + implicit def int2Literal(i: Int): Expression = Literal(i) + implicit def long2Literal(l: Long): Expression = Literal(l) + implicit def double2Literal(d: Double): Expression = Literal(d) + implicit def float2Literal(d: Float): Expression = Literal(d) + implicit def string2Literal(str: String): Expression = Literal(str) + implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool) +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala new file mode 100644 index 0000000..f12e7a0 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala + +import com.google.common.base.Preconditions +import org.apache.flink.api.expressions.{Row, ExpressionOperation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.streaming.api.scala.DataStream + +import scala.language.implicitConversions + +/** + * == Language Integrated Queries (aka Expression Operations) == + * + * Importing this package with: + * + * {{{ + * import org.apache.flink.api.scala.expressions._ + * }}} + * + * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to an + * [[ExpressionOperation]]. This can be used to perform SQL-like queries on data. Please have + * a look at [[ExpressionOperation]] to see which operations are supported and + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] to see how an + * expression can be specified. + * + * Inside an expression operation you can use Scala Symbols to refer to field names. One would + * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a + * Scala literal to an Expression Literal, in those cases use `Literal`, as in `Literal(3)`. + * + * Example: + * + * {{{ + * import org.apache.flink.api.scala._ + * import org.apache.flink.api.scala.expressions._ + * + * val env = ExecutionEnvironment.getExecutionEnvironment + * val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3)) + * val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg) + * result.print() + * + * env.execute() + * }}} + * + * The result of an [[ExpressionOperation]] can be converted back to the underlying API + * representation using `as`: + * + * {{{ + * case class Word(word: String, count: Int) + * + * val result = in.select(...).as('word, 'count) + * val set = result.as[Word] + * }}} + */ +package object expressions extends ImplicitExpressionConversions { + + implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { + new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]]) + } + + implicit def expressionOperation2RowDataSet( + expressionOperation: ExpressionOperation[ScalaBatchTranslator]): DataSet[Row] = { + expressionOperation.as[Row] + } + + implicit def rowDataSet2ExpressionOperation( + rowDataSet: DataSet[Row]): ExpressionOperation[ScalaBatchTranslator] = { + rowDataSet.toExpression + } + + implicit def dataStream2DataSetConversions[T]( + stream: DataStream[T]): DataStreamConversions[T] = { + new DataStreamConversions[T]( + stream, + stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]) + } + + implicit def expressionOperation2RowDataStream( + expressionOperation: ExpressionOperation[ScalaStreamingTranslator]): DataStream[Row] = { + expressionOperation.as[Row] + } + + implicit def rowDataStream2ExpressionOperation( + rowDataStream: DataStream[Row]): ExpressionOperation[ScalaStreamingTranslator] = { + rowDataStream.toExpression + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
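Taken together, the implicit conversions in this package object and the operators in expressionDsl.scala are meant to be combined roughly as follows. This is only a sketch assembled from the Scaladoc examples above; the object name, the sample data, and the field names ('word, 'count, 'avgCount, 'shiftedCount) are illustrative assumptions, not part of the commit:

import org.apache.flink.api.expressions.tree.Literal
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.expressions._

object ExpressionDslSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))

    // as(...) names the tuple fields; the Scala Symbols become UnresolvedFieldReference
    // nodes via symbol2FieldExpression, and 'count.avg plus `as` build Avg and Naming nodes.
    val averaged = input.as('word, 'count)
      .groupBy('word)
      .select('word, 'count.avg as 'avgCount)

    // Arithmetic operators from ImplicitExpressionOperations compose into the same AST;
    // Literal(...) lifts a Scala value explicitly where the implicit conversion is not wanted.
    val shifted = averaged.select('word, 'avgCount + Literal(1) as 'shiftedCount)

    // expressionOperation2RowDataSet turns the result back into a DataSet[Row] implicitly.
    shifted.print()
    env.execute()
  }
}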
---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala new file mode 100644 index 0000000..dadfe09 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.expressions.tree.Literal +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.expressions._ +import org.apache.flink.examples.java.graph.util.PageRankData +import org.apache.flink.util.Collector + +import _root_.scala.collection.JavaConverters._ + +/** +* A basic implementation of the Page Rank algorithm using a bulk iteration. +* +* This implementation requires a set of pages and a set of directed links as input and works as +* follows. +* +* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each +* page collects the partial ranks of all pages that point to it, sums them up, and applies a +* dampening factor to the sum. The result is the new rank of the page. A new iteration is started +* with the new ranks of all pages. This implementation terminates after a fixed number of +* iterations. See the Wikipedia entry for the +* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]. +* +* Input files are plain text files and must be formatted as follows: +* +* - Pages are represented as (long) IDs separated by new-line characters. +* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63. +* - Links are represented as pairs of page IDs which are separated by space characters. Links +* are separated by new-line characters. +* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), +* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has +* at least one incoming and one outgoing link (a page can point to itself). +* +* Usage: +* {{{ +* PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations> +* }}} +* +* If no parameters are provided, the program is run with default data from +* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+* +* This example shows how to use: +* +* - Bulk Iterations +* - Expression Operations +*/ +object PageRankExpression { + + private final val DAMPENING_FACTOR: Double = 0.85 + private final val EPSILON: Double = 0.0001 + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read input data + val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) } + .as('pageId, 'rank) + + val links = getLinksDataSet(env) + + // build adjacency list from link input + val adjacencyLists = links + .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] { + + override def reduce( + values: _root_.java.lang.Iterable[Link], + out: Collector[AdjacencyList]): Unit = { + var outputId = -1L + val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId } + out.collect(new AdjacencyList(outputId, outputList.toArray)) + } + + }).as('sourceId, 'targetIds) + + // start iteration + val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { + currentRanks => + val newRanks = currentRanks.toExpression + // distribute ranks to target pages + .join(adjacencyLists).where('pageId === 'sourceId) + .select('rank, 'targetIds).as[RankOutput] + .flatMap { + (in, out: Collector[(Long, Double)]) => + val targets = in.targetIds + val len = targets.length + targets foreach { t => out.collect((t, in.rank / len )) } + } + .as('pageId, 'rank) + // collect ranks and sum them up + .groupBy('pageId).select('pageId, 'rank.sum as 'rank) + // apply dampening factor + .select( + 'pageId, + ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank) + + + val termination = currentRanks.toExpression + .as('curId, 'curRank).join(newRanks.as('newId, 'newRank)) + .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON) + + (newRanks, termination) + } + + val result = finalRanks + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", " ") + } else { + result.print() + } + + // execute program + env.execute("Expression PageRank Example") + } + + // ************************************************************************* + // USER TYPES + // ************************************************************************* + + case class Link(sourceId: Long, targetId: Long) + + case class Page(pageId: Long, rank: Double) + + case class AdjacencyList(sourceId: Long, targetIds: Array[Long]) + + case class RankOutput(rank: Double, targetIds: Array[Long]) + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 5) { + pagesInputPath = args(0) + linksInputPath = args(1) + outputPath = args(2) + numPages = args(3).toLong + maxIterations = args(4).toInt + } else { + System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " + + "pages> <num iterations>") + false + } + } else { + System.out.println("Executing PageRank Basic example with default parameters and built-in " + + "default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " + + "pages> 
<num iterations>") + + numPages = PageRankData.getNumberOfPages + } + true + } + + private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { + if (fileOutput) { + env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") + .map(x => x._1) + } else { + env.generateSequence(1, 15) + } + } + + private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { + if (fileOutput) { + env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", + includedFields = Array(0, 1)) + } else { + val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], + v2.asInstanceOf[Long])} + env.fromCollection(edges) + } + } + + private var fileOutput: Boolean = false + private var pagesInputPath: String = null + private var linksInputPath: String = null + private var outputPath: String = null + private var numPages: Double = 0 + private var maxIterations: Int = 10 + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala new file mode 100644 index 0000000..2d1d0ec --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala + +import org.apache.flink.streaming.api.scala._ + +import org.apache.flink.api.scala.expressions._ + +import scala.Stream._ +import scala.math._ +import scala.language.postfixOps +import scala.util.Random + +/** + * Simple example for demonstrating the working streaming api expression operations. 
+ */ +object StreamingExpressionFilter { + + case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val cars = genCarStream().toExpression + .filter('carId === 0) + .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time) + .as[CarEvent] + + cars.print() + + StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing") + + } + + def genCarStream(): DataStream[CarEvent] = { + + def nextSpeed(carEvent : CarEvent) : CarEvent = + { + val next = + if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) + CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis) + } + def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = + { + Thread.sleep(1000) + speeds.append(carStream(speeds.map(nextSpeed))) + } + carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis()))) + } + + def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + if (args.length == 3) { + numOfCars = args(0).toInt + evictionSec = args(1).toInt + triggerMeters = args(2).toDouble + true + } + else { + System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>") + false + } + }else{ + true + } + } + + var numOfCars = 2 + var evictionSec = 10 + var triggerMeters = 50d + +}
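On the streaming side, only select and filter currently translate; the aggregate and join cases in JavaStreamingTranslator throw ExpressionException. A minimal end-to-end use therefore looks roughly like the following sketch, modeled on StreamingExpressionFilter above; the object name, the fromElements source, and the sample CarEvent values are illustrative assumptions rather than part of the commit:

import org.apache.flink.api.scala.expressions._
import org.apache.flink.streaming.api.scala._

object StreamingExpressionSketch {
  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical finite source; StreamingExpressionFilter builds a generator stream instead.
    val cars: DataStream[CarEvent] = env.fromElements(
      CarEvent(0, 55, 10.0, 1L),
      CarEvent(1, 45, 20.0, 2L),
      CarEvent(0, 80, 30.0, 3L))

    // filter and select are handled by JavaStreamingTranslator; the predicate and projections
    // are plain Expression trees built from the implicit DSL operators.
    val fastCarZero = cars.toExpression
      .filter('carId === 0 && 'speed >= 50)
      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
      .as[CarEvent]

    // Aggregations and joins on streams are not yet implemented and would throw
    // ExpressionException at translation time, e.g.:
    //   cars.toExpression.groupBy('carId).select('carId, 'speed.max)

    fastCarZero.print()
    env.execute("streaming expression filter sketch")
  }
}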