[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features ## What changes were proposed in this pull request?
Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code. ## How was this patch tested? Jenkins tests Author: Sean Owen <[email protected]> Closes #16964 from srowen/SPARK-19534. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1487c9af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1487c9af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1487c9af Branch: refs/heads/master Commit: 1487c9af20a333ead55955acf4c0aa323bea0d07 Parents: de14d35 Author: Sean Owen <[email protected]> Authored: Sun Feb 19 09:42:50 2017 -0800 Committer: Sean Owen <[email protected]> Committed: Sun Feb 19 09:42:50 2017 -0800 ---------------------------------------------------------------------- .../apache/spark/network/TransportContext.java | 6 +- .../spark/network/util/MapConfigProvider.java | 8 +- .../network/ChunkFetchIntegrationSuite.java | 37 +- .../network/RequestTimeoutIntegrationSuite.java | 3 +- .../network/TransportClientFactorySuite.java | 51 +- .../network/TransportResponseHandlerSuite.java | 14 +- .../network/crypto/AuthIntegrationSuite.java | 19 +- .../spark/network/sasl/SparkSaslSuite.java | 65 +-- .../util/TransportFrameDecoderSuite.java | 44 +- .../network/sasl/SaslIntegrationSuite.java | 34 +- .../ExternalShuffleBlockHandlerSuite.java | 2 +- .../shuffle/ExternalShuffleCleanupSuite.java | 6 +- .../ExternalShuffleIntegrationSuite.java | 13 +- .../shuffle/OneForOneBlockFetcherSuite.java | 78 ++- .../shuffle/RetryingBlockFetcherSuite.java | 64 ++- .../unsafe/sort/UnsafeExternalSorter.java | 1 - .../java/org/apache/spark/JavaJdbcRDDSuite.java | 26 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 65 +-- .../map/AbstractBytesToBytesMapSuite.java | 25 +- .../unsafe/sort/UnsafeExternalSorterSuite.java | 25 +- .../test/org/apache/spark/Java8RDDAPISuite.java | 7 +- .../test/org/apache/spark/JavaAPISuite.java | 492 ++++------------- .../kafka010/JavaConsumerStrategySuite.java | 24 +- .../SparkSubmitCommandBuilderSuite.java | 2 +- .../launcher/SparkSubmitOptionParserSuite.java | 8 +- .../apache/spark/ml/feature/JavaPCASuite.java | 35 +- .../classification/JavaNaiveBayesSuite.java | 10 +- .../clustering/JavaBisectingKMeansSuite.java | 4 +- .../spark/mllib/clustering/JavaLDASuite.java | 40 +- .../mllib/fpm/JavaAssociationRulesSuite.java | 6 +- .../regression/JavaLinearRegressionSuite.java | 11 +- .../spark/mllib/tree/JavaDecisionTreeSuite.java | 15 +- .../SpecificParquetRecordReaderBase.java | 2 +- .../spark/sql/Java8DatasetAggregatorSuite.java | 16 +- .../apache/spark/sql/JavaApplySchemaSuite.java | 22 +- .../apache/spark/sql/JavaDataFrameSuite.java | 47 +- .../spark/sql/JavaDatasetAggregatorSuite.java | 49 +- .../sql/JavaDatasetAggregatorSuiteBase.java | 14 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 147 ++---- .../test/org/apache/spark/sql/JavaUDFSuite.java | 37 +- .../spark/streaming/JavaMapWithStateSuite.java | 81 +-- .../spark/streaming/JavaReceiverAPISuite.java | 24 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 10 +- .../apache/spark/streaming/Java8APISuite.java | 21 +- .../apache/spark/streaming/JavaAPISuite.java | 526 +++++-------------- 45 files changed, 662 insertions(+), 1574 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 37ba543..965c4ae 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -17,9 +17,9 @@ package org.apache.spark.network; +import java.util.ArrayList; import java.util.List; -import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; @@ -100,7 +100,7 @@ public class TransportContext { } public TransportClientFactory createClientFactory() { - return createClientFactory(Lists.<TransportClientBootstrap>newArrayList()); + return createClientFactory(new ArrayList<>()); } /** Create a server which will attempt to bind to a specific port. */ @@ -120,7 +120,7 @@ public class TransportContext { } public TransportServer createServer() { - return createServer(0, Lists.<TransportServerBootstrap>newArrayList()); + return createServer(0, new ArrayList<>()); } public TransportChannelHandler initializePipeline(SocketChannel channel) { http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java index b666799..9cfee7f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java @@ -17,22 +17,20 @@ package org.apache.spark.network.util; -import com.google.common.collect.Maps; - import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; /** ConfigProvider based on a Map (copied in the constructor). */ public class MapConfigProvider extends ConfigProvider { - public static final MapConfigProvider EMPTY = new MapConfigProvider( - Collections.<String, String>emptyMap()); + public static final MapConfigProvider EMPTY = new MapConfigProvider(Collections.emptyMap()); private final Map<String, String> config; public MapConfigProvider(Map<String, String> config) { - this.config = Maps.newHashMap(config); + this.config = new HashMap<>(config); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java index 5bb8819..824482a 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.network; import java.io.File; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -29,7 +30,6 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Closeables; import org.junit.AfterClass; @@ -179,49 +179,49 @@ public class ChunkFetchIntegrationSuite { @Test public void fetchBufferChunk() throws Exception { - FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX)); - assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX)); + FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX)); + assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks); assertTrue(res.failedChunks.isEmpty()); - assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk)); + assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers); res.releaseBuffers(); } @Test public void fetchFileChunk() throws Exception { - FetchResult res = fetchChunks(Lists.newArrayList(FILE_CHUNK_INDEX)); - assertEquals(res.successChunks, Sets.newHashSet(FILE_CHUNK_INDEX)); + FetchResult res = fetchChunks(Arrays.asList(FILE_CHUNK_INDEX)); + assertEquals(Sets.newHashSet(FILE_CHUNK_INDEX), res.successChunks); assertTrue(res.failedChunks.isEmpty()); - assertBufferListsEqual(res.buffers, Lists.newArrayList(fileChunk)); + assertBufferListsEqual(Arrays.asList(fileChunk), res.buffers); res.releaseBuffers(); } @Test public void fetchNonExistentChunk() throws Exception { - FetchResult res = fetchChunks(Lists.newArrayList(12345)); + FetchResult res = fetchChunks(Arrays.asList(12345)); assertTrue(res.successChunks.isEmpty()); - assertEquals(res.failedChunks, Sets.newHashSet(12345)); + assertEquals(Sets.newHashSet(12345), res.failedChunks); assertTrue(res.buffers.isEmpty()); } @Test public void fetchBothChunks() throws Exception { - FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX)); - assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX)); + FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX)); + assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX), res.successChunks); assertTrue(res.failedChunks.isEmpty()); - assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk, fileChunk)); + assertBufferListsEqual(Arrays.asList(bufferChunk, fileChunk), res.buffers); res.releaseBuffers(); } @Test public void fetchChunkAndNonExistent() throws Exception { - FetchResult res = fetchChunks(Lists.newArrayList(BUFFER_CHUNK_INDEX, 12345)); - assertEquals(res.successChunks, Sets.newHashSet(BUFFER_CHUNK_INDEX)); - assertEquals(res.failedChunks, Sets.newHashSet(12345)); - assertBufferListsEqual(res.buffers, Lists.newArrayList(bufferChunk)); + FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, 12345)); + assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks); + assertEquals(Sets.newHashSet(12345), res.failedChunks); + assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers); res.releaseBuffers(); } - private void assertBufferListsEqual(List<ManagedBuffer> list0, List<ManagedBuffer> list1) + private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<ManagedBuffer> list1) throws Exception { assertEquals(list0.size(), list1.size()); for (int i = 0; i < list0.size(); i ++) { @@ -229,7 +229,8 @@ public class ChunkFetchIntegrationSuite { } } - private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception { + private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) + throws Exception { ByteBuffer nio0 = buffer0.nioByteBuffer(); ByteBuffer nio1 = buffer1.nioByteBuffer(); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index 959396b..9aa17e2 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -17,7 +17,6 @@ package org.apache.spark.network; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; @@ -60,7 +59,7 @@ public class RequestTimeoutIntegrationSuite { @Before public void setUp() throws Exception { - Map<String, String> configMap = Maps.newHashMap(); + Map<String, String> configMap = new HashMap<>(); configMap.put("spark.shuffle.io.connectionTimeout", "10s"); conf = new TransportConf("shuffle", new MapConfigProvider(configMap)); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java index 205ab88..e95d25f 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java @@ -19,19 +19,20 @@ package org.apache.spark.network; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import org.apache.spark.network.client.TransportClient; @@ -71,39 +72,36 @@ public class TransportClientFactorySuite { * * If concurrent is true, create multiple threads to create clients in parallel. */ - private void testClientReuse(final int maxConnections, boolean concurrent) + private void testClientReuse(int maxConnections, boolean concurrent) throws IOException, InterruptedException { - Map<String, String> configMap = Maps.newHashMap(); + Map<String, String> configMap = new HashMap<>(); configMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(maxConnections)); TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap)); RpcHandler rpcHandler = new NoOpRpcHandler(); TransportContext context = new TransportContext(conf, rpcHandler); - final TransportClientFactory factory = context.createClientFactory(); - final Set<TransportClient> clients = Collections.synchronizedSet( + TransportClientFactory factory = context.createClientFactory(); + Set<TransportClient> clients = Collections.synchronizedSet( new HashSet<TransportClient>()); - final AtomicInteger failed = new AtomicInteger(); + AtomicInteger failed = new AtomicInteger(); Thread[] attempts = new Thread[maxConnections * 10]; // Launch a bunch of threads to create new clients. for (int i = 0; i < attempts.length; i++) { - attempts[i] = new Thread() { - @Override - public void run() { - try { - TransportClient client = - factory.createClient(TestUtils.getLocalHost(), server1.getPort()); - assertTrue(client.isActive()); - clients.add(client); - } catch (IOException e) { - failed.incrementAndGet(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + attempts[i] = new Thread(() -> { + try { + TransportClient client = + factory.createClient(TestUtils.getLocalHost(), server1.getPort()); + assertTrue(client.isActive()); + clients.add(client); + } catch (IOException e) { + failed.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }; + }); if (concurrent) { attempts[i].start(); @@ -113,8 +111,8 @@ public class TransportClientFactorySuite { } // Wait until all the threads complete. - for (int i = 0; i < attempts.length; i++) { - attempts[i].join(); + for (Thread attempt : attempts) { + attempt.join(); } Assert.assertEquals(0, failed.get()); @@ -150,7 +148,7 @@ public class TransportClientFactorySuite { TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort()); assertTrue(c1.isActive()); assertTrue(c2.isActive()); - assertTrue(c1 != c2); + assertNotSame(c1, c2); factory.close(); } @@ -167,7 +165,7 @@ public class TransportClientFactorySuite { assertFalse(c1.isActive()); TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); - assertFalse(c1 == c2); + assertNotSame(c1, c2); assertTrue(c2.isActive()); factory.close(); } @@ -207,8 +205,7 @@ public class TransportClientFactorySuite { } }); TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true); - TransportClientFactory factory = context.createClientFactory(); - try { + try (TransportClientFactory factory = context.createClientFactory()) { TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort()); assertTrue(c1.isActive()); long expiredTime = System.currentTimeMillis() + 10000; // 10 seconds @@ -216,8 +213,6 @@ public class TransportClientFactorySuite { Thread.sleep(10); } assertFalse(c1.isActive()); - } finally { - factory.close(); } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java index 128f7cb..4477c9a 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java @@ -24,8 +24,6 @@ import io.netty.channel.local.LocalChannel; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; import org.apache.spark.network.buffer.ManagedBuffer; @@ -54,7 +52,7 @@ public class TransportResponseHandlerSuite { assertEquals(1, handler.numOutstandingRequests()); handler.handle(new ChunkFetchSuccess(streamChunkId, new TestManagedBuffer(123))); - verify(callback, times(1)).onSuccess(eq(0), (ManagedBuffer) any()); + verify(callback, times(1)).onSuccess(eq(0), any()); assertEquals(0, handler.numOutstandingRequests()); } @@ -67,7 +65,7 @@ public class TransportResponseHandlerSuite { assertEquals(1, handler.numOutstandingRequests()); handler.handle(new ChunkFetchFailure(streamChunkId, "some error msg")); - verify(callback, times(1)).onFailure(eq(0), (Throwable) any()); + verify(callback, times(1)).onFailure(eq(0), any()); assertEquals(0, handler.numOutstandingRequests()); } @@ -84,9 +82,9 @@ public class TransportResponseHandlerSuite { handler.exceptionCaught(new Exception("duh duh duhhhh")); // should fail both b2 and b3 - verify(callback, times(1)).onSuccess(eq(0), (ManagedBuffer) any()); - verify(callback, times(1)).onFailure(eq(1), (Throwable) any()); - verify(callback, times(1)).onFailure(eq(2), (Throwable) any()); + verify(callback, times(1)).onSuccess(eq(0), any()); + verify(callback, times(1)).onFailure(eq(1), any()); + verify(callback, times(1)).onFailure(eq(2), any()); assertEquals(0, handler.numOutstandingRequests()); } @@ -118,7 +116,7 @@ public class TransportResponseHandlerSuite { assertEquals(1, handler.numOutstandingRequests()); handler.handle(new RpcFailure(12345, "oh no")); - verify(callback, times(1)).onFailure((Throwable) any()); + verify(callback, times(1)).onFailure(any()); assertEquals(0, handler.numOutstandingRequests()); } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java index 21609d5..8751944 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthIntegrationSuite.java @@ -18,11 +18,11 @@ package org.apache.spark.network.crypto; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.netty.channel.Channel; import org.junit.After; import org.junit.Test; @@ -163,20 +163,17 @@ public class AuthIntegrationSuite { } void createServer(String secret, boolean enableAes) throws Exception { - TransportServerBootstrap introspector = new TransportServerBootstrap() { - @Override - public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { - AuthTestCtx.this.serverChannel = channel; - if (rpcHandler instanceof AuthRpcHandler) { - AuthTestCtx.this.authRpcHandler = (AuthRpcHandler) rpcHandler; - } - return rpcHandler; + TransportServerBootstrap introspector = (channel, rpcHandler) -> { + this.serverChannel = channel; + if (rpcHandler instanceof AuthRpcHandler) { + this.authRpcHandler = (AuthRpcHandler) rpcHandler; } + return rpcHandler; }; SecretKeyHolder keyHolder = createKeyHolder(secret); TransportServerBootstrap auth = enableAes ? new AuthServerBootstrap(conf, keyHolder) : new SaslServerBootstrap(conf, keyHolder); - this.server = ctx.createServer(Lists.newArrayList(auth, introspector)); + this.server = ctx.createServer(Arrays.asList(auth, introspector)); } void createClient(String secret) throws Exception { @@ -186,7 +183,7 @@ public class AuthIntegrationSuite { void createClient(String secret, boolean enableAes) throws Exception { TransportConf clientConf = enableAes ? conf : new TransportConf("rpc", MapConfigProvider.EMPTY); - List<TransportClientBootstrap> bootstraps = Lists.<TransportClientBootstrap>newArrayList( + List<TransportClientBootstrap> bootstraps = Arrays.asList( new AuthClientBootstrap(clientConf, appId, createKeyHolder(secret))); this.client = ctx.createClientFactory(bootstraps) .createClient(TestUtils.getLocalHost(), server.getPort()); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index 87129b9..6f15718 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.*; import java.io.File; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -35,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.sasl.SaslException; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import io.netty.buffer.ByteBuf; @@ -45,8 +45,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.apache.spark.network.TestUtils; import org.apache.spark.network.TransportContext; @@ -137,18 +135,15 @@ public class SparkSaslSuite { testBasicSasl(true); } - private void testBasicSasl(boolean encrypt) throws Throwable { + private static void testBasicSasl(boolean encrypt) throws Throwable { RpcHandler rpcHandler = mock(RpcHandler.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - ByteBuffer message = (ByteBuffer) invocation.getArguments()[1]; - RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2]; - assertEquals("Ping", JavaUtils.bytesToString(message)); - cb.onSuccess(JavaUtils.stringToBytes("Pong")); - return null; - } - }) + doAnswer(invocation -> { + ByteBuffer message = (ByteBuffer) invocation.getArguments()[1]; + RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2]; + assertEquals("Ping", JavaUtils.bytesToString(message)); + cb.onSuccess(JavaUtils.stringToBytes("Pong")); + return null; + }) .when(rpcHandler) .receive(any(TransportClient.class), any(ByteBuffer.class), any(RpcResponseCallback.class)); @@ -255,21 +250,17 @@ public class SparkSaslSuite { @Test public void testFileRegionEncryption() throws Exception { - final Map<String, String> testConf = ImmutableMap.of( + Map<String, String> testConf = ImmutableMap.of( "spark.network.sasl.maxEncryptedBlockSize", "1k"); - final AtomicReference<ManagedBuffer> response = new AtomicReference<>(); - final File file = File.createTempFile("sasltest", ".txt"); + AtomicReference<ManagedBuffer> response = new AtomicReference<>(); + File file = File.createTempFile("sasltest", ".txt"); SaslTestCtx ctx = null; try { - final TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf)); + TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(testConf)); StreamManager sm = mock(StreamManager.class); - when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>() { - @Override - public ManagedBuffer answer(InvocationOnMock invocation) { - return new FileSegmentManagedBuffer(conf, file, 0, file.length()); - } - }); + when(sm.getChunk(anyLong(), anyInt())).thenAnswer(invocation -> + new FileSegmentManagedBuffer(conf, file, 0, file.length())); RpcHandler rpcHandler = mock(RpcHandler.class); when(rpcHandler.getStreamManager()).thenReturn(sm); @@ -280,18 +271,15 @@ public class SparkSaslSuite { ctx = new SaslTestCtx(rpcHandler, true, false, testConf); - final CountDownLatch lock = new CountDownLatch(1); + CountDownLatch lock = new CountDownLatch(1); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - response.set((ManagedBuffer) invocation.getArguments()[1]); - response.get().retain(); - lock.countDown(); - return null; - } - }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); + doAnswer(invocation -> { + response.set((ManagedBuffer) invocation.getArguments()[1]); + response.get().retain(); + lock.countDown(); + return null; + }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); ctx.client.fetchChunk(0, 0, callback); lock.await(10, TimeUnit.SECONDS); @@ -388,7 +376,7 @@ public class SparkSaslSuite { boolean disableClientEncryption) throws Exception { - this(rpcHandler, encrypt, disableClientEncryption, Collections.<String, String>emptyMap()); + this(rpcHandler, encrypt, disableClientEncryption, Collections.emptyMap()); } SaslTestCtx( @@ -416,7 +404,7 @@ public class SparkSaslSuite { checker)); try { - List<TransportClientBootstrap> clientBootstraps = Lists.newArrayList(); + List<TransportClientBootstrap> clientBootstraps = new ArrayList<>(); clientBootstraps.add(new SaslClientBootstrap(conf, "user", keyHolder)); if (disableClientEncryption) { clientBootstraps.add(new EncryptionDisablerBootstrap()); @@ -468,11 +456,6 @@ public class SparkSaslSuite { } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - super.handlerRemoved(ctx); - } - - @Override public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { channel.pipeline().addFirst("encryptionChecker", this); return rpcHandler; http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java index d4de4a9..b53e413 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java @@ -28,8 +28,6 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -52,7 +50,7 @@ public class TransportFrameDecoderSuite { @Test public void testInterception() throws Exception { - final int interceptedReads = 3; + int interceptedReads = 3; TransportFrameDecoder decoder = new TransportFrameDecoder(); TransportFrameDecoder.Interceptor interceptor = spy(new MockInterceptor(interceptedReads)); ChannelHandlerContext ctx = mockChannelHandlerContext(); @@ -84,22 +82,19 @@ public class TransportFrameDecoderSuite { public void testRetainedFrames() throws Exception { TransportFrameDecoder decoder = new TransportFrameDecoder(); - final AtomicInteger count = new AtomicInteger(); - final List<ByteBuf> retained = new ArrayList<>(); + AtomicInteger count = new AtomicInteger(); + List<ByteBuf> retained = new ArrayList<>(); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - when(ctx.fireChannelRead(any())).thenAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock in) { - // Retain a few frames but not others. - ByteBuf buf = (ByteBuf) in.getArguments()[0]; - if (count.incrementAndGet() % 2 == 0) { - retained.add(buf); - } else { - buf.release(); - } - return null; + when(ctx.fireChannelRead(any())).thenAnswer(in -> { + // Retain a few frames but not others. + ByteBuf buf = (ByteBuf) in.getArguments()[0]; + if (count.incrementAndGet() % 2 == 0) { + retained.add(buf); + } else { + buf.release(); } + return null; }); ByteBuf data = createAndFeedFrames(100, decoder, ctx); @@ -150,12 +145,6 @@ public class TransportFrameDecoderSuite { testInvalidFrame(8); } - @Test(expected = IllegalArgumentException.class) - public void testLargeFrame() throws Exception { - // Frame length includes the frame size field, so need to add a few more bytes. - testInvalidFrame(Integer.MAX_VALUE + 9); - } - /** * Creates a number of randomly sized frames and feed them to the given decoder, verifying * that the frames were read. @@ -210,13 +199,10 @@ public class TransportFrameDecoderSuite { private ChannelHandlerContext mockChannelHandlerContext() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - when(ctx.fireChannelRead(any())).thenAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock in) { - ByteBuf buf = (ByteBuf) in.getArguments()[0]; - buf.release(); - return null; - } + when(ctx.fireChannelRead(any())).thenAnswer(in -> { + ByteBuf buf = (ByteBuf) in.getArguments()[0]; + buf.release(); + return null; }); return ctx; } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 52f50a3..c0e170e 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -19,11 +19,11 @@ package org.apache.spark.network.sasl; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.Lists; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -38,7 +38,6 @@ import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.client.ChunkReceivedCallback; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; @@ -105,8 +104,7 @@ public class SaslIntegrationSuite { @Test public void testGoodClient() throws IOException, InterruptedException { clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); String msg = "Hello, World!"; @@ -120,8 +118,7 @@ public class SaslIntegrationSuite { when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app"); when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password"); clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "unknown-app", badKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "unknown-app", badKeyHolder))); try { // Bootstrap should fail on startup. @@ -134,8 +131,7 @@ public class SaslIntegrationSuite { @Test public void testNoSaslClient() throws IOException, InterruptedException { - clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList()); + clientFactory = context.createClientFactory(new ArrayList<>()); TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); try { @@ -159,15 +155,11 @@ public class SaslIntegrationSuite { RpcHandler handler = new TestRpcHandler(); TransportContext context = new TransportContext(conf, handler); clientFactory = context.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); - TransportServer server = context.createServer(); - try { + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + try (TransportServer server = context.createServer()) { clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); } catch (Exception e) { assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation")); - } finally { - server.close(); } } @@ -191,14 +183,13 @@ public class SaslIntegrationSuite { try { // Create a client, and make a request to fetch blocks from a different app. clientFactory = blockServerContext.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder))); client1 = clientFactory.createClient(TestUtils.getLocalHost(), blockServer.getPort()); - final AtomicReference<Throwable> exception = new AtomicReference<>(); + AtomicReference<Throwable> exception = new AtomicReference<>(); - final CountDownLatch blockFetchLatch = new CountDownLatch(1); + CountDownLatch blockFetchLatch = new CountDownLatch(1); BlockFetchingListener listener = new BlockFetchingListener() { @Override public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { @@ -235,12 +226,11 @@ public class SaslIntegrationSuite { // Create a second client, authenticated with a different app ID, and try to read from // the stream created for the previous app. clientFactory2 = blockServerContext.createClientFactory( - Lists.<TransportClientBootstrap>newArrayList( - new SaslClientBootstrap(conf, "app-2", secretKeyHolder))); + Arrays.asList(new SaslClientBootstrap(conf, "app-2", secretKeyHolder))); client2 = clientFactory2.createClient(TestUtils.getLocalHost(), blockServer.getPort()); - final CountDownLatch chunkReceivedLatch = new CountDownLatch(1); + CountDownLatch chunkReceivedLatch = new CountDownLatch(1); ChunkReceivedCallback callback = new ChunkReceivedCallback() { @Override public void onSuccess(int chunkIndex, ManagedBuffer buffer) { @@ -284,7 +274,7 @@ public class SaslIntegrationSuite { } } - private void checkSecurityException(Throwable t) { + private static void checkSecurityException(Throwable t) { assertNotNull("No exception was caught.", t); assertTrue("Expected SecurityException.", t.getMessage().contains(SecurityException.class.getName())); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index c036bc2..e47a72c 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -93,7 +93,7 @@ public class ExternalShuffleBlockHandlerSuite { ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class); verify(callback, times(1)).onSuccess(response.capture()); - verify(callback, never()).onFailure((Throwable) any()); + verify(callback, never()).onFailure(any()); StreamHandle handle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue()); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 7757500..47c0870 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -60,12 +60,10 @@ public class ExternalShuffleCleanupSuite { public void cleanupUsesExecutor() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - final AtomicBoolean cleanupCalled = new AtomicBoolean(false); + AtomicBoolean cleanupCalled = new AtomicBoolean(false); // Executor which does nothing to ensure we're actually using it. - Executor noThreadExecutor = new Executor() { - @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } - }; + Executor noThreadExecutor = runnable -> cleanupCalled.set(true); ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, null, noThreadExecutor); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 88de6fb..b8ae04e 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -29,7 +30,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.After; import org.junit.AfterClass; @@ -173,7 +173,7 @@ public class ExternalShuffleIntegrationSuite { FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" }); assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks); assertTrue(exec0Fetch.failedBlocks.isEmpty()); - assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0])); + assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0])); exec0Fetch.releaseBuffers(); } @@ -185,7 +185,7 @@ public class ExternalShuffleIntegrationSuite { assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"), exec0Fetch.successBlocks); assertTrue(exec0Fetch.failedBlocks.isEmpty()); - assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks)); + assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks)); exec0Fetch.releaseBuffers(); } @@ -241,7 +241,7 @@ public class ExternalShuffleIntegrationSuite { assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks); } - private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) + private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false); client.init(APP_ID); @@ -249,7 +249,7 @@ public class ExternalShuffleIntegrationSuite { executorId, executorInfo); } - private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1) + private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1) throws Exception { assertEquals(list0.size(), list1.size()); for (int i = 0; i < list0.size(); i ++) { @@ -257,7 +257,8 @@ public class ExternalShuffleIntegrationSuite { } } - private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception { + private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) + throws Exception { ByteBuffer nio0 = buffer0.nioByteBuffer(); ByteBuffer nio1 = buffer1.nioByteBuffer(); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java index 2590b9c..3e51fea 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Maps; import io.netty.buffer.Unpooled; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -85,8 +83,8 @@ public class OneForOneBlockFetcherSuite { // Each failure will cause a failure to be invoked in all remaining block fetches. verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0")); - verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any()); - verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any()); + verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any()); } @Test @@ -100,15 +98,15 @@ public class OneForOneBlockFetcherSuite { // We may call both success and failure for the same block. verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0")); - verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any()); verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2")); - verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any()); + verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any()); } @Test public void testEmptyBlockFetch() { try { - fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap()); + fetchBlocks(Maps.newLinkedHashMap()); fail(); } catch (IllegalArgumentException e) { assertEquals("Zero-sized blockIds array", e.getMessage()); @@ -123,52 +121,46 @@ public class OneForOneBlockFetcherSuite { * * If a block's buffer is "null", an exception will be thrown instead. */ - private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) { + private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBuffer> blocks) { TransportClient client = mock(TransportClient.class); BlockFetchingListener listener = mock(BlockFetchingListener.class); - final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); + String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]); OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener); - // Respond to the "OpenBlocks" message with an appropirate ShuffleStreamHandle with streamId 123 - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer( - (ByteBuffer) invocationOnMock.getArguments()[0]); - RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1]; - callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer()); - assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message); - return null; - } + // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123 + doAnswer(invocationOnMock -> { + BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer( + (ByteBuffer) invocationOnMock.getArguments()[0]); + RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1]; + callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer()); + assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message); + return null; }).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class)); // Respond to each chunk request with a single buffer from our blocks array. - final AtomicInteger expectedChunkIndex = new AtomicInteger(0); - final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - try { - long streamId = (Long) invocation.getArguments()[0]; - int myChunkIndex = (Integer) invocation.getArguments()[1]; - assertEquals(123, streamId); - assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex); - - ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2]; - ManagedBuffer result = blockIterator.next(); - if (result != null) { - callback.onSuccess(myChunkIndex, result); - } else { - callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex)); - } - } catch (Exception e) { - e.printStackTrace(); - fail("Unexpected failure"); + AtomicInteger expectedChunkIndex = new AtomicInteger(0); + Iterator<ManagedBuffer> blockIterator = blocks.values().iterator(); + doAnswer(invocation -> { + try { + long streamId = (Long) invocation.getArguments()[0]; + int myChunkIndex = (Integer) invocation.getArguments()[1]; + assertEquals(123, streamId); + assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex); + + ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2]; + ManagedBuffer result = blockIterator.next(); + if (result != null) { + callback.onSuccess(myChunkIndex, result); + } else { + callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex)); } - return null; + } catch (Exception e) { + e.printStackTrace(); + fail("Unexpected failure"); } - }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any()); + return null; + }).when(client).fetchChunk(anyLong(), anyInt(), any()); fetcher.start(); return listener; http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 6db71ee..a530e16 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -28,7 +28,6 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.stubbing.Stubber; @@ -84,7 +83,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); - verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any()); + verify(listener).onBlockFetchFailure(eq("b0"), any()); verify(listener).onBlockFetchSuccess("b1", block1); verifyNoMoreInteractions(listener); } @@ -190,7 +189,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); verifyNoMoreInteractions(listener); } @@ -220,7 +219,7 @@ public class RetryingBlockFetcherSuite { performInteractions(interactions, listener); verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); - verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any()); + verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any()); verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2); verifyNoMoreInteractions(listener); } @@ -249,40 +248,37 @@ public class RetryingBlockFetcherSuite { Stubber stub = null; // Contains all blockIds that are referenced across all interactions. - final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet(); + LinkedHashSet<String> blockIds = Sets.newLinkedHashSet(); - for (final Map<String, Object> interaction : interactions) { + for (Map<String, Object> interaction : interactions) { blockIds.addAll(interaction.keySet()); - Answer<Void> answer = new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - try { - // Verify that the RetryingBlockFetcher requested the expected blocks. - String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0]; - String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]); - assertArrayEquals(desiredBlockIds, requestedBlockIds); - - // Now actually invoke the success/failure callbacks on each block. - BlockFetchingListener retryListener = - (BlockFetchingListener) invocationOnMock.getArguments()[1]; - for (Map.Entry<String, Object> block : interaction.entrySet()) { - String blockId = block.getKey(); - Object blockValue = block.getValue(); - - if (blockValue instanceof ManagedBuffer) { - retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue); - } else if (blockValue instanceof Exception) { - retryListener.onBlockFetchFailure(blockId, (Exception) blockValue); - } else { - fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue); - } + Answer<Void> answer = invocationOnMock -> { + try { + // Verify that the RetryingBlockFetcher requested the expected blocks. + String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0]; + String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]); + assertArrayEquals(desiredBlockIds, requestedBlockIds); + + // Now actually invoke the success/failure callbacks on each block. + BlockFetchingListener retryListener = + (BlockFetchingListener) invocationOnMock.getArguments()[1]; + for (Map.Entry<String, Object> block : interaction.entrySet()) { + String blockId = block.getKey(); + Object blockValue = block.getValue(); + + if (blockValue instanceof ManagedBuffer) { + retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue); + } else if (blockValue instanceof Exception) { + retryListener.onBlockFetchFailure(blockId, (Exception) blockValue); + } else { + fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue); } - return null; - } catch (Throwable e) { - e.printStackTrace(); - throw e; } + return null; + } catch (Throwable e) { + e.printStackTrace(); + throw e; } }; @@ -295,7 +291,7 @@ public class RetryingBlockFetcherSuite { } assertNotNull(stub); - stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject()); + stub.when(fetchStarter).createAndStart(any(), anyObject()); String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start(); } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 189d607..29aca04 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.UnsafeAlignedOffset; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; -import org.apache.spark.util.TaskCompletionListener; import org.apache.spark.util.Utils; /** http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java index 7fe452a..a6589d2 100644 --- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java +++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java @@ -20,14 +20,11 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.rdd.JdbcRDD; import org.junit.After; import org.junit.Assert; @@ -89,30 +86,13 @@ public class JavaJdbcRDDSuite implements Serializable { public void testJavaJdbcRDD() throws Exception { JavaRDD<Integer> rdd = JdbcRDD.create( sc, - new JdbcRDD.ConnectionFactory() { - @Override - public Connection getConnection() throws SQLException { - return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"); - } - }, + () -> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"), "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 100, 1, - new Function<ResultSet, Integer>() { - @Override - public Integer call(ResultSet r) throws Exception { - return r.getInt(1); - } - } + r -> r.getInt(1) ).cache(); Assert.assertEquals(100, rdd.count()); - Assert.assertEquals( - Integer.valueOf(10100), - rdd.reduce(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - })); + Assert.assertEquals(Integer.valueOf(10100), rdd.reduce((i1, i2) -> i1 + i2)); } } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 088b681..24a55df 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -34,8 +34,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.apache.spark.HashPartitioner; import org.apache.spark.ShuffleDependency; @@ -119,9 +117,7 @@ public class UnsafeShuffleWriterSuite { any(File.class), any(SerializerInstance.class), anyInt(), - any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() { - @Override - public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable { + any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( (File) args[1], @@ -132,33 +128,24 @@ public class UnsafeShuffleWriterSuite { (ShuffleWriteMetrics) args[4], (BlockId) args[0] ); - } - }); + }); when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; - File tmp = (File) invocationOnMock.getArguments()[3]; - mergedOutputFile.delete(); - tmp.renameTo(mergedOutputFile); - return null; - } + doAnswer(invocationOnMock -> { + partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2]; + File tmp = (File) invocationOnMock.getArguments()[3]; + mergedOutputFile.delete(); + tmp.renameTo(mergedOutputFile); + return null; }).when(shuffleBlockResolver) .writeIndexFileAndCommit(anyInt(), anyInt(), any(long[].class), any(File.class)); - when(diskBlockManager.createTempShuffleBlock()).thenAnswer( - new Answer<Tuple2<TempShuffleBlockId, File>>() { - @Override - public Tuple2<TempShuffleBlockId, File> answer( - InvocationOnMock invocationOnMock) throws Throwable { - TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID()); - File file = File.createTempFile("spillFile", ".spill", tempDir); - spillFilesCreated.add(file); - return Tuple2$.MODULE$.apply(blockId, file); - } - }); + when(diskBlockManager.createTempShuffleBlock()).thenAnswer(invocationOnMock -> { + TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID()); + File file = File.createTempFile("spillFile", ".spill", tempDir); + spillFilesCreated.add(file); + return Tuple2$.MODULE$.apply(blockId, file); + }); when(taskContext.taskMetrics()).thenReturn(taskMetrics); when(shuffleDep.serializer()).thenReturn(serializer); @@ -243,7 +230,7 @@ public class UnsafeShuffleWriterSuite { @Test public void writeEmptyIterator() throws Exception { final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); - writer.write(Iterators.<Product2<Object, Object>>emptyIterator()); + writer.write(Iterators.emptyIterator()); final Option<MapStatus> mapStatus = writer.stop(true); assertTrue(mapStatus.isDefined()); assertTrue(mergedOutputFile.exists()); @@ -259,7 +246,7 @@ public class UnsafeShuffleWriterSuite { // In this example, each partition should have exactly one record: final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); for (int i = 0; i < NUM_PARTITITONS; i++) { - dataToWrite.add(new Tuple2<Object, Object>(i, i)); + dataToWrite.add(new Tuple2<>(i, i)); } final UnsafeShuffleWriter<Object, Object> writer = createWriter(true); writer.write(dataToWrite.iterator()); @@ -315,7 +302,7 @@ public class UnsafeShuffleWriterSuite { final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); for (int i : new int[] { 1, 2, 3, 4, 4, 2 }) { - dataToWrite.add(new Tuple2<Object, Object>(i, i)); + dataToWrite.add(new Tuple2<>(i, i)); } writer.insertRecordIntoSorter(dataToWrite.get(0)); writer.insertRecordIntoSorter(dataToWrite.get(1)); @@ -424,7 +411,7 @@ public class UnsafeShuffleWriterSuite { final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 10]; for (int i = 0; i < 10 + 1; i++) { - dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray)); + dataToWrite.add(new Tuple2<>(i, bigByteArray)); } writer.write(dataToWrite.iterator()); assertEquals(2, spillFilesCreated.size()); @@ -458,7 +445,7 @@ public class UnsafeShuffleWriterSuite { final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { - dataToWrite.add(new Tuple2<Object, Object>(i, i)); + dataToWrite.add(new Tuple2<>(i, i)); } writer.write(dataToWrite.iterator()); writer.stop(true); @@ -478,7 +465,7 @@ public class UnsafeShuffleWriterSuite { final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); final byte[] bytes = new byte[(int) (ShuffleExternalSorter.DISK_WRITE_BUFFER_SIZE * 2.5)]; new Random(42).nextBytes(bytes); - dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(bytes))); + dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(bytes))); writer.write(dataToWrite.iterator()); writer.stop(true); assertEquals( @@ -491,15 +478,15 @@ public class UnsafeShuffleWriterSuite { public void writeRecordsThatAreBiggerThanMaxRecordSize() throws Exception { final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); - dataToWrite.add(new Tuple2<Object, Object>(1, ByteBuffer.wrap(new byte[1]))); + dataToWrite.add(new Tuple2<>(1, ByteBuffer.wrap(new byte[1]))); // We should be able to write a record that's right _at_ the max record size final byte[] atMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes() - 4]; new Random(42).nextBytes(atMaxRecordSize); - dataToWrite.add(new Tuple2<Object, Object>(2, ByteBuffer.wrap(atMaxRecordSize))); + dataToWrite.add(new Tuple2<>(2, ByteBuffer.wrap(atMaxRecordSize))); // Inserting a record that's larger than the max record size final byte[] exceedsMaxRecordSize = new byte[(int) taskMemoryManager.pageSizeBytes()]; new Random(42).nextBytes(exceedsMaxRecordSize); - dataToWrite.add(new Tuple2<Object, Object>(3, ByteBuffer.wrap(exceedsMaxRecordSize))); + dataToWrite.add(new Tuple2<>(3, ByteBuffer.wrap(exceedsMaxRecordSize))); writer.write(dataToWrite.iterator()); writer.stop(true); assertEquals( @@ -511,10 +498,10 @@ public class UnsafeShuffleWriterSuite { @Test public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException { final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); - writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1)); - writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2)); + writer.insertRecordIntoSorter(new Tuple2<>(1, 1)); + writer.insertRecordIntoSorter(new Tuple2<>(2, 2)); writer.forceSorterToSpill(); - writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2)); + writer.insertRecordIntoSorter(new Tuple2<>(2, 2)); writer.stop(false); assertSpillFilesWereCleanedUp(); } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 2656814..03cec8e 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import scala.Tuple2; import scala.Tuple2$; import org.junit.After; @@ -31,8 +30,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.apache.spark.SparkConf; import org.apache.spark.executor.ShuffleWriteMetrics; @@ -88,25 +85,18 @@ public abstract class AbstractBytesToBytesMapSuite { spillFilesCreated.clear(); MockitoAnnotations.initMocks(this); when(blockManager.diskBlockManager()).thenReturn(diskBlockManager); - when(diskBlockManager.createTempLocalBlock()).thenAnswer( - new Answer<Tuple2<TempLocalBlockId, File>>() { - @Override - public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock) - throws Throwable { - TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID()); - File file = File.createTempFile("spillFile", ".spill", tempDir); - spillFilesCreated.add(file); - return Tuple2$.MODULE$.apply(blockId, file); - } + when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> { + TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID()); + File file = File.createTempFile("spillFile", ".spill", tempDir); + spillFilesCreated.add(file); + return Tuple2$.MODULE$.apply(blockId, file); }); when(blockManager.getDiskWriter( any(BlockId.class), any(File.class), any(SerializerInstance.class), anyInt(), - any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() { - @Override - public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable { + any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( @@ -118,8 +108,7 @@ public abstract class AbstractBytesToBytesMapSuite { (ShuffleWriteMetrics) args[4], (BlockId) args[0] ); - } - }); + }); } @After http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index fbbe530..771d390 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.UUID; -import scala.Tuple2; import scala.Tuple2$; import org.junit.After; @@ -31,8 +30,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; @@ -96,25 +93,18 @@ public class UnsafeExternalSorterSuite { taskContext = mock(TaskContext.class); when(taskContext.taskMetrics()).thenReturn(new TaskMetrics()); when(blockManager.diskBlockManager()).thenReturn(diskBlockManager); - when(diskBlockManager.createTempLocalBlock()).thenAnswer( - new Answer<Tuple2<TempLocalBlockId, File>>() { - @Override - public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock) - throws Throwable { - TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID()); - File file = File.createTempFile("spillFile", ".spill", tempDir); - spillFilesCreated.add(file); - return Tuple2$.MODULE$.apply(blockId, file); - } + when(diskBlockManager.createTempLocalBlock()).thenAnswer(invocationOnMock -> { + TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID()); + File file = File.createTempFile("spillFile", ".spill", tempDir); + spillFilesCreated.add(file); + return Tuple2$.MODULE$.apply(blockId, file); }); when(blockManager.getDiskWriter( any(BlockId.class), any(File.class), any(SerializerInstance.class), anyInt(), - any(ShuffleWriteMetrics.class))).thenAnswer(new Answer<DiskBlockObjectWriter>() { - @Override - public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Throwable { + any(ShuffleWriteMetrics.class))).thenAnswer(invocationOnMock -> { Object[] args = invocationOnMock.getArguments(); return new DiskBlockObjectWriter( @@ -126,8 +116,7 @@ public class UnsafeExternalSorterSuite { (ShuffleWriteMetrics) args[4], (BlockId) args[0] ); - } - }); + }); } @After http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java index e22ad89..1d2b05e 100644 --- a/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java @@ -64,12 +64,7 @@ public class Java8RDDAPISuite implements Serializable { public void foreachWithAnonymousClass() { foreachCalls = 0; JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - foreachCalls++; - } - }); + rdd.foreach(s -> foreachCalls++); Assert.assertEquals(2, foreachCalls); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
