Move the UnfinishedCoGroupOperation class into its own Scala file. UnfinishedCoGroupOperation is not closely related to the CoGroupDataSet class it previously shared a file with (for example, they are not part of a common `sealed` class hierarchy), so, per the Scala style guide [1], I propose moving it to a separate file.
[1] http://docs.scala-lang.org/style/files.html This closes #324. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7deeda78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7deeda78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7deeda78 Branch: refs/heads/master Commit: 7deeda7884a0843a768352364855d21254343079 Parents: 00a978b Author: Henry Saputra <[email protected]> Authored: Thu Jan 22 17:13:21 2015 -0800 Committer: Henry Saputra <[email protected]> Committed: Thu Jan 22 17:13:21 2015 -0800 ---------------------------------------------------------------------- .../apache/flink/api/scala/CoGroupDataSet.scala | 67 -------------- .../api/scala/UnfinishedCoGroupOperation.scala | 94 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7deeda78/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala index 54374ba..9969dc0 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala @@ -26,10 +26,7 @@ import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.functions.{RichCoGroupFunction, CoGroupFunction} import org.apache.flink.api.common.functions.Partitioner import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.operators._ -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo -import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, 
CaseClassTypeInfo} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.util.Collector import scala.collection.mutable @@ -275,67 +272,3 @@ class CoGroupDataSet[L, R]( } } } - -/** - * An unfinished coGroup operation that results from [[DataSet.coGroup]] The keys for the left and - * right side must be specified using first `where` and then `isEqualTo`. For example: - * - * {{{ - * val left = ... - * val right = ... - * val coGroupResult = left.coGroup(right).where(...).isEqualTo(...) - * }}} - * @tparam L The type of the left input of the coGroup. - * @tparam R The type of the right input of the coGroup. - */ -class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( - leftInput: DataSet[L], - rightInput: DataSet[R]) - extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, rightInput) { - - private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = { - val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] { - def coGroup( - left: java.lang.Iterable[L], - right: java.lang.Iterable[R], - out: Collector[(Array[L], Array[R])]) = { - val leftResult = Array[Any](left.asScala.toSeq: _*).asInstanceOf[Array[L]] - val rightResult = Array[Any](right.asScala.toSeq: _*).asInstanceOf[Array[R]] - - out.collect((leftResult, rightResult)) - } - } - - // We have to use this hack, for some reason classOf[Array[T]] does not work. - // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class. 
- val leftArrayType = - ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType) - val rightArrayType = - ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType) - - val returnType = new CaseClassTypeInfo[(Array[L], Array[R])]( - classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) { - - override def createSerializer: TypeSerializer[(Array[L], Array[R])] = { - val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer - } - - new CaseClassSerializer[(Array[L], Array[R])]( - classOf[(Array[L], Array[R])], - fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]]) - } - } - } - } - val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])]( - leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType, - null, // partitioner - getCallLocationName()) - - new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7deeda78/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala new file mode 100644 index 0000000..9f895fb --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.operators._ +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} +import org.apache.flink.util.Collector +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + + +/** + * An unfinished coGroup operation that results from [[DataSet.coGroup]] The keys for the left and + * right side must be specified using first `where` and then `isEqualTo`. For example: + * + * {{{ + * val left = ... + * val right = ... + * val coGroupResult = left.coGroup(right).where(...).isEqualTo(...) + * }}} + * @tparam L The type of the left input of the coGroup. + * @tparam R The type of the right input of the coGroup. 
+ */ +class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( + leftInput: DataSet[L], + rightInput: DataSet[R]) + extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, rightInput) { + + private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = { + val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] { + def coGroup( + left: java.lang.Iterable[L], + right: java.lang.Iterable[R], + out: Collector[(Array[L], Array[R])]) = { + val leftResult = Array[Any](left.asScala.toSeq: _*).asInstanceOf[Array[L]] + val rightResult = Array[Any](right.asScala.toSeq: _*).asInstanceOf[Array[R]] + + out.collect((leftResult, rightResult)) + } + } + + // We have to use this hack, for some reason classOf[Array[T]] does not work. + // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class. + val leftArrayType = + ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType) + val rightArrayType = + ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType) + + val returnType = new CaseClassTypeInfo[(Array[L], Array[R])]( + classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) { + + override def createSerializer: TypeSerializer[(Array[L], Array[R])] = { + val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = types(i).createSerializer + } + + new CaseClassSerializer[(Array[L], Array[R])]( + classOf[(Array[L], Array[R])], + fieldSerializers) { + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]]) + } + } + } + } + val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])]( + leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType, + null, // partitioner + getCallLocationName()) + + new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey) + } +} 
+
