This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3df86a2 [SPARK-27147][CORE][TEST] Add SortShuffleWriterSuite
3df86a2 is described below
commit 3df86a2581288ec5211533752f3c5d79dc9ba439
Author: 10087686 <[email protected]>
AuthorDate: Sat May 25 21:03:10 2019 -0700
[SPARK-27147][CORE][TEST] Add SortShuffleWriterSuite
## What changes were proposed in this pull request?
There are no unit test cases for SortShuffleWriter, so this patch adds new test cases.
## How was this patch tested?
new test cases
Closes #24080 from wangjiaochun/UtestForSortShuffleWriter.
Authored-by: 10087686 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../shuffle/sort/SortShuffleWriterSuite.scala | 96 ++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git
a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
new file mode 100644
index 0000000..690bcd9
--- /dev/null
+++
b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.mockito.Mockito._
+import org.mockito.MockitoAnnotations
+import org.scalatest.Matchers
+
+import org.apache.spark.{Partitioner, SharedSparkContext, ShuffleDependency, SparkFunSuite}
+import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver}
+import org.apache.spark.util.Utils
+
+
/**
 * Unit tests for [[SortShuffleWriter]]: verifies the empty-input and
 * non-empty-input write paths and the shuffle write metrics each produces.
 */
class SortShuffleWriterSuite extends SparkFunSuite with SharedSparkContext with Matchers {

  private val shuffleId = 0
  private val numMaps = 5
  private var shuffleHandle: BaseShuffleHandle[Int, Int, Int] = _
  private val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
  private val serializer = new JavaSerializer(conf)

  override def beforeEach(): Unit = {
    super.beforeEach()
    MockitoAnnotations.initMocks(this)
    // Hash-partitions keys across `numMaps` partitions, mirroring a real shuffle.
    val hashPartitioner = new Partitioner() {
      def numPartitions = numMaps
      def getPartition(key: Any) = Utils.nonNegativeMod(key.hashCode, numPartitions)
    }
    // Back the handle with a mocked dependency: no aggregation, no key ordering.
    val dep = mock(classOf[ShuffleDependency[Int, Int, Int]])
    when(dep.partitioner).thenReturn(hashPartitioner)
    when(dep.serializer).thenReturn(serializer)
    when(dep.aggregator).thenReturn(None)
    when(dep.keyOrdering).thenReturn(None)
    shuffleHandle = new BaseShuffleHandle(shuffleId, numMaps = numMaps, dep)
  }

  override def afterAll(): Unit = {
    // Release resolver resources before the shared context is torn down.
    try {
      shuffleBlockResolver.stop()
    } finally {
      super.afterAll()
    }
  }

  test("write empty iterator") {
    val taskContext = MemoryTestingUtils.fakeTaskContext(sc.env)
    val writer = new SortShuffleWriter[Int, Int, Int](
      shuffleBlockResolver,
      shuffleHandle,
      mapId = 1,
      taskContext)
    writer.write(Iterator.empty)
    writer.stop(success = true)
    // Writing nothing must leave no data file and record zero metrics.
    val outputFile = shuffleBlockResolver.getDataFile(shuffleId, 1)
    val metrics = taskContext.taskMetrics().shuffleWriteMetrics
    assert(!outputFile.exists())
    assert(metrics.bytesWritten === 0)
    assert(metrics.recordsWritten === 0)
  }

  test("write with some records") {
    val taskContext = MemoryTestingUtils.fakeTaskContext(sc.env)
    val inputRecords = List[(Int, Int)]((1, 2), (2, 3), (4, 4), (6, 5))
    val writer = new SortShuffleWriter[Int, Int, Int](
      shuffleBlockResolver,
      shuffleHandle,
      mapId = 2,
      taskContext)
    writer.write(inputRecords.toIterator)
    writer.stop(success = true)
    // The data file must exist, and its length / record count must agree
    // with the write metrics reported on the task context.
    val outputFile = shuffleBlockResolver.getDataFile(shuffleId, 2)
    val metrics = taskContext.taskMetrics().shuffleWriteMetrics
    assert(outputFile.exists())
    assert(outputFile.length() === metrics.bytesWritten)
    assert(inputRecords.size === metrics.recordsWritten)
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]