Repository: flink Updated Branches: refs/heads/master 5c7243c1e -> 7deeda788
Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00a978b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00a978b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00a978b4 Branch: refs/heads/master Commit: 00a978b44fd982b565bbe38990799c35a418d6a4 Parents: 5c7243c Author: Henry Saputra <[email protected]> Authored: Thu Jan 22 16:58:32 2015 -0800 Committer: Henry Saputra <[email protected]> Committed: Thu Jan 22 16:58:32 2015 -0800 ---------------------------------------------------------------------- .../apache/flink/api/scala/CoGroupDataSet.scala | 341 +++++++++++++++++++ .../apache/flink/api/scala/CrossDataSet.scala | 140 ++++++++ .../apache/flink/api/scala/coGroupDataSet.scala | 341 ------------------- .../apache/flink/api/scala/crossDataSet.scala | 140 -------- 4 files changed, 481 insertions(+), 481 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala new file mode 100644 index 0000000..54374ba --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import org.apache.commons.lang3.Validate +import org.apache.commons.lang3.tuple.Pair +import org.apache.commons.lang3.tuple.ImmutablePair +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.common.functions.{RichCoGroupFunction, CoGroupFunction} +import org.apache.flink.api.common.functions.Partitioner +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.operators._ +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.util.Collector +import scala.collection.mutable +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag +import org.apache.flink.api.java.operators.Keys.ExpressionKeys + +/** + * A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup + * is a tuple containing two arrays of values from the two sides of the coGroup. The result of the + * coGroup can be changed by specifying a custom coGroup function using the `apply` method or by + * providing a [[RichCoGroupFunction]]. + * + * Example: + * {{{ + * val left = ... + * val right = ... 
+ * val coGroupResult = left.coGroup(right).where(0, 2).isEqualTo(0, 1) { + * (left, right) => new MyCoGroupResult(left.min, right.max) + * } + * }}} + * + * Or, using key selector functions with tuple data types: + * {{{ + * val left = ... + * val right = ... + * val coGroupResult = left.coGroup(right).where({_._1}).isEqualTo({_._1) { + * (left, right) => new MyCoGroupResult(left.max, right.min) + * } + * }}} + * + * @tparam L Type of the left input of the coGroup. + * @tparam R Type of the right input of the coGroup. + */ +class CoGroupDataSet[L, R]( + defaultCoGroup: CoGroupOperator[L, R, (Array[L], Array[R])], + leftInput: DataSet[L], + rightInput: DataSet[R], + leftKeys: Keys[L], + rightKeys: Keys[R]) + extends DataSet(defaultCoGroup) { + + private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, String]]() + private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, String]]() + private val groupSortOrdersFirst = mutable.MutableList[Order]() + private val groupSortOrdersSecond = mutable.MutableList[Order]() + + private var customPartitioner : Partitioner[_] = _ + + /** + * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the + * result of the given function. 
+ */ + def apply[O: TypeInformation: ClassTag]( + fun: (Iterator[L], Iterator[R]) => O): DataSet[O] = { + Validate.notNull(fun, "CoGroup function must not be null.") + val coGrouper = new CoGroupFunction[L, R, O] { + val cleanFun = clean(fun) + def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Collector[O]) = { + out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala)) + } + } + val coGroupOperator = new CoGroupOperator[L, R, O]( + leftInput.javaSet, + rightInput.javaSet, + leftKeys, + rightKeys, + coGrouper, + implicitly[TypeInformation[O]], + buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), + buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), + customPartitioner, + getCallLocationName()) + + + wrap(coGroupOperator) + } + + /** + * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the + * result of the given function. The function can output zero or more elements using the + * [[Collector]] which will form the result. 
+ */ + def apply[O: TypeInformation: ClassTag]( + fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O] = { + Validate.notNull(fun, "CoGroup function must not be null.") + val coGrouper = new CoGroupFunction[L, R, O] { + val cleanFun = clean(fun) + def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Collector[O]) = { + cleanFun(left.iterator.asScala, right.iterator.asScala, out) + } + } + val coGroupOperator = new CoGroupOperator[L, R, O]( + leftInput.javaSet, + rightInput.javaSet, + leftKeys, + rightKeys, + coGrouper, + implicitly[TypeInformation[O]], + buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), + buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), + customPartitioner, + getCallLocationName()) + + wrap(coGroupOperator) + } + + /** + * Creates a new [[DataSet]] by passing each pair of co-grouped element lists to the given + * function. The function can output zero or more elements using the [[Collector]] which will form + * the result. + * + * A [[RichCoGroupFunction]] can be used to access the + * broadcast variables and the [[org.apache.flink.api.common.functions.RuntimeContext]]. 
+ */ + def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O] = { + Validate.notNull(coGrouper, "CoGroup function must not be null.") + val coGroupOperator = new CoGroupOperator[L, R, O]( + leftInput.javaSet, + rightInput.javaSet, + leftKeys, + rightKeys, + coGrouper, + implicitly[TypeInformation[O]], + buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), + buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), + customPartitioner, + getCallLocationName()) + + wrap(coGroupOperator) + } + + // ---------------------------------------------------------------------------------------------- + // Properties + // ---------------------------------------------------------------------------------------------- + + def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : CoGroupDataSet[L, R] = { + if (partitioner != null) { + val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]] + + leftKeys.validateCustomPartitioner(partitioner, typeInfo) + rightKeys.validateCustomPartitioner(partitioner, typeInfo) + } + this.customPartitioner = partitioner + defaultCoGroup.withPartitioner(partitioner) + + this + } + + /** + * Gets the custom partitioner used by this join, or null, if none is set. + */ + def getPartitioner[K]() : Partitioner[K] = { + customPartitioner.asInstanceOf[Partitioner[K]] + } + + /** + * Adds a secondary sort key to the first input of this [[CoGroupDataSet]]. + * + * This only works on Tuple DataSets. 
+ */ + def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = { + if (!defaultCoGroup.getInput1Type().isTupleType) { + throw new InvalidProgramException("Specifying order keys via field positions is only valid " + + "for tuple data types.") + } + if (field >= defaultCoGroup.getInput1Type().getArity) { + throw new IllegalArgumentException("Order key out of tuple bounds.") + } + groupSortKeyPositionsFirst += Left(field) + groupSortOrdersFirst += order + this + } + + /** + * Adds a secondary sort key to the first input of this [[CoGroupDataSet]]. + */ + def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = { + groupSortKeyPositionsFirst += Right(field) + groupSortOrdersFirst += order + this + } + + /** + * Adds a secondary sort key to the second input of this [[CoGroupDataSet]]. + * + * This only works on Tuple DataSets. + */ + def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = { + if (!defaultCoGroup.getInput2Type().isTupleType) { + throw new InvalidProgramException("Specifying order keys via field positions is only valid " + + "for tuple data types.") + } + if (field >= defaultCoGroup.getInput2Type().getArity) { + throw new IllegalArgumentException("Order key out of tuple bounds.") + } + groupSortKeyPositionsSecond += Left(field) + groupSortOrdersSecond += order + this + } + + /** + * Adds a secondary sort key to the second input of this [[CoGroupDataSet]]. 
+ */ + def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = { + groupSortKeyPositionsSecond += Right(field) + groupSortOrdersSecond += order + this + } + + private def buildGroupSortList[T](typeInfo: TypeInformation[T], + keys: mutable.MutableList[Either[Int, String]], + orders: mutable.MutableList[Order]) + : java.util.List[Pair[java.lang.Integer, Order]] = + { + if (keys.isEmpty) { + null + } + else { + val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]] + + keys.zip(orders).foreach { + case ( Left(position), order ) => result.add( + new ImmutablePair[java.lang.Integer, Order](position, order)) + + case ( Right(expression), order ) => { + if (!typeInfo.isInstanceOf[CompositeType[_]]) { + throw new InvalidProgramException("Specifying order keys via field positions is only " + + "valid for composite data types (pojo / tuple / case class)") + } + else { + val ek = new ExpressionKeys[T](Array[String](expression), typeInfo) + val groupOrderKeys : Array[Int] = ek.computeLogicalKeyPositions() + + for (k <- groupOrderKeys) { + result.add(new ImmutablePair[java.lang.Integer, Order](k, order)) + } + } + } + } + + result + } + } +} + +/** + * An unfinished coGroup operation that results from [[DataSet.coGroup]] The keys for the left and + * right side must be specified using first `where` and then `isEqualTo`. For example: + * + * {{{ + * val left = ... + * val right = ... + * val coGroupResult = left.coGroup(right).where(...).isEqualTo(...) + * }}} + * @tparam L The type of the left input of the coGroup. + * @tparam R The type of the right input of the coGroup. 
+ */ +class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( + leftInput: DataSet[L], + rightInput: DataSet[R]) + extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, rightInput) { + + private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = { + val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] { + def coGroup( + left: java.lang.Iterable[L], + right: java.lang.Iterable[R], + out: Collector[(Array[L], Array[R])]) = { + val leftResult = Array[Any](left.asScala.toSeq: _*).asInstanceOf[Array[L]] + val rightResult = Array[Any](right.asScala.toSeq: _*).asInstanceOf[Array[R]] + + out.collect((leftResult, rightResult)) + } + } + + // We have to use this hack, for some reason classOf[Array[T]] does not work. + // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class. + val leftArrayType = + ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType) + val rightArrayType = + ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType) + + val returnType = new CaseClassTypeInfo[(Array[L], Array[R])]( + classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) { + + override def createSerializer: TypeSerializer[(Array[L], Array[R])] = { + val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = types(i).createSerializer + } + + new CaseClassSerializer[(Array[L], Array[R])]( + classOf[(Array[L], Array[R])], + fieldSerializers) { + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]]) + } + } + } + } + val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])]( + leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType, + null, // partitioner + getCallLocationName()) + + new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey) + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala new file mode 100644 index 0000000..2e69efa --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CrossDataSet.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala + +import org.apache.commons.lang3.Validate +import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.java.operators._ +import org.apache.flink.api.java.{DataSet => JavaDataSet} +import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.reflect.ClassTag + +/** + * A specific [[DataSet]] that results from a `cross` operation. The result of a default cross is a + * tuple containing the two values from the two sides of the cartesian product. The result of the + * cross can be changed by specifying a custom cross function using the `apply` method or by + * providing a [[RichCrossFunction]]. + * + * Example: + * {{{ + * val left = ... + * val right = ... + * val crossResult = left.cross(right) { + * (left, right) => new MyCrossResult(left, right) + * } + * }}} + * + * @tparam L Type of the left input of the cross. + * @tparam R Type of the right input of the cross. + */ +class CrossDataSet[L, R]( + defaultCross: CrossOperator[L, R, (L, R)], + leftInput: DataSet[L], + rightInput: DataSet[R]) + extends DataSet(defaultCross) { + + /** + * Creates a new [[DataSet]] where the result for each pair of elements is the result + * of the given function. 
+ */ + def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] = { + Validate.notNull(fun, "Cross function must not be null.") + val crosser = new CrossFunction[L, R, O] { + val cleanFun = clean(fun) + def cross(left: L, right: R): O = { + cleanFun(left, right) + } + } + val crossOperator = new CrossOperator[L, R, O]( + leftInput.javaSet, + rightInput.javaSet, + crosser, + implicitly[TypeInformation[O]], + defaultCross.getCrossHint(), + getCallLocationName()) + wrap(crossOperator) + } + + /** + * Creates a new [[DataSet]] by passing each pair of values to the given function. + * The function can output zero or more elements using the [[Collector]] which will form the + * result. + * + * A [[RichCrossFunction]] can be used to access the + * broadcast variables and the [[org.apache.flink.api.common.functions.RuntimeContext]]. + */ + def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O] = { + Validate.notNull(crosser, "Cross function must not be null.") + val crossOperator = new CrossOperator[L, R, O]( + leftInput.javaSet, + rightInput.javaSet, + crosser, + implicitly[TypeInformation[O]], + defaultCross.getCrossHint(), + getCallLocationName()) + wrap(crossOperator) + } +} + +private[flink] object CrossDataSet { + + /** + * Creates a default cross operation with Tuple2 as result. 
+ */ + def createCrossOperator[L, R]( + leftInput: DataSet[L], + rightInput: DataSet[R], + crossHint: CrossHint) = { + + val crosser = new CrossFunction[L, R, (L, R)] { + def cross(left: L, right: R) = { + (left, right) + } + } + val returnType = new CaseClassTypeInfo[(L, R)]( + classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", "_2")) { + + override def createSerializer: TypeSerializer[(L, R)] = { + val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = types(i).createSerializer + } + + new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) { + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R]) + } + } + } + } + val crossOperator = new CrossOperator[L, R, (L, R)]( + leftInput.javaSet, + rightInput.javaSet, + crosser, + returnType, + crossHint, + getCallLocationName()) + + new CrossDataSet(crossOperator, leftInput, rightInput) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala deleted file mode 100644 index 54374ba..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala - -import org.apache.commons.lang3.Validate -import org.apache.commons.lang3.tuple.Pair -import org.apache.commons.lang3.tuple.ImmutablePair -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.common.functions.{RichCoGroupFunction, CoGroupFunction} -import org.apache.flink.api.common.functions.Partitioner -import org.apache.flink.api.common.operators.Order -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.operators._ -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo -import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.util.Collector -import scala.collection.mutable -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag -import org.apache.flink.api.java.operators.Keys.ExpressionKeys - -/** - * A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup - * is a tuple containing two arrays of values from the two sides of the coGroup. The result of the - * coGroup can be changed by specifying a custom coGroup function using the `apply` method or by - * providing a [[RichCoGroupFunction]]. - * - * Example: - * {{{ - * val left = ... - * val right = ... 
- * val coGroupResult = left.coGroup(right).where(0, 2).isEqualTo(0, 1) { - * (left, right) => new MyCoGroupResult(left.min, right.max) - * } - * }}} - * - * Or, using key selector functions with tuple data types: - * {{{ - * val left = ... - * val right = ... - * val coGroupResult = left.coGroup(right).where({_._1}).isEqualTo({_._1) { - * (left, right) => new MyCoGroupResult(left.max, right.min) - * } - * }}} - * - * @tparam L Type of the left input of the coGroup. - * @tparam R Type of the right input of the coGroup. - */ -class CoGroupDataSet[L, R]( - defaultCoGroup: CoGroupOperator[L, R, (Array[L], Array[R])], - leftInput: DataSet[L], - rightInput: DataSet[R], - leftKeys: Keys[L], - rightKeys: Keys[R]) - extends DataSet(defaultCoGroup) { - - private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, String]]() - private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, String]]() - private val groupSortOrdersFirst = mutable.MutableList[Order]() - private val groupSortOrdersSecond = mutable.MutableList[Order]() - - private var customPartitioner : Partitioner[_] = _ - - /** - * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the - * result of the given function. 
- */ - def apply[O: TypeInformation: ClassTag]( - fun: (Iterator[L], Iterator[R]) => O): DataSet[O] = { - Validate.notNull(fun, "CoGroup function must not be null.") - val coGrouper = new CoGroupFunction[L, R, O] { - val cleanFun = clean(fun) - def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Collector[O]) = { - out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala)) - } - } - val coGroupOperator = new CoGroupOperator[L, R, O]( - leftInput.javaSet, - rightInput.javaSet, - leftKeys, - rightKeys, - coGrouper, - implicitly[TypeInformation[O]], - buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), - buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), - customPartitioner, - getCallLocationName()) - - - wrap(coGroupOperator) - } - - /** - * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the - * result of the given function. The function can output zero or more elements using the - * [[Collector]] which will form the result. 
- */ - def apply[O: TypeInformation: ClassTag]( - fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O] = { - Validate.notNull(fun, "CoGroup function must not be null.") - val coGrouper = new CoGroupFunction[L, R, O] { - val cleanFun = clean(fun) - def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Collector[O]) = { - cleanFun(left.iterator.asScala, right.iterator.asScala, out) - } - } - val coGroupOperator = new CoGroupOperator[L, R, O]( - leftInput.javaSet, - rightInput.javaSet, - leftKeys, - rightKeys, - coGrouper, - implicitly[TypeInformation[O]], - buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), - buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), - customPartitioner, - getCallLocationName()) - - wrap(coGroupOperator) - } - - /** - * Creates a new [[DataSet]] by passing each pair of co-grouped element lists to the given - * function. The function can output zero or more elements using the [[Collector]] which will form - * the result. - * - * A [[RichCoGroupFunction]] can be used to access the - * broadcast variables and the [[org.apache.flink.api.common.functions.RuntimeContext]]. 
- */ - def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O] = { - Validate.notNull(coGrouper, "CoGroup function must not be null.") - val coGroupOperator = new CoGroupOperator[L, R, O]( - leftInput.javaSet, - rightInput.javaSet, - leftKeys, - rightKeys, - coGrouper, - implicitly[TypeInformation[O]], - buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst), - buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond), - customPartitioner, - getCallLocationName()) - - wrap(coGroupOperator) - } - - // ---------------------------------------------------------------------------------------------- - // Properties - // ---------------------------------------------------------------------------------------------- - - def withPartitioner[K : TypeInformation](partitioner : Partitioner[K]) : CoGroupDataSet[L, R] = { - if (partitioner != null) { - val typeInfo : TypeInformation[K] = implicitly[TypeInformation[K]] - - leftKeys.validateCustomPartitioner(partitioner, typeInfo) - rightKeys.validateCustomPartitioner(partitioner, typeInfo) - } - this.customPartitioner = partitioner - defaultCoGroup.withPartitioner(partitioner) - - this - } - - /** - * Gets the custom partitioner used by this join, or null, if none is set. - */ - def getPartitioner[K]() : Partitioner[K] = { - customPartitioner.asInstanceOf[Partitioner[K]] - } - - /** - * Adds a secondary sort key to the first input of this [[CoGroupDataSet]]. - * - * This only works on Tuple DataSets. 
- */ - def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = { - if (!defaultCoGroup.getInput1Type().isTupleType) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid " + - "for tuple data types.") - } - if (field >= defaultCoGroup.getInput1Type().getArity) { - throw new IllegalArgumentException("Order key out of tuple bounds.") - } - groupSortKeyPositionsFirst += Left(field) - groupSortOrdersFirst += order - this - } - - /** - * Adds a secondary sort key to the first input of this [[CoGroupDataSet]]. - */ - def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = { - groupSortKeyPositionsFirst += Right(field) - groupSortOrdersFirst += order - this - } - - /** - * Adds a secondary sort key to the second input of this [[CoGroupDataSet]]. - * - * This only works on Tuple DataSets. - */ - def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = { - if (!defaultCoGroup.getInput2Type().isTupleType) { - throw new InvalidProgramException("Specifying order keys via field positions is only valid " + - "for tuple data types.") - } - if (field >= defaultCoGroup.getInput2Type().getArity) { - throw new IllegalArgumentException("Order key out of tuple bounds.") - } - groupSortKeyPositionsSecond += Left(field) - groupSortOrdersSecond += order - this - } - - /** - * Adds a secondary sort key to the second input of this [[CoGroupDataSet]]. 
- */ - def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = { - groupSortKeyPositionsSecond += Right(field) - groupSortOrdersSecond += order - this - } - - private def buildGroupSortList[T](typeInfo: TypeInformation[T], - keys: mutable.MutableList[Either[Int, String]], - orders: mutable.MutableList[Order]) - : java.util.List[Pair[java.lang.Integer, Order]] = - { - if (keys.isEmpty) { - null - } - else { - val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]] - - keys.zip(orders).foreach { - case ( Left(position), order ) => result.add( - new ImmutablePair[java.lang.Integer, Order](position, order)) - - case ( Right(expression), order ) => { - if (!typeInfo.isInstanceOf[CompositeType[_]]) { - throw new InvalidProgramException("Specifying order keys via field positions is only " - + "valid for composite data types (pojo / tuple / case class)") - } - else { - val ek = new ExpressionKeys[T](Array[String](expression), typeInfo) - val groupOrderKeys : Array[Int] = ek.computeLogicalKeyPositions() - - for (k <- groupOrderKeys) { - result.add(new ImmutablePair[java.lang.Integer, Order](k, order)) - } - } - } - } - - result - } - } -} - -/** - * An unfinished coGroup operation that results from [[DataSet.coGroup]] The keys for the left and - * right side must be specified using first `where` and then `isEqualTo`. For example: - * - * {{{ - * val left = ... - * val right = ... - * val coGroupResult = left.coGroup(right).where(...).isEqualTo(...) - * }}} - * @tparam L The type of the left input of the coGroup. - * @tparam R The type of the right input of the coGroup. 
- */ -class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag]( - leftInput: DataSet[L], - rightInput: DataSet[R]) - extends UnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]](leftInput, rightInput) { - - private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = { - val coGrouper = new CoGroupFunction[L, R, (Array[L], Array[R])] { - def coGroup( - left: java.lang.Iterable[L], - right: java.lang.Iterable[R], - out: Collector[(Array[L], Array[R])]) = { - val leftResult = Array[Any](left.asScala.toSeq: _*).asInstanceOf[Array[L]] - val rightResult = Array[Any](right.asScala.toSeq: _*).asInstanceOf[Array[R]] - - out.collect((leftResult, rightResult)) - } - } - - // We have to use this hack, for some reason classOf[Array[T]] does not work. - // Maybe because ObjectArrayTypeInfo does not accept the Scala Array as an array class. - val leftArrayType = - ObjectArrayTypeInfo.getInfoFor(new Array[L](0).getClass, leftInput.getType) - val rightArrayType = - ObjectArrayTypeInfo.getInfoFor(new Array[R](0).getClass, rightInput.getType) - - val returnType = new CaseClassTypeInfo[(Array[L], Array[R])]( - classOf[(Array[L], Array[R])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) { - - override def createSerializer: TypeSerializer[(Array[L], Array[R])] = { - val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = types(i).createSerializer - } - - new CaseClassSerializer[(Array[L], Array[R])]( - classOf[(Array[L], Array[R])], - fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[Array[L]], fields(1).asInstanceOf[Array[R]]) - } - } - } - } - val coGroupOperator = new CoGroupOperator[L, R, (Array[L], Array[R])]( - leftInput.javaSet, rightInput.javaSet, leftKey, rightKey, coGrouper, returnType, - null, // partitioner - getCallLocationName()) - - new CoGroupDataSet(coGroupOperator, leftInput, rightInput, leftKey, rightKey) - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/00a978b4/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala deleted file mode 100644 index 2e69efa..0000000 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
 */
package org.apache.flink.api.scala

import org.apache.commons.lang3.Validate
import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.Collector
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint

import scala.reflect.ClassTag

/**
 * A specific [[DataSet]] that results from a `cross` operation. The result of a default cross is a
 * tuple containing the two values from the two sides of the cartesian product. The result of the
 * cross can be changed by specifying a custom cross function using the `apply` method or by
 * providing a [[RichCrossFunction]].
 *
 * Example:
 * {{{
 *   val left = ...
 *   val right = ...
 *   val crossResult = left.cross(right) {
 *     (left, right) => new MyCrossResult(left, right)
 *   }
 * }}}
 *
 * @tparam L Type of the left input of the cross.
 * @tparam R Type of the right input of the cross.
 */
class CrossDataSet[L, R](
    defaultCross: CrossOperator[L, R, (L, R)],
    leftInput: DataSet[L],
    rightInput: DataSet[R])
  extends DataSet(defaultCross) {

  /**
   * Creates a new [[DataSet]] where the result for each pair of elements is the result
   * of the given function.
   *
   * @param fun Function producing one result element per input pair; must not be null.
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] = {
    Validate.notNull(fun, "Cross function must not be null.")
    val crosser = new CrossFunction[L, R, O] {
      // clean(...) removes non-serializable enclosing references from the closure
      // before it is shipped to the cluster.
      val cleanFun = clean(fun)
      def cross(left: L, right: R): O = {
        cleanFun(left, right)
      }
    }
    // Replaces the default Tuple2-producing cross with one using the custom function,
    // preserving the cross hint chosen when the default operator was created.
    val crossOperator = new CrossOperator[L, R, O](
      leftInput.javaSet,
      rightInput.javaSet,
      crosser,
      implicitly[TypeInformation[O]],
      defaultCross.getCrossHint(),
      getCallLocationName())
    wrap(crossOperator)
  }

  /**
   * Creates a new [[DataSet]] where the result for each pair of elements is the result
   * of the given [[CrossFunction]], which produces exactly one output element per pair.
   *
   * A [[RichCrossFunction]] can be used to access the
   * broadcast variables and the [[org.apache.flink.api.common.functions.RuntimeContext]].
   *
   * @param crosser Cross function applied to each pair; must not be null.
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O] = {
    Validate.notNull(crosser, "Cross function must not be null.")
    val crossOperator = new CrossOperator[L, R, O](
      leftInput.javaSet,
      rightInput.javaSet,
      crosser,
      implicitly[TypeInformation[O]],
      defaultCross.getCrossHint(),
      getCallLocationName())
    wrap(crossOperator)
  }
}

private[flink] object CrossDataSet {

  /**
   * Creates a default cross operation with Tuple2 as result.
   *
   * The returned [[CrossDataSet]] can later swap in a user-supplied cross function via
   * its `apply` methods.
   *
   * @param crossHint Size hint (e.g. which side is small) forwarded to the runtime.
   */
  def createCrossOperator[L, R](
      leftInput: DataSet[L],
      rightInput: DataSet[R],
      crossHint: CrossHint) = {

    // Default behavior: emit each pair of the cartesian product as a Scala Tuple2.
    val crosser = new CrossFunction[L, R, (L, R)] {
      def cross(left: L, right: R) = {
        (left, right)
      }
    }
    // Anonymous CaseClassTypeInfo: createSerializer is overridden so that instance
    // creation builds the Scala Tuple2 directly from the deserialized fields.
    val returnType = new CaseClassTypeInfo[(L, R)](
      classOf[(L, R)], Seq(leftInput.getType, rightInput.getType), Array("_1", "_2")) {

      override def createSerializer: TypeSerializer[(L, R)] = {
        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
        for (i <- 0 until getArity) {
          fieldSerializers(i) = types(i).createSerializer
        }

        new CaseClassSerializer[(L, R)](classOf[(L, R)], fieldSerializers) {
          override def createInstance(fields: Array[AnyRef]) = {
            (fields(0).asInstanceOf[L], fields(1).asInstanceOf[R])
          }
        }
      }
    }
    val crossOperator = new CrossOperator[L, R, (L, R)](
      leftInput.javaSet,
      rightInput.javaSet,
      crosser,
      returnType,
      crossHint,
      getCallLocationName())

    new CrossDataSet(crossOperator, leftInput, rightInput)
  }
}
