This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3df86a2  [SPARK-27147][CORE][TEST] Add SortShuffleWriterSuite
3df86a2 is described below

commit 3df86a2581288ec5211533752f3c5d79dc9ba439
Author: 10087686 <[email protected]>
AuthorDate: Sat May 25 21:03:10 2019 -0700

    [SPARK-27147][CORE][TEST] Add SortShuffleWriterSuite
    
    ## What changes were proposed in this pull request?
    There are no unit test cases for SortShuffleWriter, so add new test cases.
    
    ## How was this patch tested?
    new test cases
    
    Closes #24080 from wangjiaochun/UtestForSortShuffleWriter.
    
    Authored-by: 10087686 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../shuffle/sort/SortShuffleWriterSuite.scala      | 96 ++++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
new file mode 100644
index 0000000..690bcd9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.mockito.Mockito._
+import org.mockito.MockitoAnnotations
+import org.scalatest.Matchers
+
+import org.apache.spark.{Partitioner, SharedSparkContext, ShuffleDependency, 
SparkFunSuite}
+import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver}
+import org.apache.spark.util.Utils
+
+
+class SortShuffleWriterSuite extends SparkFunSuite with SharedSparkContext 
with Matchers {
+  // Tests SortShuffleWriter: one case writes an empty iterator, the other writes four records.
+  private val shuffleId = 0
+  private val numMaps = 5
+  private var shuffleHandle: BaseShuffleHandle[Int, Int, Int] = _
+  private val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  private val serializer = new JavaSerializer(conf)
+  // Rebuilds shuffleHandle before every test around a mocked ShuffleDependency.
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    MockitoAnnotations.initMocks(this) // NOTE(review): no @Mock fields visible here; confirm needed
+    val partitioner = new Partitioner() {
+      def numPartitions = numMaps // NOTE(review): partition count reuses the numMaps value
+      def getPartition(key: Any) = Utils.nonNegativeMod(key.hashCode, 
numPartitions)
+    }
+    shuffleHandle = {
+      val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
+      when(dependency.partitioner).thenReturn(partitioner)
+      when(dependency.serializer).thenReturn(serializer)
+      when(dependency.aggregator).thenReturn(None)
+      when(dependency.keyOrdering).thenReturn(None)
+      new BaseShuffleHandle(shuffleId, numMaps = numMaps, dependency)
+    }
+  }
+  // Stops the block resolver; the finally clause runs super.afterAll() even if stop() throws.
+  override def afterAll(): Unit = {
+    try {
+      shuffleBlockResolver.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+  // Empty input: expects no data file for map 1 and zero bytes/records in the write metrics.
+  test("write empty iterator") {
+    val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+    val writer = new SortShuffleWriter[Int, Int, Int](
+      shuffleBlockResolver,
+      shuffleHandle,
+      mapId = 1,
+      context)
+    writer.write(Iterator.empty)
+    writer.stop(success = true)
+    val dataFile = shuffleBlockResolver.getDataFile(shuffleId, 1)
+    val writeMetrics = context.taskMetrics().shuffleWriteMetrics
+    assert(!dataFile.exists())
+    assert(writeMetrics.bytesWritten === 0)
+    assert(writeMetrics.recordsWritten === 0)
+  }
+  // Four records: expects a data file for map 2 sized bytesWritten, and recordsWritten == 4.
+  test("write with some records") {
+    val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+    val records = List[(Int, Int)]((1, 2), (2, 3), (4, 4), (6, 5))
+    val writer = new SortShuffleWriter[Int, Int, Int](
+      shuffleBlockResolver,
+      shuffleHandle,
+      mapId = 2,
+      context)
+    writer.write(records.toIterator)
+    writer.stop(success = true)
+    val dataFile = shuffleBlockResolver.getDataFile(shuffleId, 2)
+    val writeMetrics = context.taskMetrics().shuffleWriteMetrics
+    assert(dataFile.exists())
+    assert(dataFile.length() === writeMetrics.bytesWritten)
+    assert(records.size === writeMetrics.recordsWritten)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to