Repository: flink
Updated Branches:
  refs/heads/master 5c7243c1e -> 7deeda788


Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to 
CrossDataSet.scala


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00a978b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00a978b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00a978b4

Branch: refs/heads/master
Commit: 00a978b44fd982b565bbe38990799c35a418d6a4
Parents: 5c7243c
Author: Henry Saputra <[email protected]>
Authored: Thu Jan 22 16:58:32 2015 -0800
Committer: Henry Saputra <[email protected]>
Committed: Thu Jan 22 16:58:32 2015 -0800

----------------------------------------------------------------------
 .../apache/flink/api/scala/CoGroupDataSet.scala | 341 +++++++++++++++++++
 .../apache/flink/api/scala/CrossDataSet.scala   | 140 ++++++++
 .../apache/flink/api/scala/coGroupDataSet.scala | 341 -------------------
 .../apache/flink/api/scala/crossDataSet.scala   | 140 --------
 4 files changed, 481 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
new file mode 100644
index 0000000..54374ba
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.commons.lang3.Validate
+import org.apache.commons.lang3.tuple.Pair
+import org.apache.commons.lang3.tuple.ImmutablePair
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.functions.{RichCoGroupFunction, 
CoGroupFunction}
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+
+/**
+ * A specific [[DataSet]] that results from a `coGroup` operation. The result 
of a default coGroup
+ * is a tuple containing two arrays of values from the two sides of the 
coGroup. The result of the
+ * coGroup can be changed by specifying a custom coGroup function using the 
`apply` method or by
+ * providing a [[RichCoGroupFunction]].
+ *
+ * Example:
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val coGroupResult = left.coGroup(right).where(0, 2).isEqualTo(0, 1) {
+ *     (left, right) => new MyCoGroupResult(left.min, right.max)
+ *   }
+ * }}}
+ *
+ * Or, using key selector functions with tuple data types:
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val coGroupResult = left.coGroup(right).where({_._1}).isEqualTo({_._1}) {
+ *     (left, right) => new MyCoGroupResult(left.max, right.min)
+ *   }
+ * }}}
+ *
+ * @tparam L Type of the left input of the coGroup.
+ * @tparam R Type of the right input of the coGroup.
+ */
+class CoGroupDataSet[L, R](
+    defaultCoGroup: CoGroupOperator[L, R, (Array[L], Array[R])],
+    leftInput: DataSet[L],
+    rightInput: DataSet[R],
+    leftKeys: Keys[L],
+    rightKeys: Keys[R])
+  extends DataSet(defaultCoGroup) {
+
+  private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, 
String]]()
+  private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, 
String]]()
+  private val groupSortOrdersFirst = mutable.MutableList[Order]()
+  private val groupSortOrdersSecond = mutable.MutableList[Order]()
+  
+  private var customPartitioner : Partitioner[_] = _
+  
+  /**
+   * Creates a new [[DataSet]] where the result for each pair of co-grouped 
element lists is the
+   * result of the given function.
+   */
+  def apply[O: TypeInformation: ClassTag](
+      fun: (Iterator[L], Iterator[R]) => O): DataSet[O] = {
+    Validate.notNull(fun, "CoGroup function must not be null.")
+    val coGrouper = new CoGroupFunction[L, R, O] {
+      val cleanFun = clean(fun)
+      def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], 
out: Collector[O]) = {
+        out.collect(cleanFun(left.iterator().asScala, 
right.iterator().asScala))
+      }
+    }
+    val coGroupOperator = new CoGroupOperator[L, R, O](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      leftKeys,
+      rightKeys,
+      coGrouper,
+      implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
+      customPartitioner,
+      getCallLocationName())
+
+    
+    wrap(coGroupOperator)
+  }
+
+  /**
+   * Creates a new [[DataSet]] where the result for each pair of co-grouped 
element lists is the
+   * result of the given function. The function can output zero or more 
elements using the
+   * [[Collector]] which will form the result.
+   */
+  def apply[O: TypeInformation: ClassTag](
+      fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O] = {
+    Validate.notNull(fun, "CoGroup function must not be null.")
+    val coGrouper = new CoGroupFunction[L, R, O] {
+      val cleanFun = clean(fun)
+      def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], 
out: Collector[O]) = {
+        cleanFun(left.iterator.asScala, right.iterator.asScala, out)
+      }
+    }
+    val coGroupOperator = new CoGroupOperator[L, R, O](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      leftKeys,
+      rightKeys,
+      coGrouper,
+      implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
+      customPartitioner,
+      getCallLocationName())
+
+    wrap(coGroupOperator)
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing each pair of co-grouped element 
lists to the given
+   * function. The function can output zero or more elements using the 
[[Collector]] which will form
+   * the result.
+   *
+   * A [[RichCoGroupFunction]] can be used to access the
+   * broadcast variables and the 
[[org.apache.flink.api.common.functions.RuntimeContext]].
+   */
+  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, 
O]): DataSet[O] = {
+    Validate.notNull(coGrouper, "CoGroup function must not be null.")
+    val coGroupOperator = new CoGroupOperator[L, R, O](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      leftKeys,
+      rightKeys,
+      coGrouper,
+      implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
+      customPartitioner,
+      getCallLocationName())
+
+    wrap(coGroupOperator)
+  }
+  
+  // 
----------------------------------------------------------------------------------------------
+  //  Properties
+  // 
----------------------------------------------------------------------------------------------
+  
+  def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : 
CoGroupDataSet[L, R] = {
+    if (partitioner != null) {
+      val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]]
+      
+      leftKeys.validateCustomPartitioner(partitioner, typeInfo)
+      rightKeys.validateCustomPartitioner(partitioner, typeInfo)
+    }
+    this.customPartitioner = partitioner
+    defaultCoGroup.withPartitioner(partitioner)
+    
+    this
+  }
+
+  /**
+   * Gets the custom partitioner used by this coGroup, or null, if none is set.
+   */
+  def getPartitioner[K]() : Partitioner[K] = {
+    customPartitioner.asInstanceOf[Partitioner[K]]
+  }
+  
+  /**
+   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
+   *
+   * This only works on Tuple DataSets.
+   */
+  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    if (!defaultCoGroup.getInput1Type().isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field 
positions is only valid " +
+        "for tuple data types.")
+    }
+    if (field >= defaultCoGroup.getInput1Type().getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsFirst += Left(field)
+    groupSortOrdersFirst += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
+   */
+  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    groupSortKeyPositionsFirst += Right(field)
+    groupSortOrdersFirst += order
+    this
+  }
+  
+  /**
+   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
+   *
+   * This only works on Tuple DataSets.
+   */
+  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    if (!defaultCoGroup.getInput2Type().isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field 
positions is only valid " +
+        "for tuple data types.")
+    }
+    if (field >= defaultCoGroup.getInput2Type().getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsSecond += Left(field)
+    groupSortOrdersSecond += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
+   */
+  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    groupSortKeyPositionsSecond += Right(field)
+    groupSortOrdersSecond += order
+    this
+  }
+  
+  private def buildGroupSortList[T](typeInfo: TypeInformation[T],
+                                    keys: mutable.MutableList[Either[Int, 
String]],
+                                    orders: mutable.MutableList[Order])
+          : java.util.List[Pair[java.lang.Integer, Order]] =
+  {
+    if (keys.isEmpty) {
+      null
+    }
+    else {
+      val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]]
+      
+      keys.zip(orders).foreach {
+        case ( Left(position), order )  => result.add(
+                                      new ImmutablePair[java.lang.Integer, 
Order](position, order))
+        
+        case ( Right(expression), order ) => {
+          if (!typeInfo.isInstanceOf[CompositeType[_]]) {
+            throw new InvalidProgramException("Specifying order keys via field 
positions is only "
+                                   + "valid for composite data types (pojo / 
tuple / case class)")
+          }
+          else {
+            val ek = new ExpressionKeys[T](Array[String](expression), typeInfo)
+            val groupOrderKeys : Array[Int] = ek.computeLogicalKeyPositions()
+            
+            for (k <- groupOrderKeys) {
+              result.add(new ImmutablePair[java.lang.Integer, Order](k, order))
+            }
+          }
+        }
+      }
+      
+      result
+    }
+  }
+}
+
+/**
+ * An unfinished coGroup operation that results from [[DataSet.coGroup]]. The 
keys for the left and
+ * right side must be specified using first `where` and then `isEqualTo`. For 
example:
+ *
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
+ * }}}
+ * @tparam L The type of the left input of the coGroup.
+ * @tparam R The type of the right input of the coGroup.
+ */
+class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
+    leftInput: DataSet[L],
+    rightInput: DataSet[R])
+  extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, 
rightInput) {
+
+  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
+    val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] {
+      def coGroup(
+                   left: java.lang.Iterable[L],
+                   right: java.lang.Iterable[R],
+                   out: Collector[(Array[L], Array[R])]) = {
+        val leftResult = Array[Any](left.asScala.toSeq: 
_*).asInstanceOf[Array[L]]
+        val rightResult = Array[Any](right.asScala.toSeq: 
_*).asInstanceOf[Array[R]]
+
+        out.collect((leftResult, rightResult))
+      }
+    }
+
+    // We have to use this hack, for some reason classOf[Array[T]] does not 
work.
+    // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an 
array class.
+    val leftArrayType =
+      ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, 
leftInput.getType)
+    val rightArrayType =
+      ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, 
rightInput.getType)
+
+    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
+      classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), 
Array("_1", "_2")) {
+
+      override def createSerializer: TypeSerializer[(Array[L], Array[R])] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(Array[L], Array[R])](
+          classOf[(Array[L], Array[R])],
+          fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[Array[L]], 
fields(1).asInstanceOf[Array[R]])
+          }
+        }
+      }
+    }
+    val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
+      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, 
returnType,
+      null, // partitioner
+      getCallLocationName())
+
+    new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, 
rightKey)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
new file mode 100644
index 0000000..2e69efa
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.reflect.ClassTag
+
+/**
+ * A specific [[DataSet]] that results from a `cross` operation. The result of 
a default cross is a
+ * tuple containing the two values from the two sides of the cartesian 
product. The result of the
+ * cross can be changed by specifying a custom cross function using the 
`apply` method or by
+ * providing a [[RichCrossFunction]].
+ *
+ * Example:
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val crossResult = left.cross(right) {
+ *     (left, right) => new MyCrossResult(left, right)
+ *   }
+ * }}}
+ *
+ * @tparam L Type of the left input of the cross.
+ * @tparam R Type of the right input of the cross.
+ */
+class CrossDataSet[L, R](
+    defaultCross: CrossOperator[L, R, (L, R)],
+    leftInput: DataSet[L],
+    rightInput: DataSet[R])
+  extends DataSet(defaultCross) {
+
+  /**
+   * Creates a new [[DataSet]] where the result for each pair of elements is 
the result
+   * of the given function.
+   */
+  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] = {
+    Validate.notNull(fun, "Cross function must not be null.")
+    val crosser = new CrossFunction[L, R, O] {
+      val cleanFun = clean(fun)
+      def cross(left: L, right: R): O = {
+        cleanFun(left, right)
+      }
+    }
+    val crossOperator = new CrossOperator[L, R, O](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      crosser,
+      implicitly[TypeInformation[O]],
+      defaultCross.getCrossHint(),
+      getCallLocationName())
+    wrap(crossOperator)
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing each pair of values to the given 
function.
+   * The function can output zero or more elements using the [[Collector]] 
which will form the
+   * result.
+   *
+   * A [[RichCrossFunction]] can be used to access the
+   * broadcast variables and the 
[[org.apache.flink.api.common.functions.RuntimeContext]].
+   */
+  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): 
DataSet[O] = {
+    Validate.notNull(crosser, "Cross function must not be null.")
+    val crossOperator = new CrossOperator[L, R, O](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      crosser,
+      implicitly[TypeInformation[O]],
+      defaultCross.getCrossHint(),
+      getCallLocationName())
+    wrap(crossOperator)
+  }
+}
+
+private[flink] object CrossDataSet {
+
+  /**
+   * Creates a default cross operation with Tuple2 as result.
+   */
+  def createCrossOperator[L, R](
+      leftInput: DataSet[L],
+      rightInput: DataSet[R],
+      crossHint: CrossHint) = {
+    
+    val crosser = new CrossFunction[L, R, (L, R)] {
+      def cross(left: L, right: R) = {
+        (left, right)
+      }
+    }
+    val returnType = new CaseClassTypeInfo[(L, R)](
+      classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", 
"_2")) {
+
+      override def createSerializer: TypeSerializer[(L, R)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
+          }
+        }
+      }
+    }
+    val crossOperator = new CrossOperator[L, R, (L, R)](
+      leftInput.javaSet,
+      rightInput.javaSet,
+      crosser,
+      returnType,
+      crossHint,
+      getCallLocationName())
+
+    new CrossDataSet(crossOperator, leftInput, rightInput)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
deleted file mode 100644
index 54374ba..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.commons.lang3.Validate
-import org.apache.commons.lang3.tuple.Pair
-import org.apache.commons.lang3.tuple.ImmutablePair
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.{RichCoGroupFunction, 
CoGroupFunction}
-import org.apache.flink.api.common.functions.Partitioner
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-import scala.collection.mutable
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-
-/**
- * A specific [[DataSet]] that results from a `coGroup` operation. The result 
of a default coGroup
- * is a tuple containing two arrays of values from the two sides of the 
coGroup. The result of the
- * coGroup can be changed by specifying a custom coGroup function using the 
`apply` method or by
- * providing a [[RichCoGroupFunction]].
- *
- * Example:
- * {{{
- *   val left = ...
- *   val right = ...
- *   val coGroupResult = left.coGroup(right).where(0, 2).isEqualTo(0, 1) {
- *     (left, right) => new MyCoGroupResult(left.min, right.max)
- *   }
- * }}}
- *
- * Or, using key selector functions with tuple data types:
- * {{{
- *   val left = ...
- *   val right = ...
- *   val coGroupResult = left.coGroup(right).where({_._1}).isEqualTo({_._1) {
- *     (left, right) => new MyCoGroupResult(left.max, right.min)
- *   }
- * }}}
- *
- * @tparam L Type of the left input of the coGroup.
- * @tparam R Type of the right input of the coGroup.
- */
-class CoGroupDataSet[L, R](
-    defaultCoGroup: CoGroupOperator[L, R, (Array[L], Array[R])],
-    leftInput: DataSet[L],
-    rightInput: DataSet[R],
-    leftKeys: Keys[L],
-    rightKeys: Keys[R])
-  extends DataSet(defaultCoGroup) {
-
-  private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, 
String]]()
-  private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, 
String]]()
-  private val groupSortOrdersFirst = mutable.MutableList[Order]()
-  private val groupSortOrdersSecond = mutable.MutableList[Order]()
-  
-  private var customPartitioner : Partitioner[_] = _
-  
-  /**
-   * Creates a new [[DataSet]] where the result for each pair of co-grouped 
element lists is the
-   * result of the given function.
-   */
-  def apply[O: TypeInformation: ClassTag](
-      fun: (Iterator[L], Iterator[R]) => O): DataSet[O] = {
-    Validate.notNull(fun, "CoGroup function must not be null.")
-    val coGrouper = new CoGroupFunction[L, R, O] {
-      val cleanFun = clean(fun)
-      def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], 
out: Collector[O]) = {
-        out.collect(cleanFun(left.iterator().asScala, 
right.iterator().asScala))
-      }
-    }
-    val coGroupOperator = new CoGroupOperator[L, R, O](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      leftKeys,
-      rightKeys,
-      coGrouper,
-      implicitly[TypeInformation[O]],
-      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
-      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
-      customPartitioner,
-      getCallLocationName())
-
-    
-    wrap(coGroupOperator)
-  }
-
-  /**
-   * Creates a new [[DataSet]] where the result for each pair of co-grouped 
element lists is the
-   * result of the given function. The function can output zero or more 
elements using the
-   * [[Collector]] which will form the result.
-   */
-  def apply[O: TypeInformation: ClassTag](
-      fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O] = {
-    Validate.notNull(fun, "CoGroup function must not be null.")
-    val coGrouper = new CoGroupFunction[L, R, O] {
-      val cleanFun = clean(fun)
-      def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], 
out: Collector[O]) = {
-        cleanFun(left.iterator.asScala, right.iterator.asScala, out)
-      }
-    }
-    val coGroupOperator = new CoGroupOperator[L, R, O](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      leftKeys,
-      rightKeys,
-      coGrouper,
-      implicitly[TypeInformation[O]],
-      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
-      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
-      customPartitioner,
-      getCallLocationName())
-
-    wrap(coGroupOperator)
-  }
-
-  /**
-   * Creates a new [[DataSet]] by passing each pair of co-grouped element 
lists to the given
-   * function. The function can output zero or more elements using the 
[[Collector]] which will form
-   * the result.
-   *
-   * A [[RichCoGroupFunction]] can be used to access the
-   * broadcast variables and the 
[[org.apache.flink.api.common.functions.RuntimeContext]].
-   */
-  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, 
O]): DataSet[O] = {
-    Validate.notNull(coGrouper, "CoGroup function must not be null.")
-    val coGroupOperator = new CoGroupOperator[L, R, O](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      leftKeys,
-      rightKeys,
-      coGrouper,
-      implicitly[TypeInformation[O]],
-      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, 
groupSortOrdersFirst),
-      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, 
groupSortOrdersSecond),
-      customPartitioner,
-      getCallLocationName())
-
-    wrap(coGroupOperator)
-  }
-  
-  // 
----------------------------------------------------------------------------------------------
-  //  Properties
-  // 
----------------------------------------------------------------------------------------------
-  
-  def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : 
CoGroupDataSet[L, R] = {
-    if (partitioner != null) {
-      val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]]
-      
-      leftKeys.validateCustomPartitioner(partitioner, typeInfo)
-      rightKeys.validateCustomPartitioner(partitioner, typeInfo)
-    }
-    this.customPartitioner = partitioner
-    defaultCoGroup.withPartitioner(partitioner)
-    
-    this
-  }
-
-  /**
-   * Gets the custom partitioner used by this join, or null, if none is set.
-   */
-  def getPartitioner[K]() : Partitioner[K] = {
-    customPartitioner.asInstanceOf[Partitioner[K]]
-  }
-  
-  /**
-   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
-   *
-   * This only works on Tuple DataSets.
-   */
-  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
-    if (!defaultCoGroup.getInput1Type().isTupleType) {
-      throw new InvalidProgramException("Specifying order keys via field 
positions is only valid " +
-        "for tuple data types.")
-    }
-    if (field >= defaultCoGroup.getInput1Type().getArity) {
-      throw new IllegalArgumentException("Order key out of tuple bounds.")
-    }
-    groupSortKeyPositionsFirst += Left(field)
-    groupSortOrdersFirst += order
-    this
-  }
-
-  /**
-   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
-   */
-  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
-    groupSortKeyPositionsFirst += Right(field)
-    groupSortOrdersFirst += order
-    this
-  }
-  
-  /**
-   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
-   *
-   * This only works on Tuple DataSets.
-   */
-  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
-    if (!defaultCoGroup.getInput2Type().isTupleType) {
-      throw new InvalidProgramException("Specifying order keys via field 
positions is only valid " +
-        "for tuple data types.")
-    }
-    if (field >= defaultCoGroup.getInput2Type().getArity) {
-      throw new IllegalArgumentException("Order key out of tuple bounds.")
-    }
-    groupSortKeyPositionsSecond += Left(field)
-    groupSortOrdersSecond += order
-    this
-  }
-
-  /**
-   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
-   */
-  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
-    groupSortKeyPositionsSecond += Right(field)
-    groupSortOrdersSecond += order
-    this
-  }
-  
-  private def buildGroupSortList[T](typeInfo: TypeInformation[T],
-                                    keys: mutable.MutableList[Either[Int, 
String]],
-                                    orders: mutable.MutableList[Order])
-          : java.util.List[Pair[java.lang.Integer, Order]] =
-  {
-    if (keys.isEmpty) {
-      null
-    }
-    else {
-      val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]]
-      
-      keys.zip(orders).foreach {
-        case ( Left(position), order )  => result.add(
-                                      new ImmutablePair[java.lang.Integer, 
Order](position, order))
-        
-        case ( Right(expression), order ) => {
-          if (!typeInfo.isInstanceOf[CompositeType[_]]) {
-            throw new InvalidProgramException("Specifying order keys via field 
positions is only "
-                                   + "valid for composite data types (pojo / 
tuple / case class)")
-          }
-          else {
-            val ek = new ExpressionKeys[T](Array[String](expression), typeInfo)
-            val groupOrderKeys : Array[Int] = ek.computeLogicalKeyPositions()
-            
-            for (k <- groupOrderKeys) {
-              result.add(new ImmutablePair[java.lang.Integer, Order](k, order))
-            }
-          }
-        }
-      }
-      
-      result
-    }
-  }
-}
-
-/**
- * An unfinished coGroup operation that results from [[DataSet.coGroup]] The 
keys for the left and
- * right side must be specified using first `where` and then `isEqualTo`. For 
example:
- *
- * {{{
- *   val left = ...
- *   val right = ...
- *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
- * }}}
- * @tparam L The type of the left input of the coGroup.
- * @tparam R The type of the right input of the coGroup.
- */
-class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
-    leftInput: DataSet[L],
-    rightInput: DataSet[R])
-  extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, 
rightInput) {
-
-  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
-    val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] {
-      def coGroup(
-                   left: java.lang.Iterable[L],
-                   right: java.lang.Iterable[R],
-                   out: Collector[(Array[L], Array[R])]) = {
-        val leftResult = Array[Any](left.asScala.toSeq: 
_*).asInstanceOf[Array[L]]
-        val rightResult = Array[Any](right.asScala.toSeq: 
_*).asInstanceOf[Array[R]]
-
-        out.collect((leftResult, rightResult))
-      }
-    }
-
-    // We have to use this hack, for some reason classOf[Array[T]] does not 
work.
-    // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an 
array class.
-    val leftArrayType =
-      ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, 
leftInput.getType)
-    val rightArrayType =
-      ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, 
rightInput.getType)
-
-    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
-      classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), 
Array("_1", "_2")) {
-
-      override def createSerializer: TypeSerializer[(Array[L], Array[R])] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
-        }
-
-        new CaseClassSerializer[(Array[L], Array[R])](
-          classOf[(Array[L], Array[R])],
-          fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[Array[L]], 
fields(1).asInstanceOf[Array[R]])
-          }
-        }
-      }
-    }
-    val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
-      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, 
returnType,
-      null, // partitioner
-      getCallLocationName())
-
-    new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, 
rightKey)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
deleted file mode 100644
index 2e69efa..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
-
-import scala.reflect.ClassTag
-
-/**
- * A specific [[DataSet]] that results from a `cross` operation. The result of 
a default cross is a
- * tuple containing the two values from the two sides of the cartesian 
product. The result of the
- * cross can be changed by specifying a custom cross function using the 
`apply` method or by
- * providing a [[RichCrossFunction]].
- *
- * Example:
- * {{{
- *   val left = ...
- *   val right = ...
- *   val crossResult = left.cross(right) {
- *     (left, right) => new MyCrossResult(left, right)
- *   }
- * }}}
- *
- * @tparam L Type of the left input of the cross.
- * @tparam R Type of the right input of the cross.
- */
-class CrossDataSet[L, R](
-    defaultCross: CrossOperator[L, R, (L, R)],
-    leftInput: DataSet[L],
-    rightInput: DataSet[R])
-  extends DataSet(defaultCross) {
-
-  /**
-   * Creates a new [[DataSet]] where the result for each pair of elements is 
the result
-   * of the given function.
-   */
-  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] = {
-    Validate.notNull(fun, "Cross function must not be null.")
-    val crosser = new CrossFunction[L, R, O] {
-      val cleanFun = clean(fun)
-      def cross(left: L, right: R): O = {
-        cleanFun(left, right)
-      }
-    }
-    val crossOperator = new CrossOperator[L, R, O](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      crosser,
-      implicitly[TypeInformation[O]],
-      defaultCross.getCrossHint(),
-      getCallLocationName())
-    wrap(crossOperator)
-  }
-
-  /**
-   * Creates a new [[DataSet]] by passing each pair of values to the given 
function.
-   * The function can output zero or more elements using the [[Collector]] 
which will form the
-   * result.
-   *
-   * A [[RichCrossFunction]] can be used to access the
-   * broadcast variables and the 
[[org.apache.flink.api.common.functions.RuntimeContext]].
-   */
-  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): 
DataSet[O] = {
-    Validate.notNull(crosser, "Cross function must not be null.")
-    val crossOperator = new CrossOperator[L, R, O](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      crosser,
-      implicitly[TypeInformation[O]],
-      defaultCross.getCrossHint(),
-      getCallLocationName())
-    wrap(crossOperator)
-  }
-}
-
-private[flink] object CrossDataSet {
-
-  /**
-   * Creates a default cross operation with Tuple2 as result.
-   */
-  def createCrossOperator[L, R](
-      leftInput: DataSet[L],
-      rightInput: DataSet[R],
-      crossHint: CrossHint) = {
-    
-    val crosser = new CrossFunction[L, R, (L, R)] {
-      def cross(left: L, right: R) = {
-        (left, right)
-      }
-    }
-    val returnType = new CaseClassTypeInfo[(L, R)](
-      classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", 
"_2")) {
-
-      override def createSerializer: TypeSerializer[(L, R)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
-        }
-
-        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
-          }
-        }
-      }
-    }
-    val crossOperator = new CrossOperator[L, R, (L, R)](
-      leftInput.javaSet,
-      rightInput.javaSet,
-      crosser,
-      returnType,
-      crossHint,
-      getCallLocationName())
-
-    new CrossDataSet(crossOperator, leftInput, rightInput)
-  }
-}

Reply via email to