Repository: spark
Updated Branches:
  refs/heads/master 74fb2ecf7 -> 8ca4ecb6a


[SPARK-546] Add full outer join to RDD and DStream.

leftOuterJoin and rightOuterJoin are already implemented.  This patch adds 
fullOuterJoin.

Author: Aaron Staple <[email protected]>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in 
rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca4ecb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca4ecb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca4ecb6

Branch: refs/heads/master
Commit: 8ca4ecb6a56b96bae21b33e27f6abdb53676683a
Parents: 74fb2ec
Author: Aaron Staple <[email protected]>
Authored: Wed Sep 24 20:39:09 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Sep 24 20:39:09 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala | 48 +++++++++++++++++
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 42 +++++++++++++++
 .../org/apache/spark/PartitioningSuite.scala    |  3 ++
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 15 ++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  1 +
 docs/programming-guide.md                       |  2 +-
 python/pyspark/join.py                          | 16 ++++++
 python/pyspark/rdd.py                           | 25 ++++++++-
 .../streaming/api/java/JavaPairDStream.scala    | 54 ++++++++++++++++++--
 .../dstream/PairDStreamFunctions.scala          | 36 +++++++++++++
 .../spark/streaming/BasicOperationsSuite.scala  | 15 ++++++
 11 files changed, 250 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 880f61c..0846225 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -470,6 +470,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output 
RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD 
using the existing
    * partitioner/parallelism level.
    */
@@ -564,6 +580,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the 
existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], 
Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given 
number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
   def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap())

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2..7f578bc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -507,6 +507,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output 
RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
+  /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD 
using the
    * existing partitioner/parallelism level.
    */
@@ -586,6 +603,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the 
existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = 
{
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) 
in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w 
in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. 
Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) 
if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given 
number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, 
(Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
    * Warning: this doesn't return a multimap (so if you have multiple values 
to the same key, only

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala 
b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index fc0cee3..646ede3 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === 
grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === 
grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === 
grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === 
grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === 
grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === 
grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) 
}.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) 
}.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) 
}.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) 
}.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() 
}.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() 
}.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) 
}.getMessage.contains("array"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69..75b0119 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with 
SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1b501a..465c1a8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 01d378a..510b47a 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -906,7 +906,7 @@ for details.
 <tr>
   <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
   <td> When called on datasets of type (K, V) and (K, W), returns a dataset of 
(K, (V, W)) pairs with all pairs of elements for each key.
-    Outer joins are also supported through <code>leftOuterJoin</code> and 
<code>rightOuterJoin</code>.
+    Outer joins are supported through <code>leftOuterJoin</code>, 
<code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/python/pyspark/join.py
----------------------------------------------------------------------
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index b0f1cc1..b4a8447 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -80,6 +80,22 @@ def python_left_outer_join(rdd, other, numPartitions):
     return _do_python_join(rdd, other, numPartitions, dispatch)
 
 
+def python_full_outer_join(rdd, other, numPartitions):
+    def dispatch(seq):
+        vbuf, wbuf = [], []
+        for (n, v) in seq:
+            if n == 1:
+                vbuf.append(v)
+            elif n == 2:
+                wbuf.append(v)
+        if not vbuf:
+            vbuf.append(None)
+        if not wbuf:
+            wbuf.append(None)
+        return [(v, w) for v in vbuf for w in wbuf]
+    return _do_python_join(rdd, other, numPartitions, dispatch)
+
+
 def python_cogroup(rdds, numPartitions):
     def make_mapper(i):
         return lambda (k, v): (k, (i, v))

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ef233b..680140d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -36,7 +36,7 @@ from pyspark.serializers import NoOpSerializer, 
CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
 from pyspark.join import python_join, python_left_outer_join, \
-    python_right_outer_join, python_cogroup
+    python_right_outer_join, python_full_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
@@ -1375,7 +1375,7 @@ class RDD(object):
 
         For each element (k, v) in C{self}, the resulting RDD will either
         contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in other have key k.
+        (k, (v, None)) if no elements in C{other} have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1403,6 +1403,27 @@ class RDD(object):
         """
         return python_right_outer_join(self, other, numPartitions)
 
+    def fullOuterJoin(self, other, numPartitions=None):
+        """
+        Perform a full outer join of C{self} and C{other}.
+
+        For each element (k, v) in C{self}, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in C{other}, or the pair
+        (k, (v, None)) if no elements in C{other} have key k.
+
+        Similarly, for each element (k, w) in C{other}, the resulting RDD will
+        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
+        (k, (None, w)) if no elements in C{self} have key k.
+
+        Hash-partitions the resulting RDD into the given number of partitions.
+
+        >>> x = sc.parallelize([("a", 1), ("b", 4)])
+        >>> y = sc.parallelize([("a", 2), ("c", 8)])
+        >>> sorted(x.fullOuterJoin(y).collect())
+        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
+        """
+        return python_full_outer_join(self, other, numPartitions)
+
     # TODO: add option to control map-side combining
     # portable_hash is used as default, because builtin hash of None is 
different
     # cross machines.

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index c00e11d..59d4423 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -606,8 +606,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
-   * Return a new DStream by applying 'join' between RDDs of `this` DStream 
and `other` DStream.
-   * The supplied org.apache.spark.Partitioner is used to control the 
partitioning of each RDD.
+   * Return a new DStream by applying 'left outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to 
control
+   * the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -624,8 +625,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, 
(Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -659,6 +659,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with 
Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairDStream[K, W])
+      : JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with 
`numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      numPartitions: Int
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, numPartitions)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to 
control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W](
+      other: JavaPairDStream[K, W],
+      partitioner: Partitioner
+    ): JavaPairDStream[K, (Optional[V], Optional[W])] = {
+    implicit val cm: ClassTag[W] = fakeClassTag
+    val joinResult = dstream.fullOuterJoin(other.dstream, partitioner)
+    joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    }
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each 
batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 826bf39..9467595 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -569,6 +569,42 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   }
 
   /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with 
Spark's default
+   * number of partitions.
+   */
+  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, 
(Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner())
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. Hash partitioning is used to generate the RDDs with 
`numPartitions`
+   * partitions.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin[W](other, defaultPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a new DStream by applying 'full outer join' between RDDs of `this` 
DStream and
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to 
control
+   * the partitioning of each RDD.
+   */
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)],
+      partitioner: Partitioner
+    ): DStream[(K, (Option[V], Option[W]))] = {
+    self.transformWith(
+      other,
+      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, 
partitioner)
+    )
+  }
+
+  /**
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each 
batch interval
    * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca4ecb6/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 059ac6c..6c8bb50 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -303,6 +303,21 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(inputData1, inputData2, operation, outputData, true)
   }
 
+  test("fullOuterJoin") {
+    val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+    val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("")   )
+    val outputData = Seq(
+      Seq( ("a", (Some(1), Some("x"))), ("b", (Some(1), Some("x"))) ),
+      Seq( ("", (Some(1), Some("x"))), ("a", (Some(1), None)), ("b", (None, 
Some("x"))) ),
+      Seq( ("", (Some(1), None)) ),
+      Seq( ("", (None, Some("x"))) )
+    )
+    val operation = (s1: DStream[String], s2: DStream[String]) => {
+      s1.map(x => (x, 1)).fullOuterJoin(s2.map(x => (x, "x")))
+    }
+    testOperation(inputData1, inputData2, operation, outputData, true)
+  }
+
   test("updateStateByKey") {
     val inputData =
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to