http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
new file mode 100644
index 0000000..a1d8589
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class UnresolvedFieldReference(override val name: String) extends 
LeafExpression {
+  def typeInfo = throw new ExpressionException(s"Unresolved field reference: 
$this")
+
+  override def toString = "\"" + name
+}
+
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_]) extends LeafExpression {
+  def typeInfo = tpe
+
+  override def toString = s"'$name"
+}
+
+case class Naming(child: Expression, override val name: String) extends 
UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"$child as '$name"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
new file mode 100644
index 0000000..03949ee
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.expressions.ImplicitExpressionOperations
+
+object Literal {
+  def apply(l: Any): Literal = l match {
+    case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+    case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+  }
+}
+
+case class Literal(value: Any, tpe: TypeInformation[_])
+  extends LeafExpression with ImplicitExpressionOperations {
+  def expr = this
+  def typeInfo = tpe
+
+  override def toString = s"$value"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
new file mode 100644
index 0000000..8f0a068
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+abstract class BinaryPredicate extends BinaryExpression {
+  def typeInfo = {
+    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
+      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand types 
${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+case class Not(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand type 
${child.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override val name = Expression.freshName("not-" + child.name)
+
+  override def toString = s"!($child)"
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+  override def toString = s"$left && $right"
+
+  override val name = Expression.freshName(left.name + "-and-" + right.name)
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+  override def toString = s"$left || $right"
+
+  override val name = Expression.freshName(left.name + "-or-" + right.name)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
new file mode 100644
index 0000000..04c29f7
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions
+
+/**
+ * This package contains the base class of AST nodes and all the expression 
language AST classes.
+ * Expression trees should not be manually constructed by users. They are 
implicitly constructed
+ * from the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. 
For the Java API,
+ * expression trees should be generated from a string parser that parses 
expressions and creates
+ * AST nodes.
+ */
+package object tree

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
new file mode 100644
index 0000000..175d445
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.tree
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+case class Substring(
+    str: Expression,
+    beginIndex: Expression,
+    endIndex: Expression) extends Expression {
+  def typeInfo = {
+    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+    }
+    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Begin index must be an integer type in $this, is 
${beginIndex.typeInfo}.""")
+    }
+    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""End index must be an integer type in $this, is 
${endIndex.typeInfo}.""")
+    }
+
+    BasicTypeInfo.STRING_TYPE_INFO
+  }
+
+  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+  override def toString = s"($str).substring($beginIndex, $endIndex)"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
new file mode 100644
index 0000000..38c908d
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.typeinfo
+
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.java.operators.SingleInputOperator
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+/**
+ * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for 
renaming some
+ * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At 
runtime this
+ * disappears since the translation methods simply returns the input.
+ */
+class RenameOperator[T](
+    input: JavaDataSet[T],
+    renamingTypeInformation: RenamingProxyTypeInfo[T])
+  extends SingleInputOperator[T, T, RenameOperator[T]](input, 
renamingTypeInformation) {
+
+  override protected def translateToDataFlow(
+      input: Operator[T]): Operator[T] = input
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
new file mode 100644
index 0000000..0263f8a
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, 
TypeSerializer}
+
+/**
+ * A TypeInformation that is used to rename fields of an underlying 
CompositeType. This
+ * allows the system to translate "as" expression operations to a 
[[RenameOperator]]
+ * that does not get translated to a runtime operator.
+ */
+class RenamingProxyTypeInfo[T](
+    tpe: CompositeType[T],
+    fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) {
+
+  def getUnderlyingType: CompositeType[T] = tpe
+
+  if (tpe.getArity != fieldNames.length) {
+    throw new IllegalArgumentException(s"Number of field names 
'${fieldNames.mkString(",")}' and " +
+      s"number of fields in underlying type $tpe do not match.")
+  }
+
+  if (fieldNames.toSet.size != fieldNames.length) {
+    throw new IllegalArgumentException(s"New field names must be unique. " +
+      s"Names: ${fieldNames.mkString(",")}.")
+  }
+
+  override def getFieldIndex(fieldName: String): Int = {
+    val result = fieldNames.indexOf(fieldName)
+    if (result != fieldNames.lastIndexOf(fieldName)) {
+      -2
+    } else {
+      result
+    }
+  }
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def isBasicType: Boolean = tpe.isBasicType
+
+  override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[T] =
+    tpe.createSerializer(executionConfig)
+
+  override def getArity: Int = tpe.getArity
+
+  override def isKeyType: Boolean = tpe.isKeyType
+
+  override def getTypeClass: Class[T] = tpe.getTypeClass
+
+  override def getGenericParameters: java.util.List[TypeInformation[_]] = 
tpe.getGenericParameters
+
+  override def isTupleType: Boolean = tpe.isTupleType
+
+  override def toString = {
+    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
+      s"fields: ${fieldNames.mkString(", ")})"
+  }
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
+
+  override def getTotalFields: Int = tpe.getTotalFields
+
+  override def createComparator(
+        logicalKeyFields: Array[Int],
+        orders: Array[Boolean],
+        logicalFieldOffset: Int,
+        executionConfig: ExecutionConfig) =
+    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, 
executionConfig)
+
+  // These are never called since we override create comparator
+  override protected def initializeNewComparator(localKeyCount: Int): Unit =
+    throw new RuntimeException("Cannot happen.")
+
+  override protected def getNewComparator(executionConfig: ExecutionConfig): 
TypeComparator[T] =
+    throw new RuntimeException("Cannot happen.")
+
+  override protected def addCompareField(fieldId: Int, comparator: 
TypeComparator[_]): Unit =
+    throw new RuntimeException("Cannot happen.")
+
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit = {
+    tpe.getFlatFields(fieldExpression, offset, result)
+  }
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
+    tpe.getTypeAt(fieldExpression)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
new file mode 100644
index 0000000..006c0c9
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.typeinfo
+
+import org.apache.flink.api.expressions.Row
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+;
+
+/**
+ * Serializer for [[Row]].
+ */
+class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
+  extends TypeSerializer[Row] {
+
+  override def isImmutableType: Boolean = false
+
+  override def getLength: Int = -1
+
+  override def duplicate = this
+
+  override def createInstance: Row = {
+    new Row(fieldSerializers.length)
+  }
+
+  override def copy(from: Row, reuse: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    var i = 0
+    while (i < len) {
+      val reuseField = reuse.productElement(i)
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField, reuseField)
+      reuse.setField(i, copy)
+      i += 1
+    }
+    reuse
+  }
+
+  override def copy(from: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField)
+      result.setField(i, copy)
+      i += 1
+    }
+    result
+  }
+
+  override def serialize(value: Row, target: DataOutputView) {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
+    }
+  }
+
+  override def deserialize(reuse: Row, source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    if (reuse.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and fields do not match.")
+    }
+
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
+    }
+    reuse
+  }
+
+  override def deserialize(source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
+    }
+    result
+  }
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      fieldSerializers(i).copy(source, target)
+      i += 1
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
new file mode 100644
index 0000000..92e9bc8
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.expressions.Row
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
+
+/**
+ * TypeInformation for [[Row]].
+ */
+class RowTypeInfo(
+    fieldTypes: Seq[TypeInformation[_]],
+    fieldNames: Seq[String])
+  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, 
fieldNames) {
+
+  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), 
fields.map(_.name))
+
+  if (fieldNames.toSet.size != fieldNames.size) {
+    throw new IllegalArgumentException("Field names must be unique.")
+  }
+
+  override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[Row] = {
+    val fieldSerializers: Array[TypeSerializer[Any]] = new 
Array[TypeSerializer[Any]](getArity)
+    for (i <- 0 until getArity) {
+      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
+        .asInstanceOf[TypeSerializer[Any]]
+    }
+
+    new RowSerializer(fieldSerializers)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
new file mode 100644
index 0000000..567d19c
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions._
+import org.apache.flink.api.expressions.tree.{UnresolvedFieldReference, 
Expression}
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import org.apache.flink.api.scala._
+
+/**
+ * Methods for converting a [[DataSet]] to an [[ExpressionOperation]]. A 
[[DataSet]] is
+ * wrapped in this by the implicit conversions in 
[[org.apache.flink.api.scala.expressions]].
+ */
+class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names 
of the resulting
+   * expression operation can be specified like this:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val expr = in.as('a, 'b)
+   * }}}
+   *
+   * This results in an expression operation that has field `a` of type 
`String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): ExpressionOperation[ScalaBatchTranslator] = {
+     new ScalaBatchTranslator().createExpressionOperation(set, fields.toArray)
+  }
+
+  /**
+   * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names 
of the resulting
+   * expression operation will be taken from the field names of the input type:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val expr = in.toExpression
+   * }}}
+   *
+   * This results in an expression operation that has field `_1` of type 
`String` and field `_2`
+   * of type `Int`.
+   */
+  def toExpression: ExpressionOperation[ScalaBatchTranslator] = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
new file mode 100644
index 0000000..49dbce7
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions._
+import org.apache.flink.api.expressions.tree.{Expression, 
UnresolvedFieldReference}
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+
+class DataStreamConversions[T](set: DataStream[T], inputType: 
CompositeType[T]) {
+
+  /**
+   * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field 
names of the resulting
+   * expression operation can be specified like this:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val expr = in.as('a, 'b)
+   * }}}
+   *
+   * This results in an expression operation that has field `a` of type 
`String` and field `b`
+   * of type `Int`.
+   */
+
+  def as(fields: Expression*): ExpressionOperation[ScalaStreamingTranslator] = 
{
+     new ScalaStreamingTranslator().createExpressionOperation(set, 
fields.toArray)
+  }
+
+  /**
+   * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field 
names of the resulting
+   * expression operation will be taken from the field names of the input type:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val expr = in.toExpression
+   * }}}
+   *
+   * This results in an expression operation that has field `_1` of type 
`String` and field `_2`
+   * of type `Int`.
+   */
+
+  def toExpression: ExpressionOperation[ScalaStreamingTranslator] = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
new file mode 100644
index 0000000..037efd4
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields
+import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, 
ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.expressions.typeinfo.{RenameOperator, 
RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.expressions.{ExpressionException, 
ExpressionOperation, Row}
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, 
MapOperator, UnsortedGrouping}
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+/**
+ * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java 
[[JavaDataSet]]s and
+ * translating them back to Scala [[JavaDataSet]]s.
+ */
+class JavaBatchTranslator extends OperationTranslator {
+
+  type Representation[A] = JavaDataSet[A]
+
+  def createExpressionOperation[A](
+      repr: JavaDataSet[A],
+      fields: Array[Expression]): ExpressionOperation[JavaBatchTranslator] = {
+
+    // shortcut for DataSet[Row]
+    repr.getType match {
+      case rowTypeInfo: RowTypeInfo =>
+        val expressions = rowTypeInfo.getFieldNames map {
+          name => (name, rowTypeInfo.getTypeAt(name))
+        }
+        new ExpressionOperation(
+          Root(repr.asInstanceOf[JavaDataSet[Row]], expressions), this)
+      case _ =>
+    }
+
+    val clazz = repr.getType.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create expression Operation from 
DataSet of type " +
+        clazz.getName + ". Only top-level classes or static members classes " +
+        " are supported.")
+    }
+
+    if (!repr.getType.isInstanceOf[CompositeType[_]]) {
+      throw new ExpressionException("Only DataSets of composite type can be 
transformed to an" +
+        " Expression Operation. These would be tuples, case classes and 
POJOs.")
+    }
+
+    val inputType = repr.getType.asInstanceOf[CompositeType[A]]
+
+    if (fields.length != inputType.getFieldNames.length) {
+      throw new ExpressionException("Number of selected fields: '" + 
fields.mkString(",") +
+        "' and number of fields in input type " + inputType + " do not match.")
+    }
+
+    val newFieldNames = fields map {
+      case UnresolvedFieldReference(name) => name
+      case e =>
+        throw new ExpressionException("Only field expressions allowed in 'as' 
operation, " +
+          " offending expression: " + e)
+    }
+
+    if (newFieldNames.toSet.size != newFieldNames.size) {
+      throw new ExpressionException(s"Ambiguous field names in 
${fields.mkString(", ")}")
+    }
+
+    val resultFields: Seq[(String, TypeInformation[_])] = 
newFieldNames.zipWithIndex map {
+      case (name, index) => (name, inputType.getTypeAt(index))
+    }
+
+    val inputFields = inputType.getFieldNames
+    val fieldMappings = inputFields.zip(resultFields)
+    val expressions: Array[Expression] = fieldMappings map {
+      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, 
tpe), newName)
+    }
+
+    val rowDataSet = createSelect(expressions, repr, inputType)
+
+    new ExpressionOperation(Root(rowDataSet, resultFields), new 
JavaBatchTranslator)
+  }
+
+  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): 
JavaDataSet[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataSet[Row]
+      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataSet of type " +
+        clazz.getName + ". Only top-level classes or static member classes are 
supported.")
+    }
+
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "Expression operations can only be converted to composite types, type 
is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = 
implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does 
not have the same" +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input 
$resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+    val operator = new MapOperator(resultSet, outputType, function, opName)
+
+    operator
+  }
+
+  private def translateInternal(op: Operation): JavaDataSet[Row] = {
+    op match {
+      case Root(dataSet: JavaDataSet[Row], resultFields) =>
+        dataSet
+
+      case As(input, newNames) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val proxyType = new RenamingProxyTypeInfo[Row](inType, 
newNames.toArray)
+        new RenameOperator(translatedInput, proxyType)
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), 
selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) 
++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, 
f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition 
encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          // no expansions took place
+          val translatedInput = translateInternal(input)
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val keyIndices = groupExpressions map {
+          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
+          case e => throw new ExpressionException(s"Expression $e is not a 
valid key expression.")
+        }
+
+        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
+
+        val grouping = new UnsortedGrouping(translatedInput, keys)
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              
inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          grouping,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+      case agg@Aggregate(input, aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              
inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          translatedInput,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: JavaDataSet[I],
+      inputType: CompositeType[I]): JavaDataSet[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + 
fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+    val operator = new MapOperator(input, resultType, function, opName)
+
+    operator
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: JavaDataSet[L],
+      rightInput: JavaDataSet[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): JavaDataSet[Row] = {
+
+    val resultType = new RowTypeInfo(fields)
+
+    val (reducedPredicate, leftFields, rightFields) =
+      ExtractEquiJoinFields(leftType, rightType, predicate)
+
+    val leftKey = new ExpressionKeys[L](leftFields, leftType)
+    val rightKey = new ExpressionKeys[R](rightFields, rightType)
+
+    val joiner = new ExpressionJoinFunction[L, R, Row](
+      reducedPredicate,
+      leftType,
+      rightType,
+      resultType,
+      fields)
+
+    new EquiJoin[L, R, Row](
+      leftInput,
+      rightInput,
+      leftKey,
+      rightKey,
+      joiner,
+      resultType,
+      joinHint,
+      predicate.toString)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
new file mode 100644
index 0000000..095823e
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, 
ExpressionSelectFunction}
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
+import org.apache.flink.api.expressions.{ExpressionException, 
ExpressionOperation, Row}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+
+/**
+ * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java 
[[DataStream]]s and
+ * translating them back to Java [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. 
Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+
+class JavaStreamingTranslator extends OperationTranslator {
+
+  type Representation[A] = DataStream[A]
+
+  def createExpressionOperation[A](
+      repr: DataStream[A],
+      fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] 
= {
+
+    // shortcut for DataSet[Row]
+    repr.getType match {
+      case rowTypeInfo: RowTypeInfo =>
+        val expressions = rowTypeInfo.getFieldNames map {
+          name => (name, rowTypeInfo.getTypeAt(name))
+        }
+        new ExpressionOperation(
+          Root(repr.asInstanceOf[DataStream[Row]], expressions), this)
+      case _ =>
+    }
+
+    val clazz = repr.getType.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create expression Operation from 
DataSet of type " +
+        clazz.getName + ". Only top-level classes or static members classes " +
+        " are supported.")
+    }
+
+    if (!repr.getType.isInstanceOf[CompositeType[_]]) {
+      throw new ExpressionException("Only DataSets of composite type can be 
transformed to an" +
+        " Expression Operation. These would be tuples, case classes and 
POJOs.")
+    }
+
+    val inputType = repr.getType.asInstanceOf[CompositeType[A]]
+
+    if (fields.length != inputType.getFieldNames.length) {
+      throw new ExpressionException("Number of selected fields: '" + 
fields.mkString(",") +
+        "' and number of fields in input type " + inputType + " do not match.")
+    }
+
+    val newFieldNames = fields map {
+      case UnresolvedFieldReference(name) => name
+      case e =>
+        throw new ExpressionException("Only field expressions allowed in 'as' 
operation, " +
+          " offending expression: " + e)
+    }
+
+    if (newFieldNames.toSet.size != newFieldNames.size) {
+      throw new ExpressionException(s"Ambiguous field names in 
${fields.mkString(", ")}")
+    }
+
+    val resultFields: Seq[(String, TypeInformation[_])] = 
newFieldNames.zipWithIndex map {
+      case (name, index) => (name, inputType.getTypeAt(index))
+    }
+
+    val inputFields = inputType.getFieldNames
+    val fieldMappings = inputFields.zip(resultFields)
+    val expressions: Array[Expression] = fieldMappings map {
+      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, 
tpe), newName)
+    }
+
+    val rowDataSet = createSelect(expressions, repr, inputType)
+
+    new ExpressionOperation(Root(rowDataSet, resultFields), new 
JavaStreamingTranslator)
+  }
+
+  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): 
DataStream[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataSet[Row]
+      return translateInternal(op).asInstanceOf[DataStream[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes are 
supported.")
+    }
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "Expression operations can only be converted to composite types, type 
is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = 
implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does 
not have the same" +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input 
$resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+
+    resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
+  }
+
+  private def translateInternal(op: Operation): DataStream[Row] = {
+    op match {
+      case Root(dataSet: DataStream[Row], resultFields) =>
+        dataSet
+
+      case As(input, newNames) =>
+        throw new ExpressionException("As operation for Streams not yet 
implemented.")
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), 
selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) 
++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, 
f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition 
encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          // no expansions took place
+          val translatedInput = translateInternal(input)
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
+
+      case agg@Aggregate(input, aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: DataStream[I],
+      inputType: CompositeType[I]): DataStream[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + 
fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+
+    input.transform(opName, resultType, new MapInvokable[I, Row](function))
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: DataStream[L],
+      rightInput: DataStream[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): DataStream[Row] = {
+
+    throw new ExpressionException("Join operation for Streams not yet 
implemented.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
new file mode 100644
index 0000000..724c8a7
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.scala.wrap
+import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.ExpressionOperation
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+
+import scala.reflect.ClassTag
+
+
+/**
+ * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala 
[[DataSet]]s and
+ * translating them back to Scala [[DataSet]]s.
+ */
+class ScalaBatchTranslator extends OperationTranslator {
+
+  private val javaTranslator = new JavaBatchTranslator
+
+  override type Representation[A] = DataSet[A]
+
+  def createExpressionOperation[A](
+      repr: DataSet[A],
+      fields: Array[Expression]): ExpressionOperation[ScalaBatchTranslator] = {
+
+    val result = javaTranslator.createExpressionOperation(repr.javaSet, fields)
+
+    new ExpressionOperation[ScalaBatchTranslator](result.operation, this)
+  }
+
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): 
DataSet[O] = {
+    // fake it till you make it ...
+    
wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
new file mode 100644
index 0000000..7db483f
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.tree.Expression
+import org.apache.flink.api.expressions.{ExpressionOperation, Row}
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala.DataStream
+
+import org.apache.flink.streaming.api.scala.javaToScalaStream
+
+/**
+ * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala 
[[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. 
Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+class ScalaStreamingTranslator extends OperationTranslator {
+
+  private val javaTranslator = new JavaStreamingTranslator
+
+  override type Representation[A] = DataStream[A]
+
+  def createExpressionOperation[A](
+      repr: DataStream[A],
+      fields: Array[Expression]): 
ExpressionOperation[ScalaStreamingTranslator] = {
+
+    val result = javaTranslator.createExpressionOperation(repr.getJavaStream, 
fields)
+
+    new ExpressionOperation[ScalaStreamingTranslator](result.operation, this)
+  }
+
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): 
DataStream[O] = {
+    // fake it till you make it ...
+    javaToScalaStream(javaTranslator.translate(op))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
new file mode 100644
index 0000000..ef25b5b
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.expressions
+
+import org.apache.flink.api.expressions.tree._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an 
[[Expression]] AST for expression
+ * operations.
+ */
+trait ImplicitExpressionOperations {
+  def expr: Expression
+
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = Mod(expr, other)
+
+  def & (other: Expression) = BitwiseAnd(expr, other)
+  def | (other: Expression) = BitwiseOr(expr, other)
+  def ^ (other: Expression) = BitwiseXor(expr, other)
+  def unary_~ = BitwiseNot(expr)
+
+  def abs = Abs(expr)
+
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
+
+  def substring(beginIndex: Expression, endIndex: Expression = 
Literal(Int.MaxValue)) = {
+    Substring(expr, beginIndex, endIndex)
+  }
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  def as(name: Symbol) = Naming(expr, name.name)
+}
+
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from 
[[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  implicit class WithOperations(e: Expression) extends 
ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  implicit class SymbolExpression(s: Symbol) extends 
ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends 
ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends 
ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends 
ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends 
ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends 
ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = 
UnresolvedFieldReference(sym.name)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(d: Float): Expression = Literal(d)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
new file mode 100644
index 0000000..f12e7a0
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.expressions.{Row, ExpressionOperation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.language.implicitConversions
+
+/**
+ * == Language Integrated Queries (aka Expression Operations) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala.expressions._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] 
to an
+ * [[ExpressionOperation]]. This can be used to perform SQL-like queries on 
data. Please have
+ * a look at [[ExpressionOperation]] to see which operations are supported and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] to 
see how an
+ * expression can be specified.
+ *
+ * Inside an expression operation you can use Scala Symbols to refer to field 
names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually 
confert a
+ * Scala literal to an Expression Literal, in those cases use `Literal`, as in 
`Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.expressions._
+ *
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ *   val result = input.as('word, 'count).groupBy('word).select('word, 
'count.avg)
+ *   result.print()
+ *
+ *   env.execute()
+ * }}}
+ *
+ * The result of an [[ExpressionOperation]] can be converted back to the 
underlying API
+ * representation using `as`:
+ *
+ * {{{
+ *   case class Word(word: String, count: Int)
+ *
+ *   val result = in.select(...).as('word, 'count)
+ *   val set = result.as[Word]
+ * }}}
+ */
+package object expressions extends ImplicitExpressionConversions {
+
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): 
DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def expressionOperation2RowDataSet(
+      expressionOperation: ExpressionOperation[ScalaBatchTranslator]): 
DataSet[Row] = {
+    expressionOperation.as[Row]
+  }
+
+  implicit def rowDataSet2ExpressionOperation(
+      rowDataSet: DataSet[Row]): ExpressionOperation[ScalaBatchTranslator] = {
+    rowDataSet.toExpression
+  }
+
+  implicit def dataStream2DataSetConversions[T](
+      stream: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](
+      stream,
+      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def expressionOperation2RowDataStream(
+      expressionOperation: ExpressionOperation[ScalaStreamingTranslator]): 
DataStream[Row] = {
+    expressionOperation.as[Row]
+  }
+
+  implicit def rowDataStream2ExpressionOperation(
+      rowDataStream: DataStream[Row]): 
ExpressionOperation[ScalaStreamingTranslator] = {
+    rowDataStream.toExpression
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
new file mode 100644
index 0000000..dadfe09
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as 
input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages 
it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, 
and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new 
iteration is started
+* with the new ranks of all pages. This implementation terminates after a 
fixed number of
+* iterations. This is the Wikipedia entry for the
+* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
+*
+* Input files are plain text files and must be formatted as follows:
+*
+*  - Pages represented as an (long) ID separated by new-line characters.
+*    For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 
42, and 63.
+*  - Links are represented as pairs of page IDs which are separated by space  
characters. Links
+*    are separated by new-line characters.
+*    For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links 
(1)->(2), (2)->(12),
+*    (1)->(12), and (42)->(63). For this simple implementation it is required 
that each page has
+*    at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+*   PageRankBasic <pages path> <links path> <output path> <num pages> <num 
iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+*  - Bulk Iterations
+*  - Expression Operations
+*/
+object PageRankExpression {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, 
AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; 
t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toExpression
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).as[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / 
numPages as 'rank)
+
+
+        val termination = currentRanks.toExpression
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+    } else {
+      result.print()
+    }
+
+    // execute program
+    env.execute("Expression PageRank Example")
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> 
<output path> <num " +
+          "pages> <num iterations>")
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default 
parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of 
input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> 
<output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+    }
+    true
+  }
+
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", 
lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => 
Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e868d596/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
new file mode 100644
index 0000000..2d1d0ec
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.expressions._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the working streaming api expression 
operations.
+ */
+object StreamingExpressionFilter {
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) 
extends Serializable
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val cars = genCarStream().toExpression
+      .filter('carId === 0)
+      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 
'time)
+      .as[CarEvent]
+
+    cars.print()
+
+    
StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
+
+  }
+
+  def genCarStream(): DataStream[CarEvent] = {
+
+    def nextSpeed(carEvent : CarEvent) : CarEvent =
+    {
+      val next =
+        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, 
carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + 
next/3.6d,System.currentTimeMillis)
+    }
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
+    {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+    carStream(range(0, 
numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
+  }
+
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      }
+      else {
+        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> 
<triggerMeters>")
+        false
+      }
+    }else{
+      true
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}

Reply via email to