This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 26af4622521a397fa3ef4a6a680d1ee8d0568a95
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Thu May 13 22:31:37 2021 -0400

    [WAYANG-31] DataQuanta was refactored as abstract class
---
 .../apache/wayang/api/dataquanta/DataQuanta.scala  | 263 ++++---------
 .../wayang/api/dataquanta/DataQuantaDefault.scala  | 409 +++++++++++++++++++++
 .../wayang/api/dataquanta/DataQuantaFactory.scala  |  49 +++
 .../wayang/api/dataquanta/JoinedDataQuanta.scala   |   2 +-
 .../wayang/api/dataquanta/KeyedDataQuanta.scala    |   7 +-
 .../main/scala/org/apache/wayang/api/package.scala |  10 +-
 6 files changed, 536 insertions(+), 204 deletions(-)

diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index a614467..6e464e0 100644
--- 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -21,7 +21,7 @@ package org.apache.wayang.api.dataquanta
 
 import de.hpi.isg.profiledb.store.model.Experiment
 import org.apache.commons.lang3.Validate
-import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, 
groupedDataSetType, groupedDataUnitType, toConsumer, 
toSerializableBinaryOperator, toSerializableFlatteningFunction, 
toSerializableFunction, toSerializablePartitionFunction, 
toSerializablePredicate}
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, 
toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, 
toSerializableFunction, toSerializablePartitionFunction, 
toSerializablePredicate}
 import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
 import org.apache.wayang.basic.function.ProjectionDescriptor
 import org.apache.wayang.basic.operators._
@@ -35,7 +35,7 @@ import org.apache.wayang.core.platform.Platform
 import org.apache.wayang.core.util.{Tuple => WayangTuple}
 
 import java.lang.{Iterable => JavaIterable}
-import java.util.function.{Consumer, IntUnaryOperator, BiFunction => 
JavaBiFunction, Function => JavaFunction}
+import java.util.function.{Consumer, IntUnaryOperator, Function => 
JavaFunction}
 import java.util.{Collection => JavaCollection}
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
@@ -43,12 +43,13 @@ import scala.reflect._
 
 /**
   * Represents an intermediate result/data flow edge in a [[WayangPlan]].
+  * This abstract class is only a template, which makes the API easy to extend.
   *
   * @param operator    a unary [[Operator]] that produces this instance
   * @param ev$1        the data type of the elements in this instance
   * @param planBuilder keeps track of the [[WayangPlan]] being build
   */
-class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: 
Int = 0)(implicit val planBuilder: PlanBuilder) {
+abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, 
outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) {
 
   Validate.isTrue(operator.getNumOutputs > outputIndex)
 
@@ -67,8 +68,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @return a new instance representing the [[MapOperator]]'s output
     */
   def map[NewOut: ClassTag](udf: Out => NewOut,
-                            udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] =
+                            udfLoad: LoadProfileEstimator = null):  
DataQuanta[NewOut] = {
     mapJava(toSerializableFunction(udf), udfLoad)
+  }
 
   /**
     * Feed this instance into a [[MapOperator]].
@@ -78,13 +80,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @return a new instance representing the [[MapOperator]]'s output
     */
   def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
-                                udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] = {
-    val mapOperator = new MapOperator(new TransformationDescriptor(
-      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
-    ))
-    this.connectTo(mapOperator, 0)
-    mapOperator
-  }
+                                udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut]
 
   /**
     * Feed this instance into a [[MapPartitionsOperator]].
@@ -96,8 +92,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def mapPartitions[NewOut: ClassTag](udf: Iterable[Out] => Iterable[NewOut],
                                       selectivity: ProbabilisticDoubleInterval 
= null,
-                                      udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] =
+                                      udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] = {
     mapPartitionsJava(toSerializablePartitionFunction(udf), selectivity, 
udfLoad)
+  }
 
   /**
     * Feed this instance into a [[MapPartitionsOperator]].
@@ -109,13 +106,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def mapPartitionsJava[NewOut: ClassTag](udf: 
SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
                                           selectivity: 
ProbabilisticDoubleInterval = null,
-                                          udfLoad: LoadProfileEstimator = 
null): DataQuanta[NewOut] = {
-    val mapOperator = new MapPartitionsOperator(
-      new MapPartitionsDescriptor(udf, basicDataUnitType[Out], 
basicDataUnitType[NewOut], selectivity, udfLoad)
-    )
-    this.connectTo(mapOperator, 0)
-    mapOperator
-  }
+                                          udfLoad: LoadProfileEstimator = 
null): DataQuanta[NewOut]
 
   /**
     * Feed this instance into a [[MapOperator]] with a 
[[ProjectionDescriptor]].
@@ -123,13 +114,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param fieldNames names of the fields to be projected
     * @return a new instance representing the [[MapOperator]]'s output
     */
-  def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut] = 
{
-    val projectionOperator = new MapOperator(
-      new ProjectionDescriptor(basicDataUnitType[Out], 
basicDataUnitType[NewOut], fieldNames: _*)
-    )
-    this.connectTo(projectionOperator, 0)
-    projectionOperator
-  }
+  def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut]
 
   /**
     * Connects the [[operator]] to a further [[Operator]].
@@ -153,8 +138,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
   def filter(udf: Out => Boolean,
              sqlUdf: String = null,
              selectivity: ProbabilisticDoubleInterval = null,
-             udfLoad: LoadProfileEstimator = null) =
+             udfLoad: LoadProfileEstimator = null) = {
     filterJava(toSerializablePredicate(udf), sqlUdf, selectivity, udfLoad)
+  }
 
   /**
     * Feed this instance into a [[FilterOperator]].
@@ -168,13 +154,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
   def filterJava(udf: SerializablePredicate[Out],
                  sqlUdf: String = null,
                  selectivity: ProbabilisticDoubleInterval = null,
-                 udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = {
-    val filterOperator = new FilterOperator(new PredicateDescriptor(
-      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, 
selectivity, udfLoad
-    ).withSqlImplementation(sqlUdf))
-    this.connectTo(filterOperator, 0)
-    filterOperator
-  }
+                 udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
 
   /**
     * Feed this instance into a [[FlatMapOperator]].
@@ -186,8 +166,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def flatMap[NewOut: ClassTag](udf: Out => Iterable[NewOut],
                                 selectivity: ProbabilisticDoubleInterval = 
null,
-                                udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] =
+                                udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] = {
     flatMapJava(toSerializableFlatteningFunction(udf), selectivity, udfLoad)
+  }
 
   /**
     * Feed this instance into a [[FlatMapOperator]].
@@ -199,13 +180,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, 
JavaIterable[NewOut]],
                                     selectivity: ProbabilisticDoubleInterval = 
null,
-                                    udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut] = {
-    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
-      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, 
udfLoad
-    ))
-    this.connectTo(flatMapOperator, 0)
-    flatMapOperator
-  }
+                                    udfLoad: LoadProfileEstimator = null): 
DataQuanta[NewOut]
 
   /**
     * Feed this instance into a [[SampleOperator]]. If this operation is 
inside of a loop, the sampling size
@@ -220,7 +195,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
              datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
              seed: Option[Long] = None,
              sampleMethod: SampleOperator.Methods = 
SampleOperator.Methods.ANY): DataQuanta[Out] =
-    this.sampleDynamic(_ => sampleSize, datasetSize, seed, sampleMethod)
+    sampleDynamic(_ => sampleSize, datasetSize, seed, sampleMethod)
 
   /**
     * Feed this instance into a [[SampleOperator]]. If this operation is 
inside of a loop, the sampling size
@@ -234,8 +209,8 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
   def sampleDynamic(sampleSizeFunction: Int => Int,
                     datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
                     seed: Option[Long] = None,
-                    sampleMethod: SampleOperator.Methods = 
SampleOperator.Methods.ANY): DataQuanta[Out] =
-    this.sampleDynamicJava(
+                    sampleMethod: SampleOperator.Methods = 
SampleOperator.Methods.ANY): DataQuanta[Out] = {
+    sampleDynamicJava(
       new IntUnaryOperator {
         override def applyAsInt(operand: Int): Int = 
sampleSizeFunction(operand)
       },
@@ -243,6 +218,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
       seed,
       sampleMethod
     )
+  }
 
 
   /**
@@ -256,29 +232,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
   def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
                         datasetSize: Long = 
SampleOperator.UNKNOWN_DATASET_SIZE,
                         seed: Option[Long] = None,
-                        sampleMethod: SampleOperator.Methods = 
SampleOperator.Methods.ANY): DataQuanta[Out] = {
-    if (seed.isEmpty) {
-      val sampleOperator = new SampleOperator(
-        sampleSizeFunction,
-        dataSetType[Out],
-        sampleMethod
-      )
-      sampleOperator.setDatasetSize(datasetSize)
-      this.connectTo(sampleOperator, 0)
-      sampleOperator
-    }
-    else {
-      val sampleOperator = new SampleOperator(
-        sampleSizeFunction,
-        dataSetType[Out],
-        sampleMethod,
-        seed.get
-      )
-      sampleOperator.setDatasetSize(datasetSize)
-      this.connectTo(sampleOperator, 0)
-      sampleOperator
-    }
-  }
+                        sampleMethod: SampleOperator.Methods = 
SampleOperator.Methods.ANY): DataQuanta[Out]
 
   /**
     * Assigns this instance a key extractor, which enables some key-based 
operations.
@@ -287,7 +241,10 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param keyExtractor extracts the key from the [[DataQuanta]]
     * @return the [[KeyedDataQuanta]]
     */
-  def keyBy[Key: ClassTag](keyExtractor: Out => Key) = new 
KeyedDataQuanta[Out, Key](this, keyExtractor)
+  //TODO: may need to be built by the extenders
+  def keyBy[Key: ClassTag](keyExtractor: Out => Key) : KeyedDataQuanta[Out, 
Key] = {
+    keyByJava(keyExtractor)
+  }
 
   /**
     * Assigns this instance a key extractor, which enables some key-based 
operations.
@@ -296,7 +253,8 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param keyExtractor extracts the key from the [[DataQuanta]]
     * @return the [[KeyedDataQuanta]]
     */
-  def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) = 
new KeyedDataQuanta[Out, Key](this, keyExtractor)
+  //TODO: may need to be built by the extenders
+  def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : 
KeyedDataQuanta[Out, Key]
 
   /**
     * Feed this instance into a [[ReduceByOperator]].
@@ -308,8 +266,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def reduceByKey[Key: ClassTag](keyUdf: Out => Key,
                                  udf: (Out, Out) => Out,
-                                 udfLoad: LoadProfileEstimator = null): 
DataQuanta[Out] =
+                                 udfLoad: LoadProfileEstimator = null): 
DataQuanta[Out] = {
     reduceByKeyJava(toSerializableFunction(keyUdf), 
toSerializableBinaryOperator(udf), udfLoad)
+  }
 
   /**
     * Feed this instance into a [[ReduceByOperator]].
@@ -321,15 +280,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     */
   def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
                                      udf: SerializableBinaryOperator[Out],
-                                     udfLoad: LoadProfileEstimator = null)
-  : DataQuanta[Out] = {
-    val reduceByOperator = new ReduceByOperator(
-      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], 
basicDataUnitType[Key]),
-      new ReduceDescriptor(udf, groupedDataUnitType[Out], 
basicDataUnitType[Out], udfLoad)
-    )
-    this.connectTo(reduceByOperator, 0)
-    reduceByOperator
-  }
+                                     udfLoad: LoadProfileEstimator = null) : 
DataQuanta[Out]
 
   /**
     * Feed this instance into a [[MaterializedGroupByOperator]].
@@ -350,15 +301,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @return a new instance representing the 
[[MaterializedGroupByOperator]]'s output
     */
   def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
-                                    keyUdfLoad: LoadProfileEstimator = null): 
DataQuanta[java.lang.Iterable[Out]] = {
-    val groupByOperator = new MaterializedGroupByOperator(
-      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], 
basicDataUnitType[Key], keyUdfLoad),
-      dataSetType[Out],
-      groupedDataSetType[Out]
-    )
-    this.connectTo(groupByOperator, 0)
-    groupByOperator
-  }
+                                    keyUdfLoad: LoadProfileEstimator = null): 
DataQuanta[java.lang.Iterable[Out]]
 
   /**
     * Feed this instance into a [[GlobalReduceOperator]].
@@ -379,24 +322,14 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @return a new instance representing the [[GlobalReduceOperator]]'s output
     */
   def reduceJava(udf: SerializableBinaryOperator[Out],
-                 udfLoad: LoadProfileEstimator = null): DataQuanta[Out] = {
-    val globalReduceOperator = new GlobalReduceOperator(
-      new ReduceDescriptor(udf, groupedDataUnitType[Out], 
basicDataUnitType[Out], udfLoad)
-    )
-    this.connectTo(globalReduceOperator, 0)
-    globalReduceOperator
-  }
+                 udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
 
   /**
     * Feed this instance into a [[GlobalMaterializedGroupOperator]].
     *
     * @return a new instance representing the 
[[GlobalMaterializedGroupOperator]]'s output
     */
-  def group(): DataQuanta[java.lang.Iterable[Out]] = {
-    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], 
groupedDataSetType[Out])
-    this.connectTo(groupOperator, 0)
-    groupOperator
-  }
+  def group(): DataQuanta[java.lang.Iterable[Out]]
 
   /**
     * Feed this instance and a further instance into a [[UnionAllOperator]].
@@ -404,13 +337,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param that the other instance to union with
     * @return a new instance representing the [[UnionAllOperator]]'s output
     */
-  def union(that: DataQuanta[Out]): DataQuanta[Out] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use 
the same plan builders.")
-    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
-    this.connectTo(unionAllOperator, 0)
-    that.connectTo(unionAllOperator, 1)
-    unionAllOperator
-  }
+  def union(that: DataQuanta[Out]): DataQuanta[Out]
 
   /**
     * Feed this instance and a further instance into a [[IntersectOperator]].
@@ -418,13 +345,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param that the other instance to intersect with
     * @return a new instance representing the [[IntersectOperator]]'s output
     */
-  def intersect(that: DataQuanta[Out]): DataQuanta[Out] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use 
the same plan builders.")
-    val intersectOperator = new IntersectOperator(dataSetType[Out])
-    this.connectTo(intersectOperator, 0)
-    that.connectTo(intersectOperator, 1)
-    intersectOperator
-  }
+  def intersect(that: DataQuanta[Out]): DataQuanta[Out]
 
   /**
     * Feeds this and a further instance into a [[JoinOperator]].
@@ -434,12 +355,11 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param thatKeyUdf UDF to extract keys from data quanta from `that` 
instance
     * @return a new instance representing the [[JoinOperator]]'s output
     */
-  def join[ThatOut: ClassTag, Key: ClassTag]
-  (thisKeyUdf: Out => Key,
-   that: DataQuanta[ThatOut],
-   thatKeyUdf: ThatOut => Key)
-  : DataQuanta[WayangTuple2[Out, ThatOut]] =
+  def join[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: Out => Key,
+                                              that: DataQuanta[ThatOut],
+                                              thatKeyUdf: ThatOut => Key): 
DataQuanta[WayangTuple2[Out, ThatOut]] = {
     joinJava(toSerializableFunction(thisKeyUdf), that, 
toSerializableFunction(thatKeyUdf))
+  }
 
   /**
     * Feeds this and a further instance into a [[JoinOperator]].
@@ -449,20 +369,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param thatKeyUdf UDF to extract keys from data quanta from `that` 
instance
     * @return a new instance representing the [[JoinOperator]]'s output
     */
-  def joinJava[ThatOut: ClassTag, Key: ClassTag]
-  (thisKeyUdf: SerializableFunction[Out, Key],
-   that: DataQuanta[ThatOut],
-   thatKeyUdf: SerializableFunction[ThatOut, Key])
-  : DataQuanta[WayangTuple2[Out, ThatOut]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use 
the same plan builders.")
-    val joinOperator = new JoinOperator(
-      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], 
basicDataUnitType[Key]),
-      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], 
basicDataUnitType[Key])
-    )
-    this.connectTo(joinOperator, 0)
-    that.connectTo(joinOperator, 1)
-    joinOperator
-  }
+  def joinJava[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: 
SerializableFunction[Out, Key],
+                                                  that: DataQuanta[ThatOut],
+                                                  thatKeyUdf: 
SerializableFunction[ThatOut, Key]) : DataQuanta[WayangTuple2[Out, ThatOut]]
 
   /**
     * Feeds this and a further instance into a [[CoGroupOperator]].
@@ -472,12 +381,11 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param thatKeyUdf UDF to extract keys from data quanta from `that` 
instance
     * @return a new instance representing the [[CoGroupOperator]]'s output
     */
-  def coGroup[ThatOut: ClassTag, Key: ClassTag]
-  (thisKeyUdf: Out => Key,
-   that: DataQuanta[ThatOut],
-   thatKeyUdf: ThatOut => Key)
-  : DataQuanta[WayangTuple2[java.lang.Iterable[Out], 
java.lang.Iterable[ThatOut]]] =
+  def coGroup[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: Out => Key,
+                                                 that: DataQuanta[ThatOut],
+                                                  thatKeyUdf: ThatOut => Key): 
DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] 
= {
     coGroupJava(toSerializableFunction(thisKeyUdf), that, 
toSerializableFunction(thatKeyUdf))
+  }
 
   /**
     * Feeds this and a further instance into a [[CoGroupOperator]].
@@ -487,20 +395,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param thatKeyUdf UDF to extract keys from data quanta from `that` 
instance
     * @return a new instance representing the [[CoGroupOperator]]'s output
     */
-  def coGroupJava[ThatOut: ClassTag, Key: ClassTag]
-  (thisKeyUdf: SerializableFunction[Out, Key],
-   that: DataQuanta[ThatOut],
-   thatKeyUdf: SerializableFunction[ThatOut, Key])
-  : DataQuanta[WayangTuple2[java.lang.Iterable[Out], 
java.lang.Iterable[ThatOut]]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use 
the same plan builders.")
-    val coGroupOperator = new CoGroupOperator(
-      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], 
basicDataUnitType[Key]),
-      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], 
basicDataUnitType[Key])
-    )
-    this.connectTo(coGroupOperator, 0)
-    that.connectTo(coGroupOperator, 1)
-    coGroupOperator
-  }
+  def coGroupJava[ThatOut: ClassTag, Key: ClassTag] (thisKeyUdf: 
SerializableFunction[Out, Key],
+                                                     that: DataQuanta[ThatOut],
+                                                     thatKeyUdf: 
SerializableFunction[ThatOut, Key]): 
DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]]
 
   /**
     * Feeds this and a further instance into a [[SortOperator]].
@@ -508,10 +405,9 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param keyUdf UDF to extract key from data quanta in this instance
     * @return a new instance representing the [[SortOperator]]'s output
     */
-  def sort[Key: ClassTag]
-  (keyUdf: Out => Key)
-  : DataQuanta[Out] =
+  def sort[Key: ClassTag] (keyUdf: Out => Key): DataQuanta[Out] = {
     sortJava(toSerializableFunction(keyUdf))
+  }
 
   /**
     * Feeds this and a further instance into a [[SortOperator]].
@@ -519,16 +415,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param keyUdf UDF to extract key from data quanta in this instance
     * @return a new instance representing the [[SortOperator]]'s output
     */
-  def sortJava[Key: ClassTag]
-  (keyUdf: SerializableFunction[Out, Key])
-  : DataQuanta[Out] = {
-    val sortOperator = new SortOperator(new TransformationDescriptor(
-      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
-    this.connectTo(sortOperator, 0)
-    sortOperator
-  }
-
-
+  def sortJava[Key: ClassTag] (keyUdf: SerializableFunction[Out, Key]) : 
DataQuanta[Out]
 
   /**
     * Feeds this and a further instance into a [[CartesianOperator]].
@@ -536,48 +423,28 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param that the other instance
     * @return a new instance representing the [[CartesianOperator]]'s output
     */
-  def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut])
-  : DataQuanta[WayangTuple2[Out, ThatOut]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use 
the same plan builders.")
-    val cartesianOperator = new CartesianOperator(dataSetType[Out], 
dataSetType[ThatOut])
-    this.connectTo(cartesianOperator, 0)
-    that.connectTo(cartesianOperator, 1)
-    cartesianOperator
-  }
+  def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]) : 
DataQuanta[WayangTuple2[Out, ThatOut]]
 
   /**
     * Feeds this instance into a [[ZipWithIdOperator]].
     *
     * @return a new instance representing the [[ZipWithIdOperator]]'s output
     */
-  def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]] = {
-    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
-    this.connectTo(zipWithIdOperator, 0)
-    zipWithIdOperator
-  }
+  def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]]
 
   /**
     * Feeds this instance into a [[DistinctOperator]].
     *
     * @return a new instance representing the [[DistinctOperator]]'s output
     */
-  def distinct: DataQuanta[Out] = {
-    val distinctOperator = new DistinctOperator(dataSetType[Out])
-    this.connectTo(distinctOperator, 0)
-    distinctOperator
-  }
+  def distinct: DataQuanta[Out]
 
   /**
     * Feeds this instance into a [[CountOperator]].
     *
     * @return a new instance representing the [[CountOperator]]'s output
     */
-  def count: DataQuanta[java.lang.Long] = {
-    val countOperator = new CountOperator(dataSetType[Out])
-    this.connectTo(countOperator, 0)
-    countOperator
-  }
-
+  def count: DataQuanta[java.lang.Long]
 
   /**
     * Feeds this instance into a do-while loop (guarded by a 
[[DoWhileOperator]].
@@ -625,13 +492,13 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     this.connectTo(doWhileOperator, DoWhileOperator.INITIAL_INPUT_INDEX)
 
     // Create and wire the loop body.
-    val loopDataQuanta = new DataQuanta[Out](doWhileOperator, 
DoWhileOperator.ITERATION_OUTPUT_INDEX)
+    val loopDataQuanta = DataQuantaFactory.build[Out](doWhileOperator, 
DoWhileOperator.ITERATION_OUTPUT_INDEX)
     val iterationResults = bodyBuilder.apply(loopDataQuanta)
     iterationResults.getField0.connectTo(doWhileOperator, 
DoWhileOperator.ITERATION_INPUT_INDEX)
     iterationResults.getField1.connectTo(doWhileOperator, 
DoWhileOperator.CONVERGENCE_INPUT_INDEX)
 
     // Return the iteration result.
-    new DataQuanta[Out](doWhileOperator, DoWhileOperator.FINAL_OUTPUT_INDEX)
+    DataQuantaFactory.build[Out](doWhileOperator, 
DoWhileOperator.FINAL_OUTPUT_INDEX)
   }
 
   /**
@@ -661,12 +528,12 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     this.connectTo(repeatOperator, RepeatOperator.INITIAL_INPUT_INDEX)
 
     // Create and wire the loop body.
-    val loopDataQuanta = new DataQuanta[Out](repeatOperator, 
RepeatOperator.ITERATION_OUTPUT_INDEX)
+    val loopDataQuanta = DataQuantaFactory.build[Out](repeatOperator, 
RepeatOperator.ITERATION_OUTPUT_INDEX)
     val iterationResult = bodyBuilder.apply(loopDataQuanta)
     iterationResult.connectTo(repeatOperator, 
RepeatOperator.ITERATION_INPUT_INDEX)
 
     // Return the iteration result.
-    new DataQuanta[Out](repeatOperator, RepeatOperator.FINAL_OUTPUT_INDEX)
+    DataQuantaFactory.build[Out](repeatOperator, 
RepeatOperator.FINAL_OUTPUT_INDEX)
   }
 
   /**
@@ -730,6 +597,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     *
     * @param f the action to perform as Java 8 lambda expression
     */
+  //TODO: validate whether this is the correct approach
   def foreachJava(f: Consumer[Out]): Unit = {
     val sink = new LocalCallbackSink(f, dataSetType[Out])
     sink.setName("foreach()")
@@ -744,6 +612,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     *
     * @return the data quanta
     */
+  //TODO: validate whether this is the correct approach
   def collect(): Iterable[Out] = {
     // Set up the sink.
     val collector = new java.util.LinkedList[Out]()
@@ -780,6 +649,7 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
     * @param formatterUdf UDF to format data quanta to [[String]]s
     * @param udfLoad      optional [[LoadProfileEstimator]] for the `udf`
     */
+  //TODO: validate whether this is the correct approach
   def writeTextFileJava(url: String,
                         formatterUdf: SerializableFunction[Out, String],
                         udfLoad: LoadProfileEstimator = null): Unit = {
@@ -869,7 +739,8 @@ class DataQuanta[Out: ClassTag](val operator: 
ElementaryOperator, outputIndex: I
 
 object DataQuanta {
 
-  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): 
DataQuanta[_] =
-    new DataQuanta(output.getOwner.asInstanceOf[ElementaryOperator], 
output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), 
planBuilder)
+  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): 
DataQuanta[_] = {
+    
DataQuantaFactory.build[T](output.getOwner.asInstanceOf[ElementaryOperator], 
output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), 
planBuilder)
+  }
 
 }
diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
new file mode 100644
index 0000000..1f79ae7
--- /dev/null
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
@@ -0,0 +1,409 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, 
groupedDataSetType, groupedDataUnitType, toConsumer, 
toSerializableBinaryOperator, toSerializableFlatteningFunction, 
toSerializableFunction, toSerializablePartitionFunction, 
toSerializablePredicate}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators._
+import 
org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, 
SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function._
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan._
+
+import java.lang
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.{Consumer, IntUnaryOperator, Function => 
JavaFunction}
+import scala.reflect._
+
/**
 * Default [[DataQuanta]] implementation, representing an intermediate result/data flow edge in a
 * [[WayangPlan]]. Each operator-producing method creates the corresponding Wayang operator,
 * connects this instance (and, for binary operators, `that`) to it, and wraps the new operator's
 * output in a fresh [[DataQuantaDefault]].
 *
 * @param operator    the [[ElementaryOperator]] that produces this instance
 * @param outputIndex index of the output slot of `operator` that this instance represents
 * @param planBuilder keeps track of the [[WayangPlan]] being built
 * @tparam Out the data type of the elements in this instance
 */
class DataQuantaDefault[Out: ClassTag]
          (override val operator: ElementaryOperator, outputIndex: Int = 0)
          (implicit override val planBuilder: PlanBuilder)
    extends DataQuanta[Out](operator, outputIndex) {

  /**
   * Feed this instance into a [[MapOperator]].
   *
   * @param udf     a Java 8 lambda expression as UDF for the [[MapOperator]]
   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[MapOperator]]'s output
   */
  override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
                                         udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
    val mapOperator = new MapOperator(new TransformationDescriptor(
      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
    ))
    this.connectTo(mapOperator, 0)
    DataQuantaDefault.wrap[NewOut](mapOperator)
  }

  /**
   * Feed this instance into a [[MapPartitionsOperator]].
   *
   * @param udf         a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]]
   * @param selectivity selectivity of the UDF
   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[MapPartitionsOperator]]'s output
   */
  override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
                                                   selectivity: ProbabilisticDoubleInterval = null,
                                                   udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
    val mapOperator = new MapPartitionsOperator(
      new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
    )
    this.connectTo(mapOperator, 0)
    DataQuantaDefault.wrap[NewOut](mapOperator)
  }

  /**
   * Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]].
   *
   * @param fieldNames names of the fields to be projected
   * @return a new instance representing the [[MapOperator]]'s output
   */
  override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaDefault[NewOut] = {
    val projectionOperator = new MapOperator(
      new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*)
    )
    this.connectTo(projectionOperator, 0)
    DataQuantaDefault.wrap[NewOut](projectionOperator)
  }

  /**
   * Feed this instance into a [[FilterOperator]].
   *
   * @param udf         UDF for the [[FilterOperator]]
   * @param sqlUdf      UDF as SQL `WHERE` clause
   * @param selectivity selectivity of the UDF
   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[FilterOperator]]'s output
   */
  override def filterJava(udf: SerializablePredicate[Out],
                          sqlUdf: String = null,
                          selectivity: ProbabilisticDoubleInterval = null,
                          udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = {
    val filterOperator = new FilterOperator(new PredicateDescriptor(
      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
    ).withSqlImplementation(sqlUdf))
    this.connectTo(filterOperator, 0)
    DataQuantaDefault.wrap[Out](filterOperator)
  }

  /**
   * Feed this instance into a [[FlatMapOperator]].
   *
   * @param udf         a Java 8 lambda expression as UDF for the [[FlatMapOperator]]
   * @param selectivity selectivity of the UDF
   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[FlatMapOperator]]'s output
   */
  override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
                                             selectivity: ProbabilisticDoubleInterval = null,
                                             udfLoad: LoadProfileEstimator = null): DataQuantaDefault[NewOut] = {
    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
    ))
    this.connectTo(flatMapOperator, 0)
    DataQuantaDefault.wrap[NewOut](flatMapOperator)
  }

  /**
   * Feed this instance into a [[SampleOperator]].
   *
   * @param sampleSizeFunction absolute size of the sample as a function of the current iteration number
   * @param datasetSize        optional size of the dataset to be sampled
   * @param seed               optional seed for the sampling; when absent, the operator is created without one
   * @param sampleMethod       the [[SampleOperator.Methods]] to use for sampling
   * @return a new instance representing the [[SampleOperator]]'s output
   */
  override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
                                 datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
                                 seed: Option[Long] = None,
                                 sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaDefault[Out] = {
    // Only the constructor differs depending on whether a seed was given; the rest of the wiring is shared.
    val sampleOperator = seed match {
      case Some(seedValue) => new SampleOperator(sampleSizeFunction, dataSetType[Out], sampleMethod, seedValue)
      case None => new SampleOperator(sampleSizeFunction, dataSetType[Out], sampleMethod)
    }
    sampleOperator.setDatasetSize(datasetSize)
    this.connectTo(sampleOperator, 0)
    DataQuantaDefault.wrap[Out](sampleOperator)
  }

  /**
   * Assigns this instance a key extractor, which enables some key-based operations.
   *
   * @see KeyedDataQuanta
   * @param keyExtractor extracts the key from the data quanta of this instance
   * @return the [[KeyedDataQuanta]]
   */
  //TODO validate this implementation
  override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]): KeyedDataQuanta[Out, Key] = {
    new KeyedDataQuanta[Out, Key](this, keyExtractor)
  }

  /**
   * Feed this instance into a [[ReduceByOperator]].
   *
   * @param keyUdf  UDF to extract the grouping key from the data quanta
   * @param udf     aggregation UDF for the [[ReduceByOperator]]
   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[ReduceByOperator]]'s output
   */
  override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
                                              udf: SerializableBinaryOperator[Out],
                                              udfLoad: LoadProfileEstimator = null)
  : DataQuantaDefault[Out] = {
    val reduceByOperator = new ReduceByOperator(
      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
    )
    this.connectTo(reduceByOperator, 0)
    DataQuantaDefault.wrap[Out](reduceByOperator)
  }

  /**
   * Feed this instance into a [[MaterializedGroupByOperator]].
   *
   * @param keyUdf     UDF to extract the grouping key from the data quanta
   * @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf`
   * @return a new instance representing the [[MaterializedGroupByOperator]]'s output
   */
  override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
                                             keyUdfLoad: LoadProfileEstimator = null): DataQuantaDefault[java.lang.Iterable[Out]] = {
    val groupByOperator = new MaterializedGroupByOperator(
      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
      dataSetType[Out],
      groupedDataSetType[Out]
    )
    this.connectTo(groupByOperator, 0)
    DataQuantaDefault.wrap[java.lang.Iterable[Out]](groupByOperator)
  }

  /**
   * Feed this instance into a [[GlobalReduceOperator]].
   *
   * @param udf     aggregation UDF for the [[GlobalReduceOperator]]
   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
   * @return a new instance representing the [[GlobalReduceOperator]]'s output
   */
  override def reduceJava(udf: SerializableBinaryOperator[Out],
                          udfLoad: LoadProfileEstimator = null): DataQuantaDefault[Out] = {
    val globalReduceOperator = new GlobalReduceOperator(
      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
    )
    this.connectTo(globalReduceOperator, 0)
    DataQuantaDefault.wrap[Out](globalReduceOperator)
  }

  /**
   * Feeds this and a further instance into a [[JoinOperator]].
   *
   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
   * @param that       the other instance; must share this instance's [[PlanBuilder]]
   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
   * @return a new instance representing the [[JoinOperator]]'s output
   */
  override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key],
                                                          that: DataQuanta[ThatOut],
                                                          thatKeyUdf: SerializableFunction[ThatOut, Key])
  : DataQuanta[WayangTuple2[Out, ThatOut]] = {
    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
    val joinOperator = new JoinOperator(
      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
    )
    this.connectTo(joinOperator, 0)
    that.connectTo(joinOperator, 1)
    DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
  }

  /**
   * Feeds this and a further instance into a [[CoGroupOperator]].
   *
   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
   * @param that       the other instance; must share this instance's [[PlanBuilder]]
   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
   * @return a new instance representing the [[CoGroupOperator]]'s output
   */
  override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key],
                                                             that: DataQuanta[ThatOut],
                                                             thatKeyUdf: SerializableFunction[ThatOut, Key])
  : DataQuanta[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
    val coGroupOperator = new CoGroupOperator(
      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
    )
    this.connectTo(coGroupOperator, 0)
    that.connectTo(coGroupOperator, 1)
    DataQuantaDefault.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
  }

  /**
   * Feeds this instance into a [[SortOperator]].
   *
   * @param keyUdf UDF to extract the sort key from data quanta in this instance
   * @return a new instance representing the [[SortOperator]]'s output
   */
  override def sortJava[Key: ClassTag]
  (keyUdf: SerializableFunction[Out, Key])
  : DataQuantaDefault[Out] = {
    val sortOperator = new SortOperator(new TransformationDescriptor(
      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
    this.connectTo(sortOperator, 0)
    DataQuantaDefault.wrap[Out](sortOperator)
  }

  /**
   * Broadcasts the data quanta in this instance to a further instance.
   *
   * NOTE(review): this private helper is not called anywhere in this class; confirm whether it is still needed.
   *
   * @param receiver      the instance that receives the broadcast
   * @param broadcastName the name under which the broadcast will be registered
   */
  private def broadcast(receiver: DataQuantaDefault[_], broadcastName: String): Unit =
    receiver.registerBroadcast(this.operator, this.outputIndex, broadcastName)

  /**
   * Registers a further instance as broadcast input of this instance's operator.
   *
   * @param sender        provides the broadcast data quanta
   * @param outputIndex   identifies the output index of the sender
   * @param broadcastName the name under which the broadcast will be registered
   */
  private def registerBroadcast(sender: Operator, outputIndex: Int, broadcastName: String): Unit =
    sender.broadcastTo(outputIndex, this.operator, broadcastName)

  /**
   * Feed this instance into a [[GlobalMaterializedGroupOperator]].
   *
   * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
   */
  override def group(): DataQuanta[JavaIterable[Out]] = {
    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
    this.connectTo(groupOperator, 0)
    DataQuantaDefault.wrap[JavaIterable[Out]](groupOperator)
  }

  /**
   * Feed this instance and a further instance into a [[UnionAllOperator]].
   *
   * @param that the other instance to union with; must share this instance's [[PlanBuilder]]
   * @return a new instance representing the [[UnionAllOperator]]'s output
   */
  override def union(that: DataQuanta[Out]): DataQuanta[Out] = {
    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
    this.connectTo(unionAllOperator, 0)
    that.connectTo(unionAllOperator, 1)
    DataQuantaDefault.wrap[Out](unionAllOperator)
  }

  /**
   * Feed this instance and a further instance into a [[IntersectOperator]].
   *
   * @param that the other instance to intersect with; must share this instance's [[PlanBuilder]]
   * @return a new instance representing the [[IntersectOperator]]'s output
   */
  override def intersect(that: DataQuanta[Out]): DataQuanta[Out] = {
    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
    val intersectOperator = new IntersectOperator(dataSetType[Out])
    this.connectTo(intersectOperator, 0)
    that.connectTo(intersectOperator, 1)
    DataQuantaDefault.wrap[Out](intersectOperator)
  }

  /**
   * Feeds this and a further instance into a [[CartesianOperator]].
   *
   * @param that the other instance; must share this instance's [[PlanBuilder]]
   * @return a new instance representing the [[CartesianOperator]]'s output
   */
  override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuanta[WayangTuple2[Out, ThatOut]] = {
    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
    val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
    this.connectTo(cartesianOperator, 0)
    that.connectTo(cartesianOperator, 1)
    DataQuantaDefault.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
  }

  /**
   * Feeds this instance into a [[ZipWithIdOperator]].
   *
   * @return a new instance representing the [[ZipWithIdOperator]]'s output
   */
  override def zipWithId: DataQuanta[WayangTuple2[lang.Long, Out]] = {
    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
    this.connectTo(zipWithIdOperator, 0)
    DataQuantaDefault.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
  }

  /**
   * Feeds this instance into a [[DistinctOperator]].
   *
   * @return a new instance representing the [[DistinctOperator]]'s output
   */
  override def distinct: DataQuanta[Out] = {
    val distinctOperator = new DistinctOperator(dataSetType[Out])
    this.connectTo(distinctOperator, 0)
    DataQuantaDefault.wrap[Out](distinctOperator)
  }

  /**
   * Feeds this instance into a [[CountOperator]].
   *
   * @return a new instance representing the [[CountOperator]]'s output
   */
  override def count: DataQuanta[lang.Long] = {
    val countOperator = new CountOperator(dataSetType[Out])
    this.connectTo(countOperator, 0)
    DataQuantaDefault.wrap[lang.Long](countOperator)
  }
}
+
object DataQuantaDefault {

  /**
   * Wraps an output slot of `operator` in a new [[DataQuantaDefault]].
   *
   * @param operator    the operator whose output is represented
   * @param outputIndex index of the output slot of `operator` to represent
   * @param planBuilder the [[PlanBuilder]] the new instance belongs to
   * @tparam T type of the data quanta in that output
   * @return the new [[DataQuantaDefault]]
   */
  def wrap[T: ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)
                       (implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = {
    new DataQuantaDefault[T](operator, outputIndex)
  }

  /**
   * Creates a [[DataQuantaDefault]] for an existing [[OutputSlot]].
   *
   * @param output      the slot to wrap; its owner is cast to [[ElementaryOperator]], so any other
   *                    owner type fails with a `ClassCastException`
   * @param planBuilder the [[PlanBuilder]] the new instance belongs to
   * @tparam T type of the data quanta flowing through `output`
   * @return a [[DataQuantaDefault]] whose class tag is derived from the slot's data unit type class
   */
  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaDefault[_] = {
    // The element type is only known at runtime here, so build the ClassTag from the slot's type and
    // reuse `wrap` so the instance is typed with the slot's T (consistent with DataQuanta.create).
    val elementTag: ClassTag[T] = ClassTag(output.getType.getDataUnitType.getTypeClass)
    wrap[T](output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(elementTag, planBuilder)
  }

}
+
diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
new file mode 100644
index 0000000..aad0e42
--- /dev/null
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.core.plan.wayangplan.ElementaryOperator
+
+import scala.reflect.ClassTag
+
/**
 * Single entry point for obtaining a concrete [[DataQuanta]] implementation for an operator output.
 *
 * [[DataQuanta]] may have several implementations. Code that needs a new instance goes through this
 * factory rather than instantiating a subclass directly, so the choice is made in one place.
 * Currently the factory always produces a [[DataQuantaDefault]].
 */
object DataQuantaFactory {

  /**
   * Builds a [[DataQuanta]] that represents output `outputIndex` of `operator`.
   *
   * NOTE(review): no configuration is consulted yet; the result is always a [[DataQuantaDefault]].
   *
   * @param operator    the operator whose output will be wrapped
   * @param outputIndex index of the output slot of `operator` to wrap
   * @param planBuilder implicit [[PlanBuilder]] the new instance belongs to
   * @tparam T type of the data quanta produced by the operator
   * @return a [[DataQuanta]] representing the requested output
   */
  def build[T: ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)
                        (implicit planBuilder: PlanBuilder): DataQuanta[T] = {
    // TODO: select the implementation from configuration instead of always using the default one.
    DataQuantaDefault.wrap[T](operator, outputIndex)
  }

}
diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
index 99768cc..19410cf 100644
--- 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
@@ -21,7 +21,7 @@ package org.apache.wayang.api.dataquanta
 
 import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
 import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
-
+import java.util.function.{ BiFunction => JavaBiFunction }
 import scala.reflect.ClassTag
 
 /**
diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
index fb6e3f1..71d909b 100644
--- 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
@@ -36,9 +36,9 @@ class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val 
dataQuanta: DataQuanta[O
    * @param that the other [[KeyedDataQuanta]] to join with
    * @return the join product [[DataQuanta]]
    */
-  def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
-  DataQuanta[WayangTuple2[Out, ThatOut]] =
+  def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): 
DataQuanta[WayangTuple2[Out, ThatOut]] = {
     dataQuanta.joinJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, 
that.keyExtractor)
+  }
 
   /**
    * Performs a co-group. The grouping fields are governed by the 
[[KeyedDataQuanta]]'s keys.
@@ -47,7 +47,8 @@ class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val 
dataQuanta: DataQuanta[O
    * @return the co-grouped [[DataQuanta]]
    */
   def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
-  DataQuanta[WayangTuple2[java.lang.Iterable[Out], 
java.lang.Iterable[ThatOut]]] =
+  DataQuanta[WayangTuple2[java.lang.Iterable[Out], 
java.lang.Iterable[ThatOut]]] = {
     dataQuanta.coGroupJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, 
that.keyExtractor)
+  }
 
 }
diff --git 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
index 7dfefd0..0de9a4a 100644
--- 
a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
+++ 
b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
@@ -18,7 +18,7 @@
 
 package org.apache.wayang
 
-import org.apache.wayang.api.dataquanta.{DataQuanta, JoinedDataQuanta}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaFactory, 
JoinedDataQuanta}
 
 import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable}
 import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction}
@@ -142,10 +142,12 @@ package object api {
 
   implicit def createPlanBuilder(wayangContext: WayangContext): PlanBuilder = 
new PlanBuilder(wayangContext)
 
-  implicit private[api] def wrap[Out: ClassTag](op: 
ElementaryOperator)(implicit planBuilder: PlanBuilder): DataQuanta[Out] =
-    new DataQuanta[Out](op)
+  implicit private[api] def wrap[Out: ClassTag](op: 
ElementaryOperator)(implicit planBuilder: PlanBuilder): DataQuanta[Out] = {
+    DataQuantaFactory.build[Out](op)
+  }
 
-  implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): 
RecordDataQuanta =
+  implicit def elevateRecordDataQuanta(dataQuanta: DataQuanta[Record]): 
RecordDataQuanta = {
     new RecordDataQuanta(dataQuanta)
+  }
 
 }

Reply via email to