This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-28 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 26af4622521a397fa3ef4a6a680d1ee8d0568a95 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Thu May 13 22:31:37 2021 -0400 [WAYANG-31] DataQuanta was refactored as abstract class --- .../apache/wayang/api/dataquanta/DataQuanta.scala | 263 ++++--------- .../wayang/api/dataquanta/DataQuantaDefault.scala | 409 +++++++++++++++++++++ .../wayang/api/dataquanta/DataQuantaFactory.scala | 49 +++ .../wayang/api/dataquanta/JoinedDataQuanta.scala | 2 +- .../wayang/api/dataquanta/KeyedDataQuanta.scala | 7 +- .../main/scala/org/apache/wayang/api/package.scala | 10 +- 6 files changed, 536 insertions(+), 204 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala index a614467..6e464e0 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala @@ -21,7 +21,7 @@ package org.apache.wayang.api.dataquanta import de.hpi.isg.profiledb.store.model.Experiment import org.apache.commons.lang3.Validate -import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate} +import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate} import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2} import org.apache.wayang.basic.function.ProjectionDescriptor import org.apache.wayang.basic.operators._ @@ -35,7 +35,7 @@ import org.apache.wayang.core.platform.Platform import 
org.apache.wayang.core.util.{Tuple => WayangTuple} import java.lang.{Iterable => JavaIterable} -import java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction} +import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction} import java.util.{Collection => JavaCollection} import scala.collection.JavaConversions import scala.collection.JavaConversions._ @@ -43,12 +43,13 @@ import scala.reflect._ /** * Represents an intermediate result/data flow edge in a [[WayangPlan]]. + * However, this is just a template that makes the API easy to extend. * * @param operator a unary [[Operator]] that produces this instance * @param ev$1 the data type of the elements in this instance * @param planBuilder keeps track of the [[WayangPlan]] being build */ -class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) { +abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) { Validate.isTrue(operator.getNumOutputs > outputIndex) @@ -67,8 +68,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @return a new instance representing the [[MapOperator]]'s output */ def map[NewOut: ClassTag](udf: Out => NewOut, - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { mapJava(toSerializableFunction(udf), udfLoad) + } /** * Feed this instance into a [[MapOperator]].
@@ -78,13 +80,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @return a new instance representing the [[MapOperator]]'s output */ def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut], - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { - val mapOperator = new MapOperator(new TransformationDescriptor( - udf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad - )) - this.connectTo(mapOperator, 0) - mapOperator - } + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] /** * Feed this instance into a [[MapPartitionsOperator]]. @@ -96,8 +92,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def mapPartitions[NewOut: ClassTag](udf: Iterable[Out] => Iterable[NewOut], selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { mapPartitionsJava(toSerializablePartitionFunction(udf), selectivity, udfLoad) + } /** * Feed this instance into a [[MapPartitionsOperator]]. @@ -109,13 +106,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]], selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { - val mapOperator = new MapPartitionsOperator( - new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad) - ) - this.connectTo(mapOperator, 0) - mapOperator - } + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] /** * Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]]. 
@@ -123,13 +114,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param fieldNames names of the fields to be projected * @return a new instance representing the [[MapOperator]]'s output */ - def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut] = { - val projectionOperator = new MapOperator( - new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*) - ) - this.connectTo(projectionOperator, 0) - projectionOperator - } + def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut] /** * Connects the [[operator]] to a further [[Operator]]. @@ -153,8 +138,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I def filter(udf: Out => Boolean, sqlUdf: String = null, selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null) = + udfLoad: LoadProfileEstimator = null) = { filterJava(toSerializablePredicate(udf), sqlUdf, selectivity, udfLoad) + } /** * Feed this instance into a [[FilterOperator]]. @@ -168,13 +154,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I def filterJava(udf: SerializablePredicate[Out], sqlUdf: String = null, selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = { - val filterOperator = new FilterOperator(new PredicateDescriptor( - udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad - ).withSqlImplementation(sqlUdf)) - this.connectTo(filterOperator, 0) - filterOperator - } + udfLoad: LoadProfileEstimator = null): DataQuanta[Out] /** * Feed this instance into a [[FlatMapOperator]]. 
@@ -186,8 +166,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def flatMap[NewOut: ClassTag](udf: Out => Iterable[NewOut], selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { flatMapJava(toSerializableFlatteningFunction(udf), selectivity, udfLoad) + } /** * Feed this instance into a [[FlatMapOperator]]. @@ -199,13 +180,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]], selectivity: ProbabilisticDoubleInterval = null, - udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] = { - val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor( - udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad - )) - this.connectTo(flatMapOperator, 0) - flatMapOperator - } + udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] /** * Feed this instance into a [[SampleOperator]]. If this operation is inside of a loop, the sampling size @@ -220,7 +195,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE, seed: Option[Long] = None, sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = - this.sampleDynamic(_ => sampleSize, datasetSize, seed, sampleMethod) + sampleDynamic(_ => sampleSize, datasetSize, seed, sampleMethod) /** * Feed this instance into a [[SampleOperator]]. 
If this operation is inside of a loop, the sampling size @@ -234,8 +209,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I def sampleDynamic(sampleSizeFunction: Int => Int, datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE, seed: Option[Long] = None, - sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = - this.sampleDynamicJava( + sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = { + sampleDynamicJava( new IntUnaryOperator { override def applyAsInt(operand: Int): Int = sampleSizeFunction(operand) }, @@ -243,6 +218,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I seed, sampleMethod ) + } /** @@ -256,29 +232,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator, datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE, seed: Option[Long] = None, - sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] = { - if (seed.isEmpty) { - val sampleOperator = new SampleOperator( - sampleSizeFunction, - dataSetType[Out], - sampleMethod - ) - sampleOperator.setDatasetSize(datasetSize) - this.connectTo(sampleOperator, 0) - sampleOperator - } - else { - val sampleOperator = new SampleOperator( - sampleSizeFunction, - dataSetType[Out], - sampleMethod, - seed.get - ) - sampleOperator.setDatasetSize(datasetSize) - this.connectTo(sampleOperator, 0) - sampleOperator - } - } + sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out] /** * Assigns this instance a key extractor, which enables some key-based operations. 
@@ -287,7 +241,10 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param keyExtractor extracts the key from the [[DataQuanta]] * @return the [[KeyedDataQuanta]] */ - def keyBy[Key: ClassTag](keyExtractor: Out => Key) = new KeyedDataQuanta[Out, Key](this, keyExtractor) + //TODO: may need to be built by the extenders + def keyBy[Key: ClassTag](keyExtractor: Out => Key) : KeyedDataQuanta[Out, Key] = { + keyByJava(keyExtractor) + } /** * Assigns this instance a key extractor, which enables some key-based operations. @@ -296,7 +253,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param keyExtractor extracts the key from the [[DataQuanta]] * @return the [[KeyedDataQuanta]] */ - def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuanta[Out, Key](this, keyExtractor) + //TODO: may need to be built by the extenders + def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : KeyedDataQuanta[Out, Key] /** * Feed this instance into a [[ReduceByOperator]]. @@ -308,8 +266,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def reduceByKey[Key: ClassTag](keyUdf: Out => Key, udf: (Out, Out) => Out, - udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = + udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = { reduceByKeyJava(toSerializableFunction(keyUdf), toSerializableBinaryOperator(udf), udfLoad) + } /** * Feed this instance into a [[ReduceByOperator]].
@@ -321,15 +280,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I */ def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out], - udfLoad: LoadProfileEstimator = null) - : DataQuanta[Out] = { - val reduceByOperator = new ReduceByOperator( - new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) - ) - this.connectTo(reduceByOperator, 0) - reduceByOperator - } + udfLoad: LoadProfileEstimator = null) : DataQuanta[Out] /** * Feed this instance into a [[MaterializedGroupByOperator]]. @@ -350,15 +301,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @return a new instance representing the [[MaterializedGroupByOperator]]'s output */ def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key], - keyUdfLoad: LoadProfileEstimator = null): DataQuanta[java.lang.Iterable[Out]] = { - val groupByOperator = new MaterializedGroupByOperator( - new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad), - dataSetType[Out], - groupedDataSetType[Out] - ) - this.connectTo(groupByOperator, 0) - groupByOperator - } + keyUdfLoad: LoadProfileEstimator = null): DataQuanta[java.lang.Iterable[Out]] /** * Feed this instance into a [[GlobalReduceOperator]]. 
@@ -379,24 +322,14 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @return a new instance representing the [[GlobalReduceOperator]]'s output */ def reduceJava(udf: SerializableBinaryOperator[Out], - udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = { - val globalReduceOperator = new GlobalReduceOperator( - new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) - ) - this.connectTo(globalReduceOperator, 0) - globalReduceOperator - } + udfLoad: LoadProfileEstimator = null): DataQuanta[Out] /** * Feed this instance into a [[GlobalMaterializedGroupOperator]]. * * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output */ - def group(): DataQuanta[java.lang.Iterable[Out]] = { - val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out]) - this.connectTo(groupOperator, 0) - groupOperator - } + def group(): DataQuanta[java.lang.Iterable[Out]] /** * Feed this instance and a further instance into a [[UnionAllOperator]]. @@ -404,13 +337,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param that the other instance to union with * @return a new instance representing the [[UnionAllOperator]]'s output */ - def union(that: DataQuanta[Out]): DataQuanta[Out] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val unionAllOperator = new UnionAllOperator(dataSetType[Out]) - this.connectTo(unionAllOperator, 0) - that.connectTo(unionAllOperator, 1) - unionAllOperator - } + def union(that: DataQuanta[Out]): DataQuanta[Out] /** * Feed this instance and a further instance into a [[IntersectOperator]]. 
@@ -418,13 +345,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param that the other instance to intersect with * @return a new instance representing the [[IntersectOperator]]'s output */ - def intersect(that: DataQuanta[Out]): DataQuanta[Out] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val intersectOperator = new IntersectOperator(dataSetType[Out]) - this.connectTo(intersectOperator, 0) - that.connectTo(intersectOperator, 1) - intersectOperator - } + def intersect(that: DataQuanta[Out]): DataQuanta[Out] /** * Feeds this and a further instance into a [[JoinOperator]]. @@ -434,12 +355,11 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance * @return a new instance representing the [[JoinOperator]]'s output */ - def join[ThatOut: ClassTag, Key: ClassTag] - (thisKeyUdf: Out => Key, - that: DataQuanta[ThatOut], - thatKeyUdf: ThatOut => Key) - : DataQuanta[WayangTuple2[Out, ThatOut]] = + def join[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: Out => Key, + that: DataQuanta[ThatOut], + thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[Out, ThatOut]] = { joinJava(toSerializableFunction(thisKeyUdf), that, toSerializableFunction(thatKeyUdf)) + } /** * Feeds this and a further instance into a [[JoinOperator]]. 
@@ -449,20 +369,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance * @return a new instance representing the [[JoinOperator]]'s output */ - def joinJava[ThatOut: ClassTag, Key: ClassTag] - (thisKeyUdf: SerializableFunction[Out, Key], - that: DataQuanta[ThatOut], - thatKeyUdf: SerializableFunction[ThatOut, Key]) - : DataQuanta[WayangTuple2[Out, ThatOut]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val joinOperator = new JoinOperator( - new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) - ) - this.connectTo(joinOperator, 0) - that.connectTo(joinOperator, 1) - joinOperator - } + def joinJava[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: SerializableFunction[Out, Key], + that: DataQuanta[ThatOut], + thatKeyUdf: SerializableFunction[ThatOut, Key]) : DataQuanta[WayangTuple2[Out, ThatOut]] /** * Feeds this and a further instance into a [[CoGroupOperator]]. 
@@ -472,12 +381,11 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance * @return a new instance representing the [[CoGroupOperator]]'s output */ - def coGroup[ThatOut: ClassTag, Key: ClassTag] - (thisKeyUdf: Out => Key, - that: DataQuanta[ThatOut], - thatKeyUdf: ThatOut => Key) - : DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = + def coGroup[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: Out => Key, + that: DataQuanta[ThatOut], + thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = { coGroupJava(toSerializableFunction(thisKeyUdf), that, toSerializableFunction(thatKeyUdf)) + } /** * Feeds this and a further instance into a [[CoGroupOperator]]. @@ -487,20 +395,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance * @return a new instance representing the [[CoGroupOperator]]'s output */ - def coGroupJava[ThatOut: ClassTag, Key: ClassTag] - (thisKeyUdf: SerializableFunction[Out, Key], - that: DataQuanta[ThatOut], - thatKeyUdf: SerializableFunction[ThatOut, Key]) - : DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val coGroupOperator = new CoGroupOperator( - new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) - ) - this.connectTo(coGroupOperator, 0) - that.connectTo(coGroupOperator, 1) - coGroupOperator - } + def coGroupJava[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: SerializableFunction[Out, Key], + that: DataQuanta[ThatOut], + thatKeyUdf: SerializableFunction[ThatOut, Key]): 
DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] /** * Feeds this and a further instance into a [[SortOperator]]. @@ -508,10 +405,9 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param keyUdf UDF to extract key from data quanta in this instance * @return a new instance representing the [[SortOperator]]'s output */ - def sort[Key: ClassTag] - (keyUdf: Out => Key) - : DataQuanta[Out] = + def sort[Key: ClassTag] (keyUdf: Out => Key): DataQuanta[Out] = { sortJava(toSerializableFunction(keyUdf)) + } /** * Feeds this and a further instance into a [[SortOperator]]. @@ -519,16 +415,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param keyUdf UDF to extract key from data quanta in this instance * @return a new instance representing the [[SortOperator]]'s output */ - def sortJava[Key: ClassTag] - (keyUdf: SerializableFunction[Out, Key]) - : DataQuanta[Out] = { - val sortOperator = new SortOperator(new TransformationDescriptor( - keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])) - this.connectTo(sortOperator, 0) - sortOperator - } - - + def sortJava[Key: ClassTag] (keyUdf: SerializableFunction[Out, Key]) : DataQuanta[Out] /** * Feeds this and a further instance into a [[CartesianOperator]]. 
@@ -536,48 +423,28 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param that the other instance * @return a new instance representing the [[CartesianOperator]]'s output */ - def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]) - : DataQuanta[WayangTuple2[Out, ThatOut]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut]) - this.connectTo(cartesianOperator, 0) - that.connectTo(cartesianOperator, 1) - cartesianOperator - } + def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]) : DataQuanta[WayangTuple2[Out, ThatOut]] /** * Feeds this instance into a [[ZipWithIdOperator]]. * * @return a new instance representing the [[ZipWithIdOperator]]'s output */ - def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]] = { - val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out]) - this.connectTo(zipWithIdOperator, 0) - zipWithIdOperator - } + def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]] /** * Feeds this instance into a [[DistinctOperator]]. * * @return a new instance representing the [[DistinctOperator]]'s output */ - def distinct: DataQuanta[Out] = { - val distinctOperator = new DistinctOperator(dataSetType[Out]) - this.connectTo(distinctOperator, 0) - distinctOperator - } + def distinct: DataQuanta[Out] /** * Feeds this instance into a [[CountOperator]]. * * @return a new instance representing the [[CountOperator]]'s output */ - def count: DataQuanta[java.lang.Long] = { - val countOperator = new CountOperator(dataSetType[Out]) - this.connectTo(countOperator, 0) - countOperator - } - + def count: DataQuanta[java.lang.Long] /** * Feeds this instance into a do-while loop (guarded by a [[DoWhileOperator]]. 
@@ -625,13 +492,13 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.connectTo(doWhileOperator, DoWhileOperator.INITIAL_INPUT_INDEX) // Create and wire the loop body. - val loopDataQuanta = new DataQuanta[Out](doWhileOperator, DoWhileOperator.ITERATION_OUTPUT_INDEX) + val loopDataQuanta = DataQuantaFactory.build[Out](doWhileOperator, DoWhileOperator.ITERATION_OUTPUT_INDEX) val iterationResults = bodyBuilder.apply(loopDataQuanta) iterationResults.getField0.connectTo(doWhileOperator, DoWhileOperator.ITERATION_INPUT_INDEX) iterationResults.getField1.connectTo(doWhileOperator, DoWhileOperator.CONVERGENCE_INPUT_INDEX) // Return the iteration result. - new DataQuanta[Out](doWhileOperator, DoWhileOperator.FINAL_OUTPUT_INDEX) + DataQuantaFactory.build[Out](doWhileOperator, DoWhileOperator.FINAL_OUTPUT_INDEX) } /** @@ -661,12 +528,12 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.connectTo(repeatOperator, RepeatOperator.INITIAL_INPUT_INDEX) // Create and wire the loop body. - val loopDataQuanta = new DataQuanta[Out](repeatOperator, RepeatOperator.ITERATION_OUTPUT_INDEX) + val loopDataQuanta = DataQuantaFactory.build[Out](repeatOperator, RepeatOperator.ITERATION_OUTPUT_INDEX) val iterationResult = bodyBuilder.apply(loopDataQuanta) iterationResult.connectTo(repeatOperator, RepeatOperator.ITERATION_INPUT_INDEX) // Return the iteration result. 
- new DataQuanta[Out](repeatOperator, RepeatOperator.FINAL_OUTPUT_INDEX) + DataQuantaFactory.build[Out](repeatOperator, RepeatOperator.FINAL_OUTPUT_INDEX) } /** @@ -730,6 +597,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * * @param f the action to perform as Java 8 lambda expression */ + //TODO validate if it is the correct way def foreachJava(f: Consumer[Out]): Unit = { val sink = new LocalCallbackSink(f, dataSetType[Out]) sink.setName("foreach()") @@ -744,6 +612,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * * @return the data quanta */ + //TODO validate if it is the correct way def collect(): Iterable[Out] = { // Set up the sink. val collector = new java.util.LinkedList[Out]() @@ -780,6 +649,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I * @param formatterUdf UDF to format data quanta to [[String]]s * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` */ + //TODO validate if it is the correct way def writeTextFileJava(url: String, formatterUdf: SerializableFunction[Out, String], udfLoad: LoadProfileEstimator = null): Unit = { @@ -869,7 +739,8 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I object DataQuanta { - def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] = - new DataQuanta(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder) + def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] = { + DataQuantaFactory.build[T](output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder) + } } diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala new file mode 100644 index 0000000..1f79ae7 --- /dev/null +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.wayang.api.dataquanta + +import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate} +import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2} +import org.apache.wayang.basic.function.ProjectionDescriptor +import org.apache.wayang.basic.operators._ +import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate} +import org.apache.wayang.core.function._ +import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator +import org.apache.wayang.core.plan.wayangplan._ + +import java.lang +import java.lang.{Iterable => JavaIterable} +import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction} +import scala.reflect._ + +/** + * Default [[DataQuanta]] implementation; represents an intermediate result/data flow edge in a [[WayangPlan]]. + * + * @param operator a unary [[Operator]] that produces this instance + * @param ev$1 the data type of the elements in this instance + * @param planBuilder keeps track of the [[WayangPlan]] being built + */ +class DataQuantaDefault[Out: ClassTag] + (override val operator: ElementaryOperator, outputIndex: Int = 0) + (implicit override val planBuilder: PlanBuilder) + extends DataQuanta[Out](operator, outputIndex) { + + /** + * Feed this instance into a [[MapOperator]].
+ * + * @param udf a Java 8 lambda expression as UDF for the [[MapOperator]] + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[MapOperator]]'s output + */ + override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut], + udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = { + val mapOperator = new MapOperator(new TransformationDescriptor( + udf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad + )) + this.connectTo(mapOperator, 0) + DataQuantaDefault.wrap[NewOut](mapOperator) + } + + /** + * Feed this instance into a [[MapPartitionsOperator]]. + * + * @param udf a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]] + * @param selectivity selectivity of the UDF + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[MapPartitionsOperator]]'s output + */ + override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]], + selectivity: ProbabilisticDoubleInterval = null, + udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = { + val mapOperator = new MapPartitionsOperator( + new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad) + ) + this.connectTo(mapOperator, 0) + DataQuantaDefault.wrap[NewOut](mapOperator) + } + + /** + * Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]].
+ * + * @param fieldNames names of the fields to be projected + * @return a new instance representing the [[MapOperator]]'s output + */ + override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaDefault[NewOut] = { + val projectionOperator = new MapOperator( + new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*) + ) + this.connectTo(projectionOperator, 0) + DataQuantaDefault.wrap[NewOut](projectionOperator) + } + + /** + * Feed this instance into a [[FilterOperator]]. + * + * @param udf UDF for the [[FilterOperator]] + * @param sqlUdf UDF as SQL `WHERE` clause + * @param selectivity selectivity of the UDF + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[FilterOperator]]'s output + */ + override def filterJava(udf: SerializablePredicate[Out], + sqlUdf: String = null, + selectivity: ProbabilisticDoubleInterval = null, + udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = { + val filterOperator = new FilterOperator(new PredicateDescriptor( + udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad + ).withSqlImplementation(sqlUdf)) + this.connectTo(filterOperator, 0) + DataQuantaDefault.wrap[Out](filterOperator) + } + + /** + * Feed this instance into a [[FlatMapOperator]]. 
+ * + * @param udf a Java 8 lambda expression as UDF for the [[FlatMapOperator]] + * @param selectivity selectivity of the UDF + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[FlatMapOperator]]'s output + */ + override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]], + selectivity: ProbabilisticDoubleInterval = null, + udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = { + val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor( + udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad + )) + this.connectTo(flatMapOperator, 0) + DataQuantaDefault.wrap[NewOut](flatMapOperator) + } + + + /** + * Feed this instance into a [[SampleOperator]]. + * + * @param sampleSizeFunction absolute size of the sample as a function of the current iteration number + * @param datasetSize optional size of the dataset to be sampled + * @param sampleMethod the [[SampleOperator.Methods]] to use for sampling + * @return a new instance representing the [[SampleOperator]]'s output + */ + override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator, + datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE, + seed: Option[Long] = None, + sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaDefault[Out] = { + if (seed.isEmpty) { + val sampleOperator = new SampleOperator( + sampleSizeFunction, + dataSetType[Out], + sampleMethod + ) + sampleOperator.setDatasetSize(datasetSize) + this.connectTo(sampleOperator, 0) + DataQuantaDefault.wrap[Out](sampleOperator) + } + else { + val sampleOperator = new SampleOperator( + sampleSizeFunction, + dataSetType[Out], + sampleMethod, + seed.get + ) + sampleOperator.setDatasetSize(datasetSize) + this.connectTo(sampleOperator, 0) + DataQuantaDefault.wrap[Out](sampleOperator) + } + }
+ * + * @see KeyedDataQuanta + * @param keyExtractor extracts the key from the [[DataQuantaDefault]] + * @return the [[KeyedDataQuanta]] + */ + //TODO validate this implementation + override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : KeyedDataQuanta[Out, Key] = { + new KeyedDataQuanta[Out, Key](this, keyExtractor) + } + + /** + * Feed this instance into a [[ReduceByOperator]]. + * + * @param keyUdf UDF to extract the grouping key from the data quanta + * @param udf aggregation UDF for the [[ReduceByOperator]] + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[ReduceByOperator]]'s output + */ + override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key], + udf: SerializableBinaryOperator[Out], + udfLoad: LoadProfileEstimator = null) + : DataQuantaDefault[Out] = { + val reduceByOperator = new ReduceByOperator( + new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), + new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) + ) + this.connectTo(reduceByOperator, 0) + DataQuantaDefault.wrap[Out](reduceByOperator) + } + + /** + * Feed this instance into a [[MaterializedGroupByOperator]]. 
+ * + * @param keyUdf UDF to extract the grouping key from the data quanta + * @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf` + * @return a new instance representing the [[MaterializedGroupByOperator]]'s output + */ + override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key], + keyUdfLoad: LoadProfileEstimator = null): DataQuantaDefault[java.lang.Iterable[Out]] = { + val groupByOperator = new MaterializedGroupByOperator( + new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad), + dataSetType[Out], + groupedDataSetType[Out] + ) + this.connectTo(groupByOperator, 0) + DataQuantaDefault.wrap[java.lang.Iterable[Out]](groupByOperator) + } + + /** + * Feed this instance into a [[GlobalReduceOperator]]. + * + * @param udf aggregation UDF for the [[GlobalReduceOperator]] + * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` + * @return a new instance representing the [[GlobalReduceOperator]]'s output + */ + override def reduceJava(udf: SerializableBinaryOperator[Out], + udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = { + val globalReduceOperator = new GlobalReduceOperator( + new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) + ) + this.connectTo(globalReduceOperator, 0) + DataQuantaDefault.wrap[Out](globalReduceOperator) + } + + /** + * Feeds this and a further instance into a [[JoinOperator]]. 
+ * + * @param thisKeyUdf UDF to extract keys from data quanta in this instance + * @param that the other instance + * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance + * @return a new instance representing the [[JoinOperator]]'s output + */ + override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuanta[WayangTuple2[Out, ThatOut]] = { + require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") + val joinOperator = new JoinOperator( + new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), + new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) + ) + this.connectTo(joinOperator, 0) + that.connectTo(joinOperator, 1) + DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](joinOperator) + } + + /** + * Feeds this and a further instance into a [[CoGroupOperator]]. 
+ * + * @param thisKeyUdf UDF to extract keys from data quanta in this instance + * @param that the other instance + * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance + * @return a new instance representing the [[CoGroupOperator]]'s output + */ + override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuanta[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = { + require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") + val coGroupOperator = new CoGroupOperator( + new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), + new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) + ) + this.connectTo(coGroupOperator, 0) + that.connectTo(coGroupOperator, 1) + DataQuantaDefault.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator) + } + + + /** + * Feeds this and a further instance into a [[SortOperator]]. + * + * @param keyUdf UDF to extract key from data quanta in this instance + * @return a new instance representing the [[SortOperator]]'s output + */ + override def sortJava[Key: ClassTag] + (keyUdf: SerializableFunction[Out, Key]) + : DataQuantaDefault[Out] = { + val sortOperator = new SortOperator(new TransformationDescriptor( + keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])) + this.connectTo(sortOperator, 0) + DataQuantaDefault.wrap[Out](sortOperator) + } + + + + /** + * Broadcasts the data quanta in this instance to a further instance. 
+ * + * @param receiver the instance that receives the broadcast + * @param broadcastName the name with that the broadcast will be registered + */ + private def broadcast(receiver: DataQuantaDefault[_], broadcastName: String) = + receiver.registerBroadcast(this.operator, this.outputIndex, broadcastName) + + /** + * Register a further instance as broadcast. + * + * @param sender provides the broadcast data quanta + * @param outputIndex identifies the output index of the sender + * @param broadcastName the name with that the broadcast will be registered + */ + private def registerBroadcast(sender: Operator, outputIndex: Int, broadcastName: String) = + sender.broadcastTo(outputIndex, this.operator, broadcastName) + + /** + * Feed this instance into a [[GlobalMaterializedGroupOperator]]. + * + * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output + */ + override def group(): DataQuanta[JavaIterable[Out]] = { + val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out]) + this.connectTo(groupOperator, 0) + DataQuantaDefault.wrap[JavaIterable[Out]](groupOperator) + } + + /** + * Feed this instance and a further instance into a [[UnionAllOperator]]. + * + * @param that the other instance to union with + * @return a new instance representing the [[UnionAllOperator]]'s output + */ + override def union(that: DataQuanta[Out]): DataQuanta[Out] = { + require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") + val unionAllOperator = new UnionAllOperator(dataSetType[Out]) + this.connectTo(unionAllOperator, 0) + that.connectTo(unionAllOperator, 1) + DataQuantaDefault.wrap[Out](unionAllOperator) + } + + /** + * Feed this instance and a further instance into a [[IntersectOperator]]. 
+ * + * @param that the other instance to intersect with + * @return a new instance representing the [[IntersectOperator]]'s output + */ + override def intersect(that: DataQuanta[Out]): DataQuanta[Out] = { + require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") + val intersectOperator = new IntersectOperator(dataSetType[Out]) + this.connectTo(intersectOperator, 0) + that.connectTo(intersectOperator, 1) + DataQuantaDefault.wrap[Out](intersectOperator) + } + + /** + * Feeds this and a further instance into a [[CartesianOperator]]. + * + * @param that the other instance + * @return a new instance representing the [[CartesianOperator]]'s output + */ + override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuanta[WayangTuple2[Out, ThatOut]] = { + require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") + val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut]) + this.connectTo(cartesianOperator, 0) + that.connectTo(cartesianOperator, 1) + DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator) + } + + /** + * Feeds this instance into a [[ZipWithIdOperator]]. + * + * @return a new instance representing the [[ZipWithIdOperator]]'s output + */ + override def zipWithId: DataQuanta[WayangTuple2[lang.Long, Out]] = { + val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out]) + this.connectTo(zipWithIdOperator, 0) + DataQuantaDefault.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator) + } + + /** + * Feeds this instance into a [[DistinctOperator]]. + * + * @return a new instance representing the [[DistinctOperator]]'s output + */ + override def distinct: DataQuanta[Out] = { + val distinctOperator = new DistinctOperator(dataSetType[Out]) + this.connectTo(distinctOperator, 0) + DataQuantaDefault.wrap[Out](distinctOperator) + } + + /** + * Feeds this instance into a [[CountOperator]]. 
+ * + * @return a new instance representing the [[CountOperator]]'s output + */ + override def count: DataQuanta[lang.Long] = { + val countOperator = new CountOperator(dataSetType[Out]) + this.connectTo(countOperator, 0) + DataQuantaDefault.wrap[lang.Long](countOperator) + } +} + +object DataQuantaDefault { + + def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = { + new DataQuantaDefault[T](operator, outputIndex) + } + + def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaDefault[_] = + new DataQuantaDefault(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder) + +} + diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala new file mode 100644 index 0000000..aad0e42 --- /dev/null +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 */ + +package org.apache.wayang.api.dataquanta + +import org.apache.wayang.api.PlanBuilder +import org.apache.wayang.core.plan.wayangplan.ElementaryOperator + +import scala.reflect.ClassTag + +/** + * Factory that decides which concrete [[DataQuanta]] implementation gets instantiated, since [[DataQuanta]] + * can have several implementations. + * + * NOTE(review): choosing the implementation from configuration is not implemented yet; + * [[DataQuantaFactory.build]] currently always creates a [[DataQuantaDefault]]. + */ +object DataQuantaFactory { + + /** + * Builds a [[DataQuanta]] representing output slot `outputIndex` of `operator`. + * + * Currently this always delegates to [[DataQuantaDefault.wrap]], so the result is a [[DataQuantaDefault]]. + * + * @param operator the operator whose output will be wrapped + * @param outputIndex index of the operator's output slot to wrap (defaults to `0`) + * @param planBuilder implicit [[PlanBuilder]] forwarded to [[DataQuantaDefault.wrap]] + * @tparam T type of the data quanta produced at that output + * @return a [[DataQuanta]] instance for the given output + */ + def build[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T] = { + //TODO validate if the correct way + DataQuantaDefault.wrap[T](operator, outputIndex) + } + +} diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala index 99768cc..19410cf 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala @@ -21,7 +21,7 @@ package org.apache.wayang.api.dataquanta import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2} - +import java.util.function.{ BiFunction => JavaBiFunction } import
scala.reflect.ClassTag /** diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala index fb6e3f1..71d909b 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala @@ -36,9 +36,9 @@ class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[O * @param that the other [[KeyedDataQuanta]] to join with * @return the join product [[DataQuanta]] */ - def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): - DataQuanta[WayangTuple2[Out, ThatOut]] = + def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): DataQuanta[WayangTuple2[Out, ThatOut]] = { dataQuanta.joinJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor) + } /** * Performs a co-group. The grouping fields are governed by the [[KeyedDataQuanta]]'s keys.
@@ -47,7 +47,8 @@ class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[O * @return the co-grouped [[DataQuanta]] */ def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): - DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = + DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = { dataQuanta.coGroupJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor) + } } diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala index 7dfefd0..0de9a4a 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala @@ -18,7 +18,7 @@ package org.apache.wayang -import org.apache.wayang.api.dataquanta.{DataQuanta, JoinedDataQuanta} +import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaFactory, JoinedDataQuanta} import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable} import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction} @@ -142,10 +142,12 @@ package object api { implicit def createPlanBuilder(wayangContext: WayangContext): PlanBuilder = new PlanBuilder(wayangContext) - implicit private[api] def wrap[Out: ClassTag](op: ElementaryOperator)(implicit planBuilder: PlanBuilder): DataQuanta[Out] = - new DataQuanta[Out](op) + implicit private[api] def wrap[Out: ClassTag](op: ElementaryOperator)(implicit planBuilder: PlanBuilder): DataQuanta[Out] = { + /* Wraps output slot 0 of `op` (the factory's default index); DataQuantaFactory picks the concrete DataQuanta implementation. */ DataQuantaFactory.build[Out](op) + } - implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): RecordDataQuanta = + implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): RecordDataQuanta = { new RecordDataQuanta(dataQuanta) + } }
