Repository: spark
Updated Branches:
  refs/heads/master b715aa0c8 -> ba28a8fcb


[SPARK-2936] Migrate Netty network module from Java to Scala

The Netty network module was originally written when Scala 2.9.x had a bug that
prevented a pure Scala implementation, so a subset of the files was written in
Java. We have since upgraded to Scala 2.10 and can now migrate all Java files
to Scala.

https://github.com/netty/netty/issues/781

https://github.com/mesos/spark/pull/522

Author: Reynold Xin <[email protected]>

Closes #1865 from rxin/netty and squashes the following commits:

332422f [Reynold Xin] Code review feedback
ca9eeee [Reynold Xin] Minor update.
7f1434b [Reynold Xin] [SPARK-2936] Migrate Netty network module from Java to Scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba28a8fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba28a8fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba28a8fc

Branch: refs/heads/master
Commit: ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6
Parents: b715aa0
Author: Reynold Xin <[email protected]>
Authored: Sun Aug 10 20:36:54 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Aug 10 20:36:54 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  | 100 -----------------
 .../netty/FileClientChannelInitializer.java     |  39 -------
 .../spark/network/netty/FileClientHandler.java  |  55 ---------
 .../apache/spark/network/netty/FileServer.java  | 111 -------------------
 .../netty/FileServerChannelInitializer.java     |  41 -------
 .../spark/network/netty/FileServerHandler.java  |  83 --------------
 .../spark/network/netty/PathResolver.java       |  26 -----
 .../apache/spark/network/netty/FileClient.scala |  85 ++++++++++++++
 .../netty/FileClientChannelInitializer.scala    |  31 ++++++
 .../spark/network/netty/FileClientHandler.scala |  50 +++++++++
 .../apache/spark/network/netty/FileHeader.scala |   5 +-
 .../apache/spark/network/netty/FileServer.scala |  91 +++++++++++++++
 .../netty/FileServerChannelInitializer.scala    |  34 ++++++
 .../spark/network/netty/FileServerHandler.scala |  68 ++++++++++++
 .../spark/network/netty/PathResolver.scala      |  25 +++++
 .../spark/network/netty/ShuffleSender.scala     |   2 +-
 16 files changed, 387 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java 
b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
deleted file mode 100644
index 0d31894..0000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.util.concurrent.TimeUnit;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
-import io.netty.channel.socket.oio.OioSocketChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class FileClient {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(FileClient.class.getName());
-
-  private final FileClientHandler handler;
-  private Channel channel = null;
-  private Bootstrap bootstrap = null;
-  private EventLoopGroup group = null;
-  private final int connectTimeout;
-  private final int sendTimeout = 60; // 1 min
-
-  FileClient(FileClientHandler handler, int connectTimeout) {
-    this.handler = handler;
-    this.connectTimeout = connectTimeout;
-  }
-
-  public void init() {
-    group = new OioEventLoopGroup();
-    bootstrap = new Bootstrap();
-    bootstrap.group(group)
-      .channel(OioSocketChannel.class)
-      .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.TCP_NODELAY, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
-      .handler(new FileClientChannelInitializer(handler));
-  }
-
-  public void connect(String host, int port) {
-    try {
-      // Start the connection attempt.
-      channel = bootstrap.connect(host, port).sync().channel();
-      // ChannelFuture cf = channel.closeFuture();
-      //cf.addListener(new ChannelCloseListener(this));
-    } catch (InterruptedException e) {
-      LOG.warn("FileClient interrupted while trying to connect", e);
-      close();
-    }
-  }
-
-  public void waitForClose() {
-    try {
-      channel.closeFuture().sync();
-    } catch (InterruptedException e) {
-      LOG.warn("FileClient interrupted", e);
-    }
-  }
-
-  public void sendRequest(String file) {
-    //assert(file == null);
-    //assert(channel == null);
-      try {
-          // Should be able to send the message to network link channel.
-          boolean bSent = channel.writeAndFlush(file + 
"\r\n").await(sendTimeout, TimeUnit.SECONDS);
-          if (!bSent) {
-              throw new RuntimeException("Failed to send");
-          }
-      } catch (InterruptedException e) {
-          LOG.error("Error", e);
-      }
-  }
-
-  public void close() {
-    if (group != null) {
-      group.shutdownGracefully();
-      group = null;
-      bootstrap = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
 
b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
deleted file mode 100644
index 264cf97..0000000
--- 
a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.string.StringEncoder;
-
-class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-  private final FileClientHandler fhandler;
-
-  FileClientChannelInitializer(FileClientHandler handler) {
-    fhandler = handler;
-  }
-
-  @Override
-  public void initChannel(SocketChannel channel) {
-    // file no more than 2G
-    channel.pipeline()
-      .addLast("encoder", new StringEncoder())
-      .addLast("handler", fhandler);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java 
b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
deleted file mode 100644
index 63d3d92..0000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-import org.apache.spark.storage.BlockId;
-
-abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
-  private FileHeader currentHeader = null;
-
-  private volatile boolean handlerCalled = false;
-
-  public boolean isComplete() {
-    return handlerCalled;
-  }
-
-  public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, 
FileHeader header);
-  public abstract void handleError(BlockId blockId);
-
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
-    // get header
-    if (currentHeader == null && in.readableBytes() >= 
FileHeader.HEADER_SIZE()) {
-      currentHeader = 
FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
-    }
-    // get file
-    if(in.readableBytes() >= currentHeader.fileLen()) {
-      handle(ctx, in, currentHeader);
-      handlerCalled = true;
-      currentHeader = null;
-      ctx.close();
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java 
b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
deleted file mode 100644
index c93425e..0000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.net.InetSocketAddress;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
-import io.netty.channel.socket.oio.OioServerSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Server that accept the path of a file an echo back its content.
- */
-class FileServer {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(FileServer.class.getName());
-
-  private EventLoopGroup bossGroup = null;
-  private EventLoopGroup workerGroup = null;
-  private ChannelFuture channelFuture = null;
-  private int port = 0;
-
-  FileServer(PathResolver pResolver, int port) {
-    InetSocketAddress addr = new InetSocketAddress(port);
-
-    // Configure the server.
-    bossGroup = new OioEventLoopGroup();
-    workerGroup = new OioEventLoopGroup();
-
-    ServerBootstrap bootstrap = new ServerBootstrap();
-    bootstrap.group(bossGroup, workerGroup)
-        .channel(OioServerSocketChannel.class)
-        .option(ChannelOption.SO_BACKLOG, 100)
-        .option(ChannelOption.SO_RCVBUF, 1500)
-        .childHandler(new FileServerChannelInitializer(pResolver));
-    // Start the server.
-    channelFuture = bootstrap.bind(addr);
-    try {
-      // Get the address we bound to.
-      InetSocketAddress boundAddress =
-        ((InetSocketAddress) channelFuture.sync().channel().localAddress());
-      this.port = boundAddress.getPort();
-    } catch (InterruptedException ie) {
-      this.port = 0;
-    }
-  }
-
-  /**
-   * Start the file server asynchronously in a new thread.
-   */
-  public void start() {
-    Thread blockingThread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          channelFuture.channel().closeFuture().sync();
-          LOG.info("FileServer exiting");
-        } catch (InterruptedException e) {
-          LOG.error("File server start got interrupted", e);
-        }
-        // NOTE: bootstrap is shutdown in stop()
-      }
-    };
-    blockingThread.setDaemon(true);
-    blockingThread.start();
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public void stop() {
-    // Close the bound channel.
-    if (channelFuture != null) {
-      channelFuture.channel().close().awaitUninterruptibly();
-      channelFuture = null;
-    }
-
-    // Shutdown event groups
-    if (bossGroup != null) {
-       bossGroup.shutdownGracefully();
-       bossGroup = null;
-    }
-
-    if (workerGroup != null) {
-       workerGroup.shutdownGracefully();
-       workerGroup = null;
-    }
-    // TODO: Shutdown all accepted channels as well ?
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
 
b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
deleted file mode 100644
index 46efec8..0000000
--- 
a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.codec.Delimiters;
-import io.netty.handler.codec.string.StringDecoder;
-
-class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-  private final PathResolver pResolver;
-
-  FileServerChannelInitializer(PathResolver pResolver) {
-    this.pResolver = pResolver;
-  }
-
-  @Override
-  public void initChannel(SocketChannel channel) {
-    channel.pipeline()
-      .addLast("framer", new DelimiterBasedFrameDecoder(8192, 
Delimiters.lineDelimiter()))
-      .addLast("stringDecoder", new StringDecoder())
-      .addLast("handler", new FileServerHandler(pResolver));
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java 
b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
deleted file mode 100644
index c0133e1..0000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.io.File;
-import java.io.FileInputStream;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.DefaultFileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-class FileServerHandler extends SimpleChannelInboundHandler<String> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(FileServerHandler.class.getName());
-
-  private final PathResolver pResolver;
-
-  FileServerHandler(PathResolver pResolver){
-    this.pResolver = pResolver;
-  }
-
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
-    BlockId blockId = BlockId.apply(blockIdString);
-    FileSegment fileSegment = pResolver.getBlockLocation(blockId);
-    // if getBlockLocation returns null, close the channel
-    if (fileSegment == null) {
-      //ctx.close();
-      return;
-    }
-    File file = fileSegment.file();
-    if (file.exists()) {
-      if (!file.isFile()) {
-        ctx.write(new FileHeader(0, blockId).buffer());
-        ctx.flush();
-        return;
-      }
-      long length = fileSegment.length();
-      if (length > Integer.MAX_VALUE || length <= 0) {
-        ctx.write(new FileHeader(0, blockId).buffer());
-        ctx.flush();
-        return;
-      }
-      int len = (int) length;
-      ctx.write((new FileHeader(len, blockId)).buffer());
-      try {
-        ctx.write(new DefaultFileRegion(new FileInputStream(file)
-          .getChannel(), fileSegment.offset(), fileSegment.length()));
-      } catch (Exception e) {
-          LOG.error("Exception: ", e);
-      }
-    } else {
-      ctx.write(new FileHeader(0, blockId).buffer());
-    }
-    ctx.flush();
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOG.error("Exception: ", cause);
-    ctx.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java 
b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
deleted file mode 100755
index 7ad8d03..0000000
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
-  /** Get the file segment in which the given block resides. */
-  FileSegment getBlockLocation(BlockId blockId);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
new file mode 100644
index 0000000..c6d35f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.util.concurrent.TimeUnit
+
+import io.netty.bootstrap.Bootstrap
+import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioSocketChannel
+
+import org.apache.spark.Logging
+
+class FileClient(handler: FileClientHandler, connectTimeout: Int) extends 
Logging {
+
+  private var channel: Channel = _
+  private var bootstrap: Bootstrap = _
+  private var group: EventLoopGroup = _
+  private val sendTimeout = 60
+
+  def init(): Unit = {
+    group = new OioEventLoopGroup
+    bootstrap = new Bootstrap
+    bootstrap.group(group)
+      .channel(classOf[OioSocketChannel])
+      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
+      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
Integer.valueOf(connectTimeout))
+      .handler(new FileClientChannelInitializer(handler))
+  }
+
+  def connect(host: String, port: Int) {
+    try {
+      channel = bootstrap.connect(host, port).sync().channel()
+    } catch {
+      case e: InterruptedException =>
+        logWarning("FileClient interrupted while trying to connect", e)
+        close()
+    }
+  }
+
+  def waitForClose(): Unit = {
+    try {
+      channel.closeFuture.sync()
+    } catch {
+      case e: InterruptedException =>
+        logWarning("FileClient interrupted", e)
+    }
+  }
+
+  def sendRequest(file: String): Unit = {
+    try {
+      val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, 
TimeUnit.SECONDS)
+      if (!bSent) {
+        throw new RuntimeException("Failed to send")
+      }
+    } catch {
+      case e: InterruptedException =>
+        logError("Error", e)
+    }
+  }
+
+  def close(): Unit = {
+    if (group != null) {
+      group.shutdownGracefully()
+      group = null
+      bootstrap = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
new file mode 100644
index 0000000..f4261c1
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.string.StringEncoder
+
+
+class FileClientChannelInitializer(handler: FileClientHandler)
+  extends ChannelInitializer[SocketChannel] {
+
+  def initChannel(channel: SocketChannel) {
+    channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", 
handler)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
new file mode 100644
index 0000000..017302e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.storage.BlockId
+
+
+abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {
+
+  private var currentHeader: FileHeader = null
+
+  @volatile
+  private var handlerCalled: Boolean = false
+
+  def isComplete: Boolean = handlerCalled
+
+  def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader)
+
+  def handleError(blockId: BlockId)
+
+  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
+    if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
+      currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE))
+    }
+    if (in.readableBytes >= currentHeader.fileLen) {
+      handle(ctx, in, currentHeader)
+      handlerCalled = true
+      currentHeader = null
+      ctx.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index 136c191..607e560 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -26,7 +26,7 @@ private[spark] class FileHeader (
   val fileLen: Int,
   val blockId: BlockId) extends Logging {
 
-  lazy val buffer = {
+  lazy val buffer: ByteBuf = {
     val buf = Unpooled.buffer()
     buf.capacity(FileHeader.HEADER_SIZE)
     buf.writeInt(fileLen)
@@ -62,11 +62,10 @@ private[spark] object FileHeader {
     new FileHeader(length, blockId)
   }
 
-  def main (args:Array[String]) {
+  def main(args:Array[String]) {
     val header = new FileHeader(25, TestBlockId("my_block"))
     val buf = header.buffer
     val newHeader = FileHeader.create(buf)
     System.out.println("id=" + newHeader.blockId + ",size=" + 
newHeader.fileLen)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
new file mode 100644
index 0000000..dff7795
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.net.InetSocketAddress
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioServerSocketChannel
+
+import org.apache.spark.Logging
+
+/**
+ * Server that accepts the path of a file and echoes back its content.
+ *
+ * The server socket is bound while the instance is being constructed; `getPort` then reports
+ * the port that was actually bound, or 0 if the constructing thread was interrupted while
+ * waiting for the bind to complete.
+ *
+ * @param pResolver resolver handed to the per-connection channel initializer
+ * @param port port to bind to; overwritten with the bound port after construction
+ */
+class FileServer(pResolver: PathResolver, private var port: Int) extends Logging {
+
+  private val addr: InetSocketAddress = new InetSocketAddress(port)
+  private var bossGroup: EventLoopGroup = new OioEventLoopGroup
+  private var workerGroup: EventLoopGroup = new OioEventLoopGroup
+
+  private var channelFuture: ChannelFuture = {
+    val bootstrap = new ServerBootstrap
+    bootstrap.group(bossGroup, workerGroup)
+      .channel(classOf[OioServerSocketChannel])
+      .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
+      .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
+      .childHandler(new FileServerChannelInitializer(pResolver))
+    bootstrap.bind(addr)
+  }
+
+  try {
+    val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
+    port = boundAddress.getPort
+  } catch {
+    case ie: InterruptedException =>
+      port = 0
+      // Catching InterruptedException clears the thread's interrupt status; set it again so
+      // the code that constructed this server can still observe the interruption.
+      Thread.currentThread().interrupt()
+  }
+
+  /** Start the file server asynchronously in a new thread. */
+  def start(): Unit = {
+    val blockingThread: Thread = new Thread {
+      override def run(): Unit = {
+        try {
+          // Blocks this daemon thread until the server channel has been closed.
+          channelFuture.channel.closeFuture.sync
+          logInfo("FileServer exiting")
+        } catch {
+          case e: InterruptedException =>
+            logError("File server start got interrupted", e)
+        }
+        // NOTE: bootstrap is shutdown in stop()
+      }
+    }
+    blockingThread.setDaemon(true)
+    blockingThread.start()
+  }
+
+  /** Port recorded after the bind attempt made during construction. */
+  def getPort: Int = port
+
+  /**
+   * Close the server channel and shut down both event loop groups. Each field is nulled
+   * after it has been released, so calling this method again is a no-op.
+   */
+  def stop(): Unit = {
+    if (channelFuture != null) {
+      channelFuture.channel().close().awaitUninterruptibly()
+      channelFuture = null
+    }
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully()
+      bossGroup = null
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully()
+      workerGroup = null
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
new file mode 100644
index 0000000..aaa2f91
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
+import io.netty.handler.codec.string.StringDecoder
+
+class FileServerChannelInitializer(pResolver: PathResolver)
+  extends ChannelInitializer[SocketChannel] {
+
+  /**
+   * Install the handlers for a newly accepted connection: a line-delimiter framer capped at
+   * 8192 bytes per frame, a string decoder, and the file server handler, in that order.
+   */
+  override def initChannel(channel: SocketChannel): Unit = {
+    val pipeline = channel.pipeline
+    val lineFramer = new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter : _*)
+    pipeline.addLast("framer", lineFramer)
+    pipeline.addLast("stringDecoder", new StringDecoder)
+    pipeline.addLast("handler", new FileServerHandler(pResolver))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
new file mode 100644
index 0000000..96f60b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.io.FileInputStream
+
+import io.netty.channel.{DefaultFileRegion, ChannelHandlerContext, 
SimpleChannelInboundHandler}
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+
+/**
+ * Inbound handler that answers each decoded block id string with a [[FileHeader]] followed,
+ * when the block can be served, by the file region that holds the block.
+ *
+ * A header whose length is 0 is sent instead when the resolved path does not exist, is not a
+ * regular file, has a segment length outside (0, Int.MaxValue], or cannot be opened.
+ */
+class FileServerHandler(pResolver: PathResolver)
+  extends SimpleChannelInboundHandler[String] with Logging {
+
+  override def channelRead0(ctx: ChannelHandlerContext, blockIdString: String): Unit = {
+    val blockId: BlockId = BlockId(blockIdString)
+    val fileSegment: FileSegment = pResolver.getBlockLocation(blockId)
+    if (fileSegment == null) {
+      // Kept as in the original: nothing is written back when no location is resolved.
+      return
+    }
+    val file = fileSegment.file
+    val length: Long = fileSegment.length
+    if (!file.exists || !file.isFile || length > Integer.MAX_VALUE || length <= 0) {
+      sendEmptyHeader(ctx, blockId)
+      return
+    }
+    // Open the file before announcing its length, so that a failed open is reported with a
+    // zero-length header rather than a header promising bytes that are never written.
+    val region: DefaultFileRegion =
+      try {
+        openRegion(fileSegment)
+      } catch {
+        case e: Exception =>
+          logError("Exception: ", e)
+          null
+      }
+    if (region == null) {
+      sendEmptyHeader(ctx, blockId)
+      return
+    }
+    ctx.write(new FileHeader(length.toInt, blockId).buffer)
+    ctx.write(region)
+    ctx.flush()
+  }
+
+  /** Write and flush a header with length 0, the reply used for every unservable request. */
+  private def sendEmptyHeader(ctx: ChannelHandlerContext, blockId: BlockId): Unit = {
+    ctx.write(new FileHeader(0, blockId).buffer)
+    ctx.flush()
+  }
+
+  /** Open the segment's file as a region, closing the stream if the region cannot be built. */
+  private def openRegion(fileSegment: FileSegment): DefaultFileRegion = {
+    val stream = new FileInputStream(fileSegment.file)
+    try {
+      new DefaultFileRegion(stream.getChannel, fileSegment.offset, fileSegment.length)
+    } catch {
+      case e: Exception =>
+        stream.close()
+        throw e
+    }
+  }
+
+  /** Log the failure and close the connection it occurred on. */
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    logError("Exception: ", cause)
+    ctx.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala 
b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
new file mode 100644
index 0000000..0d76950
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+trait PathResolver {
+  /** Get the file segment in which the given block resides; FileServerHandler null-checks it. */
+  def getBlockLocation(blockId: BlockId): FileSegment
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala 
b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 7ef7aec..95958e3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -32,7 +32,7 @@ private[spark] class ShuffleSender(portIn: Int, val 
pResolver: PathResolver) ext
     server.stop()
   }
 
-  def port: Int = server.getPort()
+  def port: Int = server.getPort
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to