Move the UnfinishedCoGroupOperation class into its own Scala file.

The UnfinishedCoGroupOperation class is not closely related to
CoGroupOperation via a sealed modifier, so per the Scala style guide [1]
I propose moving it to a separate file.

[1] http://docs.scala-lang.org/style/files.html

This closes #324.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7deeda78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7deeda78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7deeda78

Branch: refs/heads/master
Commit: 7deeda7884a0843a768352364855d21254343079
Parents: 00a978b
Author: Henry Saputra <[email protected]>
Authored: Thu Jan 22 17:13:21 2015 -0800
Committer: Henry Saputra <[email protected]>
Committed: Thu Jan 22 17:13:21 2015 -0800

----------------------------------------------------------------------
 .../apache/flink/api/scala/CoGroupDataSet.scala | 67 --------------
 .../api/scala/UnfinishedCoGroupOperation.scala  | 94 ++++++++++++++++++++
 2 files changed, 94 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7deeda78/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
index 54374ba..9969dc0 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -26,10 +26,7 @@ import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{RichCoGroupFunction, 
CoGroupFunction}
 import org.apache.flink.api.common.functions.Partitioner
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.Collector
 import scala.collection.mutable
@@ -275,67 +272,3 @@ class CoGroupDataSet[L, R](
     }
   }
 }
-
-/**
- * An unfinished coGroup operation that results from [[DataSet.coGroup]] The 
keys for the left and
- * right side must be specified using first `where` and then `isEqualTo`. For 
example:
- *
- * {{{
- *   val left = ...
- *   val right = ...
- *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
- * }}}
- * @tparam L The type of the left input of the coGroup.
- * @tparam R The type of the right input of the coGroup.
- */
-class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
-    leftInput: DataSet[L],
-    rightInput: DataSet[R])
-  extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, 
rightInput) {
-
-  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
-    val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] {
-      def coGroup(
-                   left: java.lang.Iterable[L],
-                   right: java.lang.Iterable[R],
-                   out: Collector[(Array[L], Array[R])]) = {
-        val leftResult = Array[Any](left.asScala.toSeq: 
_*).asInstanceOf[Array[L]]
-        val rightResult = Array[Any](right.asScala.toSeq: 
_*).asInstanceOf[Array[R]]
-
-        out.collect((leftResult, rightResult))
-      }
-    }
-
-    // We have to use this hack, for some reason classOf[Array[T]] does not 
work.
-    // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an 
array class.
-    val leftArrayType =
-      ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, 
leftInput.getType)
-    val rightArrayType =
-      ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, 
rightInput.getType)
-
-    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
-      classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), 
Array("_1", "_2")) {
-
-      override def createSerializer: TypeSerializer[(Array[L], Array[R])] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
-        }
-
-        new CaseClassSerializer[(Array[L], Array[R])](
-          classOf[(Array[L], Array[R])],
-          fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[Array[L]], 
fields(1).asInstanceOf[Array[R]])
-          }
-        }
-      }
-    }
-    val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
-      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, 
returnType,
-      null, // partitioner
-      getCallLocationName())
-
-    new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, 
rightKey)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7deeda78/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
new file mode 100644
index 0000000..9f895fb
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo}
+import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+
+/**
+ * An unfinished coGroup operation that results from [[DataSet.coGroup]] The 
keys for the left and
+ * right side must be specified using first `where` and then `isEqualTo`. For 
example:
+ *
+ * {{{
+ *   val left = ...
+ *   val right = ...
+ *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
+ * }}}
+ * @tparam L The type of the left input of the coGroup.
+ * @tparam R The type of the right input of the coGroup.
+ */
+class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
+    leftInput: DataSet[L],
+    rightInput: DataSet[R])
+  extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, 
rightInput) {
+
+  private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
+    val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] {
+      def coGroup(
+                   left: java.lang.Iterable[L],
+                   right: java.lang.Iterable[R],
+                   out: Collector[(Array[L], Array[R])]) = {
+        val leftResult = Array[Any](left.asScala.toSeq: 
_*).asInstanceOf[Array[L]]
+        val rightResult = Array[Any](right.asScala.toSeq: 
_*).asInstanceOf[Array[R]]
+
+        out.collect((leftResult, rightResult))
+      }
+    }
+
+    // We have to use this hack, for some reason classOf[Array[T]] does not 
work.
+    // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an 
array class.
+    val leftArrayType =
+      ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, 
leftInput.getType)
+    val rightArrayType =
+      ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, 
rightInput.getType)
+
+    val returnType = new CaseClassTypeInfo[(Array[L], Array[R])](
+      classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), 
Array("_1", "_2")) {
+
+      override def createSerializer: TypeSerializer[(Array[L], Array[R])] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new 
Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(Array[L], Array[R])](
+          classOf[(Array[L], Array[R])],
+          fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[Array[L]], 
fields(1).asInstanceOf[Array[R]])
+          }
+        }
+      }
+    }
+    val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])](
+      leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, 
returnType,
+      null, // partitioner
+      getCallLocationName())
+
+    new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, 
rightKey)
+  }
+}
+

Reply via email to