http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java deleted file mode 100644 index 9688705..0000000 --- a/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.server; - -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.network.buffer.ManagedBuffer; - -/** - * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually - * fetched as chunks by the client. - */ -public class DefaultStreamManager extends StreamManager { - private final Logger logger = LoggerFactory.getLogger(DefaultStreamManager.class); - - private final AtomicLong nextStreamId; - private final Map<Long, StreamState> streams; - - /** State of a single stream. */ - private static class StreamState { - final Iterator<ManagedBuffer> buffers; - - // Used to keep track of the index of the buffer that the user has retrieved, just to ensure - // that the caller only requests each chunk one at a time, in order. - int curChunk = 0; - - StreamState(Iterator<ManagedBuffer> buffers) { - this.buffers = buffers; - } - } - - public DefaultStreamManager() { - // For debugging purposes, start with a random stream id to help identifying different streams. - // This does not need to be globally unique, only unique to this class. 
- nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000); - streams = new ConcurrentHashMap<Long, StreamState>(); - } - - @Override - public ManagedBuffer getChunk(long streamId, int chunkIndex) { - StreamState state = streams.get(streamId); - if (chunkIndex != state.curChunk) { - throw new IllegalStateException(String.format( - "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk)); - } else if (!state.buffers.hasNext()) { - throw new IllegalStateException(String.format( - "Requested chunk index beyond end %s", chunkIndex)); - } - state.curChunk += 1; - ManagedBuffer nextChunk = state.buffers.next(); - - if (!state.buffers.hasNext()) { - logger.trace("Removing stream id {}", streamId); - streams.remove(streamId); - } - - return nextChunk; - } - - @Override - public void connectionTerminated(long streamId) { - // Release all remaining buffers. - StreamState state = streams.remove(streamId); - if (state != null && state.buffers != null) { - while (state.buffers.hasNext()) { - state.buffers.next().release(); - } - } - } - - /** - * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to - * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a - * client connection is closed before the iterator is fully drained, then the remaining buffers - * will all be release()'d. - */ - public long registerStream(Iterator<ManagedBuffer> buffers) { - long myStreamId = nextStreamId.getAndIncrement(); - streams.put(myStreamId, new StreamState(buffers)); - return myStreamId; - } -}
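Note that DefaultStreamManager is not dropped outright: this commit renames it to OneForOneStreamManager (added below with the same body). A minimal sketch of how its API is driven, using only the methods shown in this diff; NioManagedBuffer is referenced in the test imports here, and wrapping a java.nio.ByteBuffer via its constructor is an assumption of this sketch:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.buffer.NioManagedBuffer;  // constructor signature assumed
    import org.apache.spark.network.server.OneForOneStreamManager;

    public class StreamManagerSketch {
      public static void main(String[] args) {
        OneForOneStreamManager manager = new OneForOneStreamManager();

        // Each registered ManagedBuffer is served as exactly one chunk.
        Iterator<ManagedBuffer> buffers = Arrays.<ManagedBuffer>asList(
            new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 1, 2 })),  // chunk 0
            new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 3, 4 })))  // chunk 1
            .iterator();
        long streamId = manager.registerStream(buffers);

        // Chunks must be requested one at a time, strictly in order; requesting
        // chunk 1 before chunk 0 throws IllegalStateException.
        ManagedBuffer chunk0 = manager.getChunk(streamId, 0);
        ManagedBuffer chunk1 = manager.getChunk(streamId, 1);  // last chunk; stream is removed

        // Had the client disconnected before draining the iterator, the server
        // would call manager.connectionTerminated(streamId) to release()
        // the remaining buffers.
      }
    }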
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java new file mode 100644 index 0000000..5a3f003 --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java @@ -0,0 +1,38 @@ +package org.apache.spark.network.server; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; + +/** An RpcHandler suitable for a client-only TransportContext, which cannot receive RPCs. */ +public class NoOpRpcHandler implements RpcHandler { + private final StreamManager streamManager; + + public NoOpRpcHandler() { + streamManager = new OneForOneStreamManager(); + } + + @Override + public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) { + throw new UnsupportedOperationException("Cannot handle messages"); + } + + @Override + public StreamManager getStreamManager() { return streamManager; } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java new file mode 100644 index 0000000..731d48d --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.server; + +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; + +/** + * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually + * fetched as chunks by the client. Each registered buffer is one chunk. + */ +public class OneForOneStreamManager extends StreamManager { + private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class); + + private final AtomicLong nextStreamId; + private final Map<Long, StreamState> streams; + + /** State of a single stream. */ + private static class StreamState { + final Iterator<ManagedBuffer> buffers; + + // Used to keep track of the index of the buffer that the user has retrieved, just to ensure + // that the caller only requests each chunk one at a time, in order. + int curChunk = 0; + + StreamState(Iterator<ManagedBuffer> buffers) { + this.buffers = buffers; + } + } + + public OneForOneStreamManager() { + // For debugging purposes, start with a random stream id to help identifying different streams. + // This does not need to be globally unique, only unique to this class. + nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000); + streams = new ConcurrentHashMap<Long, StreamState>(); + } + + @Override + public ManagedBuffer getChunk(long streamId, int chunkIndex) { + StreamState state = streams.get(streamId); + if (chunkIndex != state.curChunk) { + throw new IllegalStateException(String.format( + "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk)); + } else if (!state.buffers.hasNext()) { + throw new IllegalStateException(String.format( + "Requested chunk index beyond end %s", chunkIndex)); + } + state.curChunk += 1; + ManagedBuffer nextChunk = state.buffers.next(); + + if (!state.buffers.hasNext()) { + logger.trace("Removing stream id {}", streamId); + streams.remove(streamId); + } + + return nextChunk; + } + + @Override + public void connectionTerminated(long streamId) { + // Release all remaining buffers. + StreamState state = streams.remove(streamId); + if (state != null && state.buffers != null) { + while (state.buffers.hasNext()) { + state.buffers.next().release(); + } + } + } + + /** + * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to + * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a + * client connection is closed before the iterator is fully drained, then the remaining buffers + * will all be release()'d. 
+ */ + public long registerStream(Iterator<ManagedBuffer> buffers) { + long myStreamId = nextStreamId.getAndIncrement(); + streams.put(myStreamId, new StreamState(buffers)); + return myStreamId; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java index f54a696..2369dc6 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java @@ -35,4 +35,10 @@ public interface RpcHandler { * RPC. */ void receive(TransportClient client, byte[] message, RpcResponseCallback callback); + + /** + * Returns the StreamManager which contains the state about which streams are currently being + * fetched by a TransportClient. + */ + StreamManager getStreamManager(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 352f865..17fe900 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -56,24 +56,23 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { /** Client on the same channel allowing us to talk back to the requester. */ private final TransportClient reverseClient; - /** Returns each chunk part of a stream. */ - private final StreamManager streamManager; - /** Handles all RPC messages. */ private final RpcHandler rpcHandler; + /** Returns each chunk part of a stream. */ + private final StreamManager streamManager; + /** List of all stream ids that have been read on this handler, used for cleanup. 
*/ private final Set<Long> streamIds; public TransportRequestHandler( Channel channel, TransportClient reverseClient, - StreamManager streamManager, RpcHandler rpcHandler) { this.channel = channel; this.reverseClient = reverseClient; - this.streamManager = streamManager; this.rpcHandler = rpcHandler; + this.streamManager = rpcHandler.getStreamManager(); this.streamIds = Sets.newHashSet(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 2430707..d1a1877 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -49,11 +49,11 @@ public class TransportServer implements Closeable { private ChannelFuture channelFuture; private int port = -1; - public TransportServer(TransportContext context) { + public TransportServer(TransportContext context, int portToBind) { this.context = context; this.conf = context.getConf(); - init(); + init(portToBind); } public int getPort() { @@ -63,7 +63,7 @@ public class TransportServer implements Closeable { return port; } - private void init() { + private void init(int portToBind) { IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = @@ -95,7 +95,7 @@ public class TransportServer implements Closeable { } }); - channelFuture = bootstrap.bind(new InetSocketAddress(conf.serverPort())); + channelFuture = bootstrap.bind(new InetSocketAddress(portToBind)); channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index 32ba3f5..40b71b0 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -17,8 +17,12 @@ package org.apache.spark.network.util; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import com.google.common.io.Closeables; import org.slf4j.Logger; @@ -35,4 +39,38 @@ public class JavaUtils { logger.error("IOException should not have been thrown.", e); } } + + // TODO: Make this configurable, do not use Java serialization! + public static <T> T deserialize(byte[] bytes) { + try { + ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(bytes)); + Object out = is.readObject(); + is.close(); + return (T) out; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not deserialize object", e); + } catch (IOException e) { + throw new RuntimeException("Could not deserialize object", e); + } + } + + // TODO: Make this configurable, do not use Java serialization! 
+ public static byte[] serialize(Object object) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream os = new ObjectOutputStream(baos); + os.writeObject(object); + os.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Could not serialize object", e); + } + } + + /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */ + public static int nonNegativeHash(Object obj) { + if (obj == null) { return 0; } + int hash = obj.hashCode(); + return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java new file mode 100644 index 0000000..5f20b70 --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.util.NoSuchElementException; + +import org.apache.spark.network.util.ConfigProvider; + +/** Uses System properties to obtain config values. */ +public class SystemPropertyConfigProvider extends ConfigProvider { + @Override + public String get(String name) { + String value = System.getProperty(name); + if (value == null) { + throw new NoSuchElementException(name); + } + return value; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 80f65d9..a68f38e 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -27,9 +27,6 @@ public class TransportConf { this.conf = conf; } - /** Port the server listens on. Default to a random port. 
*/ - public int serverPort() { return conf.getInt("spark.shuffle.io.port", 0); } - /** IO mode: nio or epoll */ public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java index 738dca9..c415883 100644 --- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java @@ -41,10 +41,13 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.client.ChunkReceivedCallback; +import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientFactory; +import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.StreamManager; +import org.apache.spark.network.util.SystemPropertyConfigProvider; import org.apache.spark.network.util.TransportConf; public class ChunkFetchIntegrationSuite { @@ -93,7 +96,18 @@ public class ChunkFetchIntegrationSuite { } } }; - TransportContext context = new TransportContext(conf, streamManager, new NoOpRpcHandler()); + RpcHandler handler = new RpcHandler() { + @Override + public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) { + throw new UnsupportedOperationException(); + } + + @Override + public StreamManager getStreamManager() { + return streamManager; + } + }; + TransportContext context = new TransportContext(conf, handler); server = context.createServer(); clientFactory = context.createClientFactory(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java b/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java deleted file mode 100644 index 7aa37ef..0000000 --- a/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.spark.network;/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.server.RpcHandler; - -/** Test RpcHandler which always returns a zero-sized success. */ -public class NoOpRpcHandler implements RpcHandler { - @Override - public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) { - callback.onSuccess(new byte[0]); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---------------------------------------------------------------------- diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java index 9f216dd..64b457b 100644 --- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java @@ -35,9 +35,11 @@ import static org.junit.Assert.*; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientFactory; -import org.apache.spark.network.server.DefaultStreamManager; +import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.server.TransportServer; +import org.apache.spark.network.util.SystemPropertyConfigProvider; import org.apache.spark.network.util.TransportConf; public class RpcIntegrationSuite { @@ -61,8 +63,11 @@ public class RpcIntegrationSuite { throw new RuntimeException("Thrown: " + parts[1]); } } + + @Override + public StreamManager getStreamManager() { return new OneForOneStreamManager(); } }; - TransportContext context = new TransportContext(conf, new DefaultStreamManager(), rpcHandler); + TransportContext context = new TransportContext(conf, rpcHandler); server = context.createServer(); clientFactory = context.createClientFactory(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java ---------------------------------------------------------------------- diff --git a/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java b/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java deleted file mode 100644 index f4e0a24..0000000 --- a/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.network; - -import java.util.NoSuchElementException; - -import org.apache.spark.network.util.ConfigProvider; - -/** Uses System properties to obtain config values. */ -public class SystemPropertyConfigProvider extends ConfigProvider { - @Override - public String get(String name) { - String value = System.getProperty(name); - if (value == null) { - throw new NoSuchElementException(name); - } - return value; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java ---------------------------------------------------------------------- diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java index 3ef9646..5a10fdb 100644 --- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java +++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java @@ -28,11 +28,11 @@ import static org.junit.Assert.assertTrue; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientFactory; -import org.apache.spark.network.server.DefaultStreamManager; +import org.apache.spark.network.server.NoOpRpcHandler; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportServer; -import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.SystemPropertyConfigProvider; import org.apache.spark.network.util.TransportConf; public class TransportClientFactorySuite { @@ -44,9 +44,8 @@ public class TransportClientFactorySuite { @Before public void setUp() { conf = new TransportConf(new SystemPropertyConfigProvider()); - StreamManager streamManager = new DefaultStreamManager(); RpcHandler rpcHandler = new NoOpRpcHandler(); - context = new TransportContext(conf, streamManager, rpcHandler); + context = new TransportContext(conf, rpcHandler); server1 = context.createServer(); server2 = context.createServer(); } http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/pom.xml ---------------------------------------------------------------------- diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml new file mode 100644 index 0000000..d271704 --- /dev/null +++ b/network/shuffle/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-shuffle_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project Shuffle Streaming Service Code</name> + <url>http://spark.apache.org/</url> + <properties> + <sbt.project.name>network-shuffle</sbt.project.name> + </properties> + + <dependencies> + <!-- Core dependencies --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-common_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- Provided dependencies --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-common_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java new file mode 100644 index 0000000..138fd53 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.EventListener; + +import org.apache.spark.network.buffer.ManagedBuffer; + +public interface BlockFetchingListener extends EventListener { + /** + * Called once per successfully fetched block. After this call returns, data will be released + * automatically. If the data will be passed to another thread, the receiver should retain() + * and release() the buffer on their own, or copy the data to a new buffer. + */ + void onBlockFetchSuccess(String blockId, ManagedBuffer data); + + /** + * Called at least once per block upon failures. + */ + void onBlockFetchFailure(String blockId, Throwable exception); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java new file mode 100644 index 0000000..d45e646 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.Serializable; +import java.util.Arrays; + +import com.google.common.base.Objects; + +/** Contains all configuration necessary for locating the shuffle files of an executor. */ +public class ExecutorShuffleInfo implements Serializable { + /** The base set of local directories that the executor stores its shuffle files in. */ + final String[] localDirs; + /** Number of subdirectories created within each localDir. */ + final int subDirsPerLocalDir; + /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. 
*/ + final String shuffleManager; + + public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) { + this.localDirs = localDirs; + this.subDirsPerLocalDir = subDirsPerLocalDir; + this.shuffleManager = shuffleManager; + } + + @Override + public int hashCode() { + return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("localDirs", Arrays.toString(localDirs)) + .add("subDirsPerLocalDir", subDirsPerLocalDir) + .add("shuffleManager", shuffleManager) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof ExecutorShuffleInfo) { + ExecutorShuffleInfo o = (ExecutorShuffleInfo) other; + return Arrays.equals(localDirs, o.localDirs) + && Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir) + && Objects.equal(shuffleManager, o.shuffleManager); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java new file mode 100644 index 0000000..a9dff31 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.spark.network.shuffle.ExternalShuffleMessages.*; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.server.OneForOneStreamManager; +import org.apache.spark.network.server.RpcHandler; +import org.apache.spark.network.server.StreamManager; +import org.apache.spark.network.util.JavaUtils; + +/** + * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process. + * + * Handles registering executors and opening shuffle blocks from them. Shuffle blocks are registered + * with the "one-for-one" strategy, meaning each Transport-layer Chunk is equivalent to one Spark- + * level shuffle block. 
+ */ +public class ExternalShuffleBlockHandler implements RpcHandler { + private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class); + + private final ExternalShuffleBlockManager blockManager; + private final OneForOneStreamManager streamManager; + + public ExternalShuffleBlockHandler() { + this(new OneForOneStreamManager(), new ExternalShuffleBlockManager()); + } + + /** Enables mocking out the StreamManager and BlockManager. */ + @VisibleForTesting + ExternalShuffleBlockHandler( + OneForOneStreamManager streamManager, + ExternalShuffleBlockManager blockManager) { + this.streamManager = streamManager; + this.blockManager = blockManager; + } + + @Override + public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) { + Object msgObj = JavaUtils.deserialize(message); + + logger.trace("Received message: " + msgObj); + + if (msgObj instanceof OpenShuffleBlocks) { + OpenShuffleBlocks msg = (OpenShuffleBlocks) msgObj; + List<ManagedBuffer> blocks = Lists.newArrayList(); + + for (String blockId : msg.blockIds) { + blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId)); + } + long streamId = streamManager.registerStream(blocks.iterator()); + logger.trace("Registered streamId {} with {} buffers", streamId, msg.blockIds.length); + callback.onSuccess(JavaUtils.serialize( + new ShuffleStreamHandle(streamId, msg.blockIds.length))); + + } else if (msgObj instanceof RegisterExecutor) { + RegisterExecutor msg = (RegisterExecutor) msgObj; + blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo); + callback.onSuccess(new byte[0]); + + } else { + throw new UnsupportedOperationException(String.format( + "Unexpected message: %s (class = %s)", msgObj, msgObj.getClass())); + } + } + + @Override + public StreamManager getStreamManager() { + return streamManager; + } + + /** For testing, clears all executors registered with "RegisterExecutor". */ + @VisibleForTesting + public void clearRegisteredExecutors() { + blockManager.clearRegisteredExecutors(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java new file mode 100644 index 0000000..6589889 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.FileSegmentManagedBuffer; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.JavaUtils; + +/** + * Manages converting shuffle BlockIds into physical segments of local files, from a process outside + * of Executors. Each Executor must register its own configuration about where it stores its files + * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated + * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager. + * + * Executors with shuffle file consolidation are not currently supported, as the index is stored in + * the Executor's memory, unlike the IndexShuffleBlockManager. + */ +public class ExternalShuffleBlockManager { + private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class); + + // Map from "appId-execId" to the executor's configuration. + private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors = + new ConcurrentHashMap<String, ExecutorShuffleInfo>(); + + // Returns an id suitable for a single executor within a single application. + private String getAppExecId(String appId, String execId) { + return appId + "-" + execId; + } + + /** Registers a new Executor with all the configuration we need to find its shuffle files. */ + public void registerExecutor( + String appId, + String execId, + ExecutorShuffleInfo executorInfo) { + String fullId = getAppExecId(appId, execId); + logger.info("Registered executor {} with {}", fullId, executorInfo); + executors.put(fullId, executorInfo); + } + + /** + * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the + * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make + * assumptions about how the hash and sort based shuffles store their data. + */ + public ManagedBuffer getBlockData(String appId, String execId, String blockId) { + String[] blockIdParts = blockId.split("_"); + if (blockIdParts.length < 4) { + throw new IllegalArgumentException("Unexpected block id format: " + blockId); + } else if (!blockIdParts[0].equals("shuffle")) { + throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId); + } + int shuffleId = Integer.parseInt(blockIdParts[1]); + int mapId = Integer.parseInt(blockIdParts[2]); + int reduceId = Integer.parseInt(blockIdParts[3]); + + ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId)); + if (executor == null) { + throw new RuntimeException( + String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); + } + + if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) { + return getHashBasedShuffleBlockData(executor, blockId); + } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) { + return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); + } else { + throw new UnsupportedOperationException( + "Unsupported shuffle manager: " + executor.shuffleManager); + } + } + + /** + * Hash-based shuffle data is simply stored as one file per block. + * This logic is from FileShuffleBlockManager. 
+ */ + // TODO: Support consolidated hash shuffle files + private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { + File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); + return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length()); + } + + /** + * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file + * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager, + * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId. + */ + private ManagedBuffer getSortBasedShuffleBlockData( + ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) { + File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, + "shuffle_" + shuffleId + "_" + mapId + "_0.index"); + + DataInputStream in = null; + try { + in = new DataInputStream(new FileInputStream(indexFile)); + in.skipBytes(reduceId * 8); + long offset = in.readLong(); + long nextOffset = in.readLong(); + return new FileSegmentManagedBuffer( + getFile(executor.localDirs, executor.subDirsPerLocalDir, + "shuffle_" + shuffleId + "_" + mapId + "_0.data"), + offset, + nextOffset - offset); + } catch (IOException e) { + throw new RuntimeException("Failed to open file: " + indexFile, e); + } finally { + if (in != null) { + JavaUtils.closeQuietly(in); + } + } + } + + /** + * Hashes a filename into the corresponding local directory, in a manner consistent with + * Spark's DiskBlockManager.getFile(). + */ + @VisibleForTesting + static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) { + int hash = JavaUtils.nonNegativeHash(filename); + String localDir = localDirs[hash % localDirs.length]; + int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; + return new File(new File(localDir, String.format("%02x", subDirId)), filename); + } + + /** For testing, clears all registered executors. */ + @VisibleForTesting + void clearRegisteredExecutors() { + executors.clear(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java new file mode 100644 index 0000000..cc2f626 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.TransportContext; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.client.TransportClientFactory; +import org.apache.spark.network.server.NoOpRpcHandler; +import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor; +import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.TransportConf; + +/** + * Client for reading shuffle blocks which points to an external (outside of executor) server. + * This is instead of reading shuffle blocks directly from other executors (via + * BlockTransferService), which has the downside of losing the shuffle data if we lose the + * executors. + */ +public class ExternalShuffleClient implements ShuffleClient { + private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class); + + private final TransportClientFactory clientFactory; + private final String appId; + + public ExternalShuffleClient(TransportConf conf, String appId) { + TransportContext context = new TransportContext(conf, new NoOpRpcHandler()); + this.clientFactory = context.createClientFactory(); + this.appId = appId; + } + + @Override + public void fetchBlocks( + String host, + int port, + String execId, + String[] blockIds, + BlockFetchingListener listener) { + logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); + try { + TransportClient client = clientFactory.createClient(host, port); + new OneForOneBlockFetcher(client, blockIds, listener) + .start(new ExternalShuffleMessages.OpenShuffleBlocks(appId, execId, blockIds)); + } catch (Exception e) { + logger.error("Exception while beginning fetchBlocks", e); + for (String blockId : blockIds) { + listener.onBlockFetchFailure(blockId, e); + } + } + } + + /** + * Registers this executor with an external shuffle server. This registration is required to + * inform the shuffle server about where and how we store our shuffle files. + * + * @param host Host of shuffle server. + * @param port Port of shuffle server. + * @param execId This Executor's id. + * @param executorInfo Contains all info necessary for the service to find our shuffle files. + */ + public void registerWithShuffleServer( + String host, + int port, + String execId, + ExecutorShuffleInfo executorInfo) { + TransportClient client = clientFactory.createClient(host, port); + byte[] registerExecutorMessage = + JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo)); + client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java new file mode 100644 index 0000000..e79420e --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.Serializable; +import java.util.Arrays; + +import com.google.common.base.Objects; + +/** Messages handled by the {@link ExternalShuffleBlockHandler}. */ +public class ExternalShuffleMessages { + + /** Request to read a set of shuffle blocks. Returns [[ShuffleStreamHandle]]. */ + public static class OpenShuffleBlocks implements Serializable { + public final String appId; + public final String execId; + public final String[] blockIds; + + public OpenShuffleBlocks(String appId, String execId, String[] blockIds) { + this.appId = appId; + this.execId = execId; + this.blockIds = blockIds; + } + + @Override + public int hashCode() { + return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("appId", appId) + .add("execId", execId) + .add("blockIds", Arrays.toString(blockIds)) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof OpenShuffleBlocks) { + OpenShuffleBlocks o = (OpenShuffleBlocks) other; + return Objects.equal(appId, o.appId) + && Objects.equal(execId, o.execId) + && Arrays.equals(blockIds, o.blockIds); + } + return false; + } + } + + /** Initial registration message between an executor and its local shuffle server. 
*/ + public static class RegisterExecutor implements Serializable { + public final String appId; + public final String execId; + public final ExecutorShuffleInfo executorInfo; + + public RegisterExecutor( + String appId, + String execId, + ExecutorShuffleInfo executorInfo) { + this.appId = appId; + this.execId = execId; + this.executorInfo = executorInfo; + } + + @Override + public int hashCode() { + return Objects.hashCode(appId, execId, executorInfo); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("appId", appId) + .add("execId", execId) + .add("executorInfo", executorInfo) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof RegisterExecutor) { + RegisterExecutor o = (RegisterExecutor) other; + return Objects.equal(appId, o.appId) + && Objects.equal(execId, o.execId) + && Objects.equal(executorInfo, o.executorInfo); + } + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java new file mode 100644 index 0000000..39b6f30 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.ChunkReceivedCallback; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.util.JavaUtils; + +/** + * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and + * invokes the BlockFetchingListener appropriately. This class is agnostic to the actual RPC + * handler, as long as there is a single "open blocks" message which returns a ShuffleStreamHandle, + * and Java serialization is used. + * + * Note that this typically corresponds to a + * {@link org.apache.spark.network.server.OneForOneStreamManager} on the server side. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
new file mode 100644
index 0000000..39b6f30
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
+ * invokes the BlockFetchingListener appropriately. This class is agnostic to the actual RPC
+ * handler, as long as there is a single "open blocks" message which returns a ShuffleStreamHandle,
+ * and Java serialization is used.
+ *
+ * Note that this typically corresponds to a
+ * {@link org.apache.spark.network.server.OneForOneStreamManager} on the server side.
+ */
+public class OneForOneBlockFetcher {
+  private final Logger logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
+
+  private final TransportClient client;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final ChunkReceivedCallback chunkCallback;
+
+  private ShuffleStreamHandle streamHandle = null;
+
+  public OneForOneBlockFetcher(
+      TransportClient client,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    if (blockIds.length == 0) {
+      throw new IllegalArgumentException("Zero-sized blockIds array");
+    }
+    this.client = client;
+    this.blockIds = blockIds;
+    this.listener = listener;
+    this.chunkCallback = new ChunkCallback();
+  }
+
+  /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
+  private class ChunkCallback implements ChunkReceivedCallback {
+    @Override
+    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+      // On receipt of a chunk, pass it upwards as a block.
+      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+    }
+
+    @Override
+    public void onFailure(int chunkIndex, Throwable e) {
+      // On receipt of a failure, fail every block from chunkIndex onwards.
+      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
+      failRemainingBlocks(remainingBlockIds, e);
+    }
+  }
+
+  /**
+   * Begins the fetching process, calling the listener with every block fetched.
+   * The given message will be serialized with the Java serializer, and the RPC must return a
+   * {@link ShuffleStreamHandle}. We will send all fetch requests immediately, without throttling.
+   */
+  public void start(Object openBlocksMessage) {
+    client.sendRpc(JavaUtils.serialize(openBlocksMessage), new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        try {
+          streamHandle = JavaUtils.deserialize(response);
+          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);
+
+          // Immediately request all chunks -- we expect that the total size of the request is
+          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
+          for (int i = 0; i < streamHandle.numChunks; i++) {
+            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+          }
+        } catch (Exception e) {
+          logger.error("Failed while starting block fetches", e);
+          failRemainingBlocks(blockIds, e);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.error("Failed while starting block fetches", e);
+        failRemainingBlocks(blockIds, e);
+      }
+    });
+  }
+
+  /** Invokes the "onBlockFetchFailure" callback for every listed block id. */
+  private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
+    for (String blockId : failedBlockIds) {
+      try {
+        listener.onBlockFetchFailure(blockId, e);
+      } catch (Exception e2) {
+        logger.error("Error in block fetch failure callback", e2);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
new file mode 100644
index 0000000..9fa87c2
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+/** Provides an interface for reading shuffle files, either from an Executor or external service. */
+public interface ShuffleClient {
+  /**
+   * Fetch a sequence of blocks from a remote node asynchronously.
+   *
+   * Note that this API takes a sequence so the implementation can batch requests, and does not
+   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   */
+  public void fetchBlocks(
+      String host,
+      int port,
+      String execId,
+      String[] blockIds,
+      BlockFetchingListener listener);
+}
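A ShuffleClient implementation can delegate directly to OneForOneBlockFetcher; the sketch below shows the wiring. It is hypothetical (not part of this patch) and elides connection management: createClient is a placeholder for however a connected TransportClient is obtained for the given host and port.

    package org.apache.spark.network.shuffle;

    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;

    // Hypothetical sketch: composing the client-side pieces of this patch.
    public class SketchShuffleClient implements ShuffleClient {
      private final String appId;

      public SketchShuffleClient(String appId) {
        this.appId = appId;
      }

      @Override
      public void fetchBlocks(
          String host,
          int port,
          String execId,
          String[] blockIds,
          BlockFetchingListener listener) {
        // The "open blocks" RPC maps blockIds[i] to chunk i on the server side.
        OpenShuffleBlocks openMessage = new OpenShuffleBlocks(appId, execId, blockIds);

        // start() sends the RPC, then fetches one chunk per block, notifying the
        // listener as each block arrives or fails.
        TransportClient client = createClient(host, port);
        new OneForOneBlockFetcher(client, blockIds, listener).start(openMessage);
      }

      // Hypothetical helper: obtaining a connected TransportClient is outside this sketch.
      private TransportClient createClient(String host, int port) {
        throw new UnsupportedOperationException("connection setup not shown");
      }
    }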
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
new file mode 100644
index 0000000..9c94691
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+
+/**
+ * Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
+ * message. This is used by {@link OneForOneBlockFetcher}.
+ */
+public class ShuffleStreamHandle implements Serializable {
+  public final long streamId;
+  public final int numChunks;
+
+  public ShuffleStreamHandle(long streamId, int numChunks) {
+    this.streamId = streamId;
+    this.numChunks = numChunks;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(streamId, numChunks);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("streamId", streamId)
+      .add("numChunks", numChunks)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof ShuffleStreamHandle) {
+      ShuffleStreamHandle o = (ShuffleStreamHandle) other;
+      return Objects.equal(streamId, o.streamId)
+        && Objects.equal(numChunks, o.numChunks);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
new file mode 100644
index 0000000..7939cb4
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;
+import static org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.util.JavaUtils;
+
+public class ExternalShuffleBlockHandlerSuite {
+  TransportClient client = mock(TransportClient.class);
+
+  OneForOneStreamManager streamManager;
+  ExternalShuffleBlockManager blockManager;
+  RpcHandler handler;
+
+  @Before
+  public void beforeEach() {
+    streamManager = mock(OneForOneStreamManager.class);
+    blockManager = mock(ExternalShuffleBlockManager.class);
+    handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+  }
+
+  @Test
+  public void testRegisterExecutor() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
+    byte[] registerMessage = JavaUtils.serialize(
+      new RegisterExecutor("app0", "exec1", config));
+    handler.receive(client, registerMessage, callback);
+    verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+
+    verify(callback, times(1)).onSuccess((byte[]) any());
+    verify(callback, never()).onFailure((Throwable) any());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOpenShuffleBlocks() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
+    ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+    when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+    when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+    byte[] openBlocksMessage = JavaUtils.serialize(
+      new OpenShuffleBlocks("app0", "exec1", new String[] { "b0", "b1" }));
+    handler.receive(client, openBlocksMessage, callback);
+    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
+    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+
+    ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
+    verify(callback, times(1)).onSuccess(response.capture());
+    verify(callback, never()).onFailure((Throwable) any());
+
+    ShuffleStreamHandle handle = JavaUtils.deserialize(response.getValue());
+    assertEquals(2, handle.numChunks);
+
+    ArgumentCaptor<Iterator> stream = ArgumentCaptor.forClass(Iterator.class);
+    verify(streamManager, times(1)).registerStream(stream.capture());
+    Iterator<ManagedBuffer> buffers = (Iterator<ManagedBuffer>) stream.getValue();
+    assertEquals(block0Marker, buffers.next());
+    assertEquals(block1Marker, buffers.next());
+    assertFalse(buffers.hasNext());
+  }
+
+  @Test
+  public void testBadMessages() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    byte[] unserializableMessage = new byte[] { 0x12, 0x34, 0x56 };
+    try {
+      handler.receive(client, unserializableMessage, callback);
+      fail("Should have thrown");
+    } catch (Exception e) {
+      // pass
+    }
+
+    byte[] unexpectedMessage = JavaUtils.serialize(
+      new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort"));
+    try {
+      handler.receive(client, unexpectedMessage, callback);
+      fail("Should have thrown");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    verify(callback, never()).onSuccess((byte[]) any());
+    verify(callback, never()).onFailure((Throwable) any());
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
new file mode 100644
index 0000000..da54797
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import com.google.common.io.CharStreams;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExternalShuffleBlockManagerSuite {
+  static String sortBlock0 = "Hello!";
+  static String sortBlock1 = "World!";
+
+  static String hashBlock0 = "Elementary";
+  static String hashBlock1 = "Tabular";
+
+  static TestShuffleDataContext dataContext;
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    dataContext = new TestShuffleDataContext(2, 5);
+
+    dataContext.create();
+    // Write some sort and hash data.
+    dataContext.insertSortShuffleData(0, 0,
+      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
+    dataContext.insertHashShuffleData(1, 0,
+      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+  }
+
+  @AfterClass
+  public static void afterAll() {
+    dataContext.cleanup();
+  }
+
+  @Test
+  public void testBadRequests() {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    // Unregistered executor
+    try {
+      manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (RuntimeException e) {
+      assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
+    }
+
+    // Invalid shuffle manager
+    manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
+    try {
+      manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    // Nonexistent shuffle block
+    manager.registerExecutor("app0", "exec3",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+    try {
+      manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (Exception e) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testSortShuffleBlocks() throws IOException {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    manager.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+
+    InputStream block0Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(sortBlock0, block0);
+
+    InputStream block1Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(sortBlock1, block1);
+  }
+
+  @Test
+  public void testHashShuffleBlocks() throws IOException {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    manager.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
+
+    InputStream block0Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(hashBlock0, block0);
+
+    InputStream block1Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(hashBlock1, block1);
+  }
+}
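The handler suite above pins down the expected server-side behavior for OpenShuffleBlocks: one getBlockData call per block, a single registerStream of the resulting buffers, and an onSuccess reply carrying a serialized ShuffleStreamHandle. The sketch below is a hypothetical reconstruction of that path -- ExternalShuffleBlockHandler's source is not part of this excerpt, and it assumes registerStream returns the new stream id:

    package org.apache.spark.network.shuffle;

    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.client.RpcResponseCallback;
    import org.apache.spark.network.server.OneForOneStreamManager;
    import org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;
    import org.apache.spark.network.util.JavaUtils;

    // Hypothetical sketch of the "open blocks" path exercised by testOpenShuffleBlocks.
    public class OpenBlocksSketch {
      static void handleOpenBlocks(
          OpenShuffleBlocks msg,
          ExternalShuffleBlockManager blockManager,
          OneForOneStreamManager streamManager,
          RpcResponseCallback callback) {
        // Resolve each requested block to a buffer, in order; chunk i == blockIds[i].
        List<ManagedBuffer> blocks = new LinkedList<ManagedBuffer>();
        for (String blockId : msg.blockIds) {
          blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId));
        }

        // Register the buffers as one stream; assumed: registerStream returns the stream id.
        long streamId = streamManager.registerStream(blocks.iterator());

        // Reply with a handle the client uses to fetch chunks 0..numChunks-1.
        callback.onSuccess(JavaUtils.serialize(
          new ShuffleStreamHandle(streamId, msg.blockIds.length)));
      }
    }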
