This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0c0fd94da83e [SPARK-53247][CORE][SQL][MLLIB][TESTS] Use `createArray` for large test array creations
0c0fd94da83e is described below

commit 0c0fd94da83ecd98ab5eb0ca6b42b21388e988c8
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Mon Aug 11 14:07:25 2025 -0700

[SPARK-53247][CORE][SQL][MLLIB][TESTS] Use `createArray` for large test array creations

### What changes were proposed in this pull request?

This PR aims to use `createArray` for simple large test array creations like the following.

```scala
- Array.fill[Long](4000000)(0)
+ createArray(4000000, 0L)
```

```scala
scala> spark.time(Array.fill[Long](4000000)(0).size)
Time taken: 19 ms
val res0: Int = 4000000

scala> spark.time(org.apache.spark.util.collection.Utils.createArray(4000000, 0L).size)
Time taken: 6 ms
val res1: Int = 4000000
```

For example, the following test is repeated three times by `OrcV1QuerySuite`, `OrcV2QuerySuite`, and `HiveOrcQuerySuite`. In addition, the test case itself repeats `Array.fill[Byte](5 * 1024 * 1024)('X')` 2048 (= 2 x 1024) times. So, in total, this PR improves the array creation **6144 times** just via this single call-site change.

https://github.com/apache/spark/blob/8d5e60279b10afcaad96abafb11c1c9950029b3d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala#L866-L877

### Why are the changes needed?

To reduce the test time for [SPARK-48094 Reduce GitHub Action usage according to ASF project allowance](https://issues.apache.org/jira/browse/SPARK-48094).

> - The average number of minutes a project uses per calendar week MUST NOT exceed the equivalent of 25 full-time runners (250,000 minutes, or 4,200 hours).

- https://infra-reports.apache.org/#ghactions&project=spark&hours=168

<img width="1137" height="496" alt="Screenshot 2025-08-11 at 12 07 29" src="https://github.com/user-attachments/assets/fbb8b500-e1b2-4fa6-9e31-4d5decd51ebb" />

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51976 from dongjoon-hyun/SPARK-53247.
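Why such a helper is faster, in a nutshell: `Array.fill(n)(elem)` takes `elem` by name and re-evaluates it for every one of the `n` slots, while a `createArray`-style helper evaluates the value once and writes it into a pre-allocated array. That is the likely source of the 19 ms vs. 6 ms gap shown above. Below is a minimal, self-contained sketch of such a helper for illustration only; the actual `org.apache.spark.util.collection.Utils.createArray` used by this patch may be implemented differently.

```scala
import scala.reflect.ClassTag

// Illustrative sketch only; not the actual Spark implementation.
object CreateArraySketch {
  // Evaluate `value` once, allocate the (possibly primitive) array once,
  // then fill it in a tight loop. Array.fill, by contrast, re-evaluates
  // its by-name element argument for every slot.
  def createArray[T: ClassTag](length: Int, value: T): Array[T] = {
    val array = new Array[T](length)
    var i = 0
    while (i < length) {
      array(i) = value
      i += 1
    }
    array
  }

  def main(args: Array[String]): Unit = {
    val longs = createArray(4000000, 0L)                  // Array[Long] of 4M zeros
    val bytes = createArray(5 * 1024 * 1024, 'X'.toByte)  // 5 MiB of 'X' bytes
    assert(longs.length == 4000000 && bytes.forall(_ == 'X'.toByte))
  }
}
```

With the `import org.apache.spark.util.collection.Utils.createArray` added by this patch, the touched call sites simply read `createArray(4000000, 0L)` in place of `Array.fill[Long](4000000)(0)`.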
Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../test/scala/org/apache/spark/FileSuite.scala | 11 ++++++---- .../org/apache/spark/MapOutputTrackerSuite.scala | 9 ++++---- .../apache/spark/io/ChunkedByteBufferSuite.scala | 3 ++- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++---- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 24 ++++++++++++--------- .../apache/spark/scheduler/MapStatusSuite.scala | 7 +++--- .../spark/shuffle/ShuffleBlockPusherSuite.scala | 25 +++++++++++----------- .../apache/spark/storage/BlockManagerSuite.scala | 5 +++-- .../org/apache/spark/util/SizeEstimatorSuite.scala | 7 +++--- .../scala/org/apache/spark/util/UtilsSuite.scala | 3 ++- .../collection/unsafe/sort/RadixSortSuite.scala | 5 +++-- .../spark/util/random/RandomSamplerSuite.scala | 5 +++-- .../apache/spark/ml/feature/InstanceSuite.scala | 5 +++-- .../spark/ml/feature/RobustScalerSuite.scala | 21 +++++++++--------- .../org/apache/spark/mllib/feature/PCASuite.scala | 5 +++-- .../mllib/linalg/UDTSerializationBenchmark.scala | 3 ++- .../variant/VariantExpressionSuite.scala | 13 +++++------ .../benchmark/PrimitiveArrayBenchmark.scala | 5 +++-- .../execution/datasources/orc/OrcQuerySuite.scala | 9 ++++---- .../parquet/ParquetVectorizedSuite.scala | 3 ++- 20 files changed, 102 insertions(+), 76 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 5be3f7dfdfba..6d9ec966e9fd 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -39,6 +39,7 @@ import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils +import org.apache.spark.util.collection.Utils.createArray class FileSuite extends SparkFunSuite with LocalSparkContext { var tempDir: File = _ @@ -86,11 +87,12 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { val normalFile = new File(normalDir, "part-00000") val normalContent = sc.textFile(normalDir).collect() - assert(normalContent === Array.fill(10000)("a")) + val expected = createArray(10000, "a") + assert(normalContent === expected) val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) val compressedContent = sc.textFile(compressedOutputDir).collect() - assert(compressedContent === Array.fill(10000)("a")) + assert(compressedContent === expected) assert(compressedFile.length < normalFile.length) } @@ -125,11 +127,12 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { val normalFile = new File(normalDir, "part-00000") val normalContent = sc.sequenceFile[String, String](normalDir).collect() - assert(normalContent === Array.fill(100)(("abc", "abc"))) + val expected = createArray(100, ("abc", "abc")) + assert(normalContent === expected) val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension) val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect() - assert(compressedContent === Array.fill(100)(("abc", "abc"))) + assert(compressedContent === expected) assert(compressedFile.length < normalFile.length) } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 26dc218c30c7..68e366e9ad10 100644 --- 
a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpoin import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockManagerId, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId} +import org.apache.spark.util.collection.Utils.createArray class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { private val conf = new SparkConf @@ -193,7 +194,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 5)) + BlockManagerId("88", "mph", 1000), createArray(10, 0L), 5)) val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) when(rpcCallContext.senderAddress).thenReturn(senderAddress) @@ -271,7 +272,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5)) + BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5)) } val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) @@ -578,7 +579,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5)) + BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5)) } val mapWorkerRpcEnv = createRpcEnv("spark-worker", "localhost", 0, new SecurityManager(conf)) @@ -625,7 +626,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5)) + BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5)) } masterTracker.registerMergeResult(20, 0, MergeStatus(BlockManagerId("999", "mps", 1000), 0, bitmap1, 1000L)) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 992bb37f44a0..5e036b6d0138 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.internal.config import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.util.Utils +import org.apache.spark.util.collection.Utils.createArray import org.apache.spark.util.io.ChunkedByteBuffer class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext { @@ 
-128,7 +129,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext { test("toArray() throws UnsupportedOperationException if size exceeds 2GB") { val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4) fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity()) - val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer)) + val chunkedByteBuffer = new ChunkedByteBuffer(createArray(1024, fourMegabyteBuffer)) assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L)) intercept[UnsupportedOperationException] { chunkedByteBuffer.toArray diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index aecb8b99d0e3..b9774482f949 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDDSuiteUtils._ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.collection.Utils.createArray class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { var tempDir: File = _ @@ -365,7 +366,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("repartitioned RDDs perform load balancing") { // Coalesce partitions - val input = Array.fill(1000)(1) + val input = createArray(1000, 1) val initialPartitions = 10 val data = sc.parallelize(input.toImmutableArraySeq, initialPartitions) @@ -393,9 +394,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { } } - testSplitPartitions(Array.fill(100)(1).toImmutableArraySeq, 10, 20) - testSplitPartitions((Array.fill(10000)(1) ++ Array.fill(10000)(2)).toImmutableArraySeq, 20, 100) - testSplitPartitions(Array.fill(1000)(1).toImmutableArraySeq, 250, 128) + testSplitPartitions(createArray(100, 1).toImmutableArraySeq, 10, 20) + testSplitPartitions( + (createArray(10000, 1) ++ createArray(10000, 2)).toImmutableArraySeq, 20, 100) + testSplitPartitions(createArray(1000, 1).toImmutableArraySeq, 250, 128) } test("coalesced RDDs") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d4e90be7c66d..bf38c629f700 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -50,6 +50,7 @@ import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedExcept import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.collection.Utils.createArray class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { @@ -679,23 +680,24 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor + val uncompressedSizes = createArray(1, 2L) complete(taskSets(0), Seq( (Success, MapStatus( - BlockManagerId("hostA-exec1", "hostA", 12345), 
Array.fill[Long](1)(2), mapTaskId = 5)), + BlockManagerId("hostA-exec1", "hostA", 12345), uncompressedSizes, mapTaskId = 5)), (Success, MapStatus( - BlockManagerId("hostA-exec2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)), + BlockManagerId("hostA-exec2", "hostA", 12345), uncompressedSizes, mapTaskId = 6)), (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, MapStatus( - BlockManagerId("hostA-exec1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)), + BlockManagerId("hostA-exec1", "hostA", 12345), uncompressedSizes, mapTaskId = 8)), (Success, MapStatus( - BlockManagerId("hostA-exec2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)), + BlockManagerId("hostA-exec2", "hostA", 12345), uncompressedSizes, mapTaskId = 9)), (Success, makeMapStatus("hostB", 1, mapTaskId = 10)) )) // make sure our test setup is correct @@ -4948,6 +4950,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti } // stage2`s task0 Fetch failed + val uncompressedSizes = createArray(2, 2L) runEvent(makeCompletionEvent( taskSets(1).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0L, 0, 0, @@ -4957,11 +4960,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // long running task complete runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, result = MapStatus(BlockManagerId("hostC-exec1", "hostC", 44399), - Array.fill[Long](2)(2), mapTaskId = taskIdCount), + uncompressedSizes, mapTaskId = taskIdCount), Seq.empty, Array.empty, createTaskInfo(false))) runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 44400), - Array.fill[Long](2)(2), mapTaskId = taskIdCount), + uncompressedSizes, mapTaskId = taskIdCount), Seq.empty, Array.empty, createTaskInfo(true))) @@ -4984,7 +4987,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(stage0Retry.size === 1) runEvent(makeCompletionEvent(stage0Retry.head.tasks(0), Success, result = MapStatus(BlockManagerId("hostE-exec1", "hostE", 44401), - Array.fill[Long](2)(2), mapTaskId = taskIdCount))) + uncompressedSizes, mapTaskId = taskIdCount))) // wait stage2 resubmit sc.listenerBus.waitUntilEmpty() @@ -5062,15 +5065,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti normalTask)) // Make the speculative task succeed after initial task has failed + val uncompressedSizes = createArray(2, 2L) runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512), - Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId), + uncompressedSizes, mapTaskId = speculativeTask.taskId), taskInfo = speculativeTask)) // The second task, for partition 1 succeeds as well. 
runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456), - Array.fill[Long](2)(2), mapTaskId = taskIdCount))) + createArray(2, 2L), mapTaskId = taskIdCount))) taskIdCount += 1 sc.listenerBus.waitUntilEmpty() @@ -5096,7 +5100,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // make the original task succeed runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success, result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345), - Array.fill[Long](2)(2), mapTaskId = taskIdCount))) + createArray(2, 2L), mapTaskId = taskIdCount))) Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2) dagEventProcessLoopTester.runEvents() diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 13e7ff758eba..951c2c2d6cbe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.config import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils +import org.apache.spark.util.collection.Utils.createArray class MapStatusSuite extends SparkFunSuite { private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*) @@ -75,7 +76,7 @@ class MapStatusSuite extends SparkFunSuite { } test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { - val sizes = Array.fill[Long](2001)(150L) + val sizes = createArray(2001, 150L) val status = MapStatus(null, sizes, -1) assert(status.isInstanceOf[HighlyCompressedMapStatus]) assert(status.getSizeForBlock(10) === 150L) @@ -208,7 +209,7 @@ class MapStatusSuite extends SparkFunSuite { doReturn(conf).when(env).conf SparkEnv.set(env) - val emptyBlocks = Array.fill[Long](emptyBlocksLength)(0L) + val emptyBlocks = createArray(emptyBlocksLength, 0L) val smallAndUntrackedBlocks = Array.tabulate[Long](smallAndUntrackedBlocksLength)(i => i) val trackedSkewedBlocks = Array.tabulate[Long](trackedSkewedBlocksLength)(i => i + 350 * 1024) @@ -252,7 +253,7 @@ class MapStatusSuite extends SparkFunSuite { doReturn(conf).when(env).conf SparkEnv.set(env) - val emptyBlocks = Array.fill[Long](emptyBlocksLength)(0L) + val emptyBlocks = createArray(emptyBlocksLength, 0L) val smallBlockSizes = Array.tabulate[Long](smallBlocksLength)(i => i + 1) val untrackedSkewedBlocksSizes = Array.tabulate[Long](untrackedSkewedBlocksLength)(i => i + 3500 * 1024) diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala index f030b64944e5..741e5a82e884 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.serializer.JavaSerializer import org.apache.spark.shuffle.ShuffleBlockPusher.PushRequest import org.apache.spark.storage._ import org.apache.spark.util.{SslTestUtils, ThreadUtils} +import org.apache.spark.util.collection.Utils.createArray class ShuffleBlockPusherSuite extends SparkFunSuite { @@ -115,7 +116,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { val mergerLocs = dependency.getMergerLocs.map(loc => BlockManagerId("", loc.host, loc.port)) val 
largeBlockSize = 2 * 1024 * 1024 blockPusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 5 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 5), dependency, 0) val pushRequests = blockPusher.prepareBlockPushRequests(5, 0, 0, 0, mock(classOf[File]), Array(2, 2, 2, largeBlockSize, largeBlockSize), mergerLocs, mock(classOf[TransportConf])) @@ -131,7 +132,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { val blockPusher = new TestShuffleBlockPusher(conf) val mergerLocs = dependency.getMergerLocs.map(loc => BlockManagerId("", loc.host, loc.port)) blockPusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 5 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 5), dependency, 0) val pushRequests = blockPusher.prepareBlockPushRequests(5, 0, 0, 0, mock(classOf[File]), Array(2, 2, 2, 1028, 1024), mergerLocs, mock(classOf[TransportConf])) blockPusher.runPendingTasks() @@ -146,7 +147,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { val blockPusher = new TestShuffleBlockPusher(conf) val mergerLocs = dependency.getMergerLocs.map(loc => BlockManagerId("", loc.host, loc.port)) blockPusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 5 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 5), dependency, 0) val pushRequests = blockPusher.prepareBlockPushRequests(5, 0, 0, 0, mock(classOf[File]), Array(2, 2, 2, 2, 2), mergerLocs, mock(classOf[TransportConf])) blockPusher.runPendingTasks() @@ -188,7 +189,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { val blockPusher = new ConcurrentTestBlockPusher(conf, semaphore) val mergerLocs = dependency.getMergerLocs.map(loc => BlockManagerId("", loc.host, loc.port)) blockPusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 5 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 5), dependency, 0) val pushRequests = blockPusher.prepareBlockPushRequests(5, 0, 0, 0, mock(classOf[File]), Array(2, 2, 2, 2, 2), mergerLocs, mock(classOf[TransportConf])) latch.countDown() @@ -204,7 +205,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { interceptPushedBlocksForSuccess() val blockPusher = new TestShuffleBlockPusher(conf) blockPusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 2), dependency, 0) blockPusher.runPendingTasks() verify(shuffleClient, times(1)) .pushBlocks(any(), any(), any(), any(), any()) @@ -232,7 +233,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { interceptPushedBlocksForSuccess() val pusher = new TestShuffleBlockPusher(conf) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(8)) .pushBlocks(any(), any(), any(), any(), any()) @@ -267,7 +268,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { }) val pusher = new TestShuffleBlockPusher(conf) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(1)) .pushBlocks(any(), any(), 
any(), any(), any()) @@ -288,7 +289,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { interceptPushedBlocksForSuccess() val pusher = new TestShuffleBlockPusher(conf) pusher.initiateBlockPush(mock(classOf[File]), - Array.fill(dependency.partitioner.numPartitions) { 512 * 1024 }, dependency, 0) + createArray(dependency.partitioner.numPartitions, 512 * 1024), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(4)) .pushBlocks(any(), any(), any(), any(), any()) @@ -355,7 +356,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { }) }) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(8)) .pushBlocks(any(), any(), any(), any(), any()) @@ -385,7 +386,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { }) }) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(1)) .pushBlocks(any(), any(), any(), any(), any()) @@ -408,7 +409,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { }) val pusher = new TestShuffleBlockPusher(conf) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(2)) .pushBlocks(any(), any(), any(), any(), any()) @@ -434,7 +435,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { }) }) pusher.initiateBlockPush( - mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0) + mock(classOf[File]), createArray(dependency.partitioner.numPartitions, 2), dependency, 0) pusher.runPendingTasks() verify(shuffleClient, times(1)) .pushBlocks(any(), any(), any(), any(), any()) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 7fd080823c4d..5b86345dd5f9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -66,6 +66,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util._ import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.collection.Utils.createArray import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTester @@ -937,7 +938,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe // the local disk of sameHostBm where the block is replicated to. // When there is no replication then block must be added via sameHostBm directly. 
val bmToPutBlock = if (storageLevel.replication > 1) otherHostBm else sameHostBm - val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + val array = createArray(16, Byte.MinValue to Byte.MaxValue).flatten val blockId = "list" bmToPutBlock.putIterator(blockId, List(array).iterator, storageLevel, tellMaster = true) @@ -970,7 +971,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe val store2 = makeBlockManager(8000, "executor2", this.master, Some(new MockBlockTransferService(0))) val blockId = "list" - val array = Array.fill(16)(Byte.MinValue to Byte.MaxValue).flatten + val array = createArray(16, Byte.MinValue to Byte.MaxValue).flatten store2.putIterator(blockId, List(array).iterator, level, true) val expectedBlockData = store2.getLocalBytes(blockId) assert(expectedBlockData.isDefined) diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index fe86e4988d4b..a7908d89288a 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} import org.apache.spark.SparkFunSuite import org.apache.spark.internal.config.Tests.TEST_USE_COMPRESSED_OOPS_KEY import org.apache.spark.util.Utils +import org.apache.spark.util.collection.Utils.createArray class DummyClass1 {} @@ -184,16 +185,16 @@ class SizeEstimatorSuite // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 // 10 pointers plus 8-byte object - assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) + assertResult(72)(SizeEstimator.estimate(createArray(10, d1))) // 100 pointers plus 8-byte object - assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) + assertResult(432)(SizeEstimator.estimate(createArray(100, d1))) // Same thing with huge array containing the same element many times. Note that this won't // return exactly 4032 because it can't tell that *all* the elements will equal the first // one it samples, but it should be close to that. // TODO: If we sample 100 elements, this should always be 4176 ? 
- val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1)) + val estimatedSize = SizeEstimator.estimate(createArray(1000, d1)) assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000") assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4200") } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index c1e4d005bed1..933b6fc39e91 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.SparkListener +import org.apache.spark.util.collection.Utils.createArray import org.apache.spark.util.io.ChunkedByteBufferInputStream class UtilsSuite extends SparkFunSuite with ResetSystemProperties { @@ -1115,7 +1116,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties { val chi = new ChiSquareTest() // We expect an even distribution; this array will be rescaled by `chiSquareTest` - val expected = Array.fill(arraySize * arraySize)(1.0) + val expected = createArray(arraySize * arraySize, 1.0) val observed = results.flatten // Performs Pearson's chi-squared test. Using the sum-of-squares as the test statistic, gives diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala index f5d417fcf19d..73d744b77c28 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.unsafe.array.LongArray import org.apache.spark.unsafe.memory.MemoryBlock import org.apache.spark.util.collection.Sorter +import org.apache.spark.util.collection.Utils.createArray import org.apache.spark.util.random.XORShiftRandom class RadixSortSuite extends SparkFunSuite { @@ -75,13 +76,13 @@ class RadixSortSuite extends SparkFunSuite { private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = { val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand } - val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0) + val extended = ref ++ createArray(Ints.checkedCast(size), 0L) (ref.map(i => JLong.valueOf(i)), new LongArray(MemoryBlock.fromLongArray(extended))) } private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = { val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand } - val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0) + val extended = ref ++ createArray(Ints.checkedCast(size * 2), 0L) (new LongArray(MemoryBlock.fromLongArray(ref)), new LongArray(MemoryBlock.fromLongArray(extended))) } diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala index 8fcbfd9ccad8..dfba45b756b7 100644 --- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.matchers.must.Matchers import 
org.scalatest.matchers.should.Matchers._ import org.apache.spark.SparkFunSuite +import org.apache.spark.util.collection.Utils.createArray class RandomSamplerSuite extends SparkFunSuite with Matchers { /** @@ -100,8 +101,8 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers { assert(math.min(d1.length, d2.length) > 0) assert(math.min(d1.min, d2.min) >= 0) val m = 1 + math.max(d1.max, d2.max) - val h1 = Array.fill[Int](m)(0) - val h2 = Array.fill[Int](m)(0) + val h1 = createArray(m, 0) + val h2 = createArray(m, 0) for (v <- d1) { h1(v) += 1 } for (v <- d2) { h2(v) += 1 } assert(h1.sum == h2.sum) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 53be2444ecb6..58ddbebbe77d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.collection.Utils.createArray class InstanceSuite extends SparkFunSuite { test("Kryo class register") { @@ -98,7 +99,7 @@ class InstanceSuite extends SparkFunSuite { } // instances larger than maxMemUsage - val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0))) + val denseInstance = Instance(-1.0, 2.0, Vectors.dense(createArray(1000, 1.0))) InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size @@ -109,7 +110,7 @@ class InstanceSuite extends SparkFunSuite { // nnz = 10 val sparseInstance = Instance(-2.0, 3.0, - Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1))) + Vectors.sparse(1000, Array.range(0, 1000, 100), createArray(10, 0.1))) // normally, memory usage of a block does not exceed maxMemUsage too much val maxMemUsage = 1 << 18 diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RobustScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RobustScalerSuite.scala index 6f24b9db3b3e..b3eb595b971a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RobustScalerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RobustScalerSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.sql.Row +import org.apache.spark.util.collection.Utils.createArray class RobustScalerSuite extends MLTest with DefaultReadWriteTest { @@ -140,19 +141,19 @@ class RobustScalerSuite extends MLTest with DefaultReadWriteTest { // 3st quartile = [3.0, ...] // quantile range = IQR = [2.0, ...] 
highDimData = Array( - Vectors.dense(Array.fill(2000)(0.0)), - Vectors.dense(Array.fill(2000)(1.0)), - Vectors.dense(Array.fill(2000)(2.0)), - Vectors.dense(Array.fill(2000)(3.0)), - Vectors.dense(Array.fill(2000)(4.0)) + Vectors.dense(createArray(2000, 0.0)), + Vectors.dense(createArray(2000, 1.0)), + Vectors.dense(createArray(2000, 2.0)), + Vectors.dense(createArray(2000, 3.0)), + Vectors.dense(createArray(2000, 4.0)) ) highDimRes = Array( - Vectors.dense(Array.fill(2000)(0.0)), - Vectors.dense(Array.fill(2000)(0.5)), - Vectors.dense(Array.fill(2000)(1.0)), - Vectors.dense(Array.fill(2000)(1.5)), - Vectors.dense(Array.fill(2000)(2.0)) + Vectors.dense(createArray(2000, 0.0)), + Vectors.dense(createArray(2000, 0.5)), + Vectors.dense(createArray(2000, 1.0)), + Vectors.dense(createArray(2000, 1.5)), + Vectors.dense(createArray(2000, 2.0)) ) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala index ea02f58ca906..cf1f6005b4af 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.collection.Utils.createArray class PCASuite extends SparkFunSuite with MLlibTestSparkContext { @@ -58,8 +59,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext { test("number of features more than 65535") { val data1 = sc.parallelize(Seq( - Vectors.dense(Array.fill(100000)(2.0)), - Vectors.dense(Array.fill(100000)(0.0)) + Vectors.dense(createArray(100000, 2.0)), + Vectors.dense(createArray(100000, 0.0)) ), 2) val pca = new PCA(2).fit(data1) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala index 37bbe76b9b53..22c514a0a017 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.linalg import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.util.collection.Utils.createArray /** * Serialization benchmark for VectorUDT. 
@@ -45,7 +46,7 @@ object UDTSerializationBenchmark extends BenchmarkBase { val fromRow = encoder.createDeserializer() val vectors = (1 to numRows).map { i => - Vectors.dense(Array.fill(1e5.toInt)(1.0 * i)) + Vectors.dense(createArray(1e5.toInt, 1.0 * i)) }.toArray val rows = vectors.map(toRow) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala index baf588d8472c..ef2618e8455f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.types.variant.VariantBuilder import org.apache.spark.types.variant.VariantUtil._ import org.apache.spark.unsafe.types.{UTF8String, VariantVal} +import org.apache.spark.util.collection.Utils.createArray class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { // Zero-extend each byte in the array with the appropriate number of bytes. @@ -61,7 +62,7 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { // INT8 only has 7 byte content. check(Array(primitiveHeader(INT8), 0, 0, 0, 0, 0, 0, 0), emptyMetadata) // DECIMAL16 only has 15 byte content. - check(Array(primitiveHeader(DECIMAL16)) ++ Array.fill(16)(0.toByte), emptyMetadata) + check(Array(primitiveHeader(DECIMAL16)) ++ createArray[Byte](16, 0.toByte), emptyMetadata) // 1e38 has a precision of 39. Even if it still fits into 16 bytes, it is not a valid decimal. check(Array[Byte](primitiveHeader(DECIMAL16), 0) ++ BigDecimal(1e38).toBigInt.toByteArray.reverse, emptyMetadata) @@ -95,13 +96,13 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { // Construct binary values that are over SIZE_LIMIT bytes, but otherwise valid. val bigVersion = Array[Byte]((VERSION | (3 << 6)).toByte) - val a = Array.fill(SIZE_LIMIT)('a'.toByte) + val a = createArray[Byte](SIZE_LIMIT, 'a'.toByte) val hugeMetadata = bigVersion ++ Array[Byte](2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1) ++ a ++ Array[Byte]('b') check(Array(primitiveHeader(TRUE)), hugeMetadata, "VARIANT_CONSTRUCTOR_SIZE_LIMIT") // The keys are 'aaa....' and 'b'. Values are "yyy..." and 'true'. - val y = Array.fill(SIZE_LIMIT)('y'.toByte) + val y = createArray[Byte](SIZE_LIMIT, 'y'.toByte) val hugeObject = Array[Byte](objectHeader(true, 4, 4)) ++ /* size */ padded(Array(2), 4) ++ /* id list */ padded(Array(0, 1), 4) ++ @@ -198,7 +199,7 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { // bytes for size and offsets, plus 1 byte for the final value, so the large value is 1 << 24 - // 14 bytes, or (-14, -1, -1) as a signed little-endian value. val aSize = (1 << 24) - 14 - val a = Array.fill(aSize)('a'.toByte) + val a = createArray[Byte](aSize, 'a'.toByte) val hugeMetadata = bigVersion ++ Array[Byte](2, 0, 0, 0, 0, 0, -14, -1, -1, -13, -1, -1) ++ a ++ Array[Byte]('b') // Validate metadata in isolation. @@ -212,7 +213,7 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { // In order to get the full binary to 1 << 24, the large string is (1 << 24) - 26 bytes. As a // signed little-endian value, this is (-26, -1, -1). 
val ySize = (1 << 24) - 26 - val y = Array.fill(ySize)('y'.toByte) + val y = createArray[Byte](ySize, 'y'.toByte) val hugeObject = Array[Byte](objectHeader(true, 3, 3)) ++ /* size */ padded(Array(2), 4) ++ /* id list */ padded(Array(0, 1), 3) ++ @@ -984,7 +985,7 @@ class VariantExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { val emptyMetadata = Array[Byte](VERSION, 0, 0) // UUID - val uuidVal = Array(primitiveHeader(UUID)) ++ Array.fill(16)(1.toByte) + val uuidVal = Array(primitiveHeader(UUID)) ++ createArray[Byte](16, 1.toByte) val uuid = Literal(new VariantVal(uuidVal, emptyMetadata)) checkEvaluation(SchemaOfVariant(uuid), s"UUID") // Merge with variantNull retains type. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala index a09a64d6a8fd..659f3699d5a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.SparkSession +import org.apache.spark.util.collection.Utils.createArray /** * Benchmark primitive arrays via DataFrame and Dataset program using primitive arrays @@ -53,7 +54,7 @@ object PrimitiveArrayBenchmark extends SqlBasedBenchmark { val count = 1024 * 1024 * 2 val sc = spark.sparkContext - val primitiveIntArray = Array.fill[Int](count)(65535) + val primitiveIntArray = createArray(count, 65535) val dsInt = sc.parallelize(Seq(primitiveIntArray), 1).toDS() dsInt.count() // force to build dataset val intArray = { i: Int => @@ -64,7 +65,7 @@ object PrimitiveArrayBenchmark extends SqlBasedBenchmark { n += 1 } } - val primitiveDoubleArray = Array.fill[Double](count)(65535.0) + val primitiveDoubleArray = createArray(count, 65535.0) val dsDouble = sc.parallelize(Seq(primitiveDoubleArray), 1).toDS() dsDouble.count() // force to build dataset val doubleArray = { i: Int => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index a52336524194..8d90a78c6e62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +import org.apache.spark.util.collection.Utils.createArray case class AllDataTypesWithNonPrimitiveType( stringField: String, @@ -854,7 +855,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { withTempPath { dir => val path = dir.getCanonicalPath val df = spark.range(1, 22, 1, 1).map { _ => - val byteData = Array.fill[Byte](1024 * 1024)('X') + val byteData = createArray[Byte](1024 * 1024, 'X') val mapData = (1 to 100).map(i => (i, byteData)) mapData }.toDF() @@ -868,7 +869,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { withTempPath { dir => val path = dir.getCanonicalPath val df = spark.range(1, 1024, 1, 1).map { _ => - val byteData = Array.fill[Byte](5 * 1024 * 1024)('X') + val byteData = 
createArray[Byte](5 * 1024 * 1024, 'X') byteData }.toDF() df.write.format("orc").save(path) @@ -885,9 +886,9 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { val path = dir.getCanonicalPath val df = spark.range(1, 1 + 512, 1, 1).map { i => if (i == 1) { - (i, Array.fill[Byte](5 * 1024 * 1024)('X')) + (i, createArray[Byte](5 * 1024 * 1024, 'X')) } else { - (i, Array.fill[Byte](1)('X')) + (i, createArray[Byte](1, 'X')) } }.toDF("c1", "c2") df.write.format("orc").save(path) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala index f2d04a9c28f2..5d68fcac1385 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.collection.Utils.createArray /** * A test suite on the vectorized Parquet reader. Unlike `ParquetIOSuite`, this focuses on @@ -501,7 +502,7 @@ class ParquetVectorizedSuite extends QueryTest with ParquetTest with SharedSpark val maxDef = if (inputValues.contains(null)) 1 else 0 val ty = parquetSchema.asGroupType().getType("a").asPrimitiveType() val cd = new ColumnDescriptor(Seq("a").toArray, ty, 0, maxDef) - val repetitionLevels = Array.fill[Int](inputValues.length)(0) + val repetitionLevels = createArray(inputValues.length, 0) val definitionLevels = inputValues.map(v => if (v == null) 0 else maxDef) val memPageStore = new MemPageStore(expectedValues.length)