Repository: hive Updated Branches: refs/heads/llap d8a9531a7 -> 81b26df9e
HIVE-13202: LLAP: Replace use of ServerSocket with netty in LlapOutputFormatService Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81b26df9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81b26df9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81b26df9 Branch: refs/heads/llap Commit: 81b26df9ed00e9db671c57aece8e51bf62365e34 Parents: d8a9531 Author: Jason Dere <[email protected]> Authored: Thu Mar 3 12:57:43 2016 -0800 Committer: Jason Dere <[email protected]> Committed: Thu Mar 3 12:57:43 2016 -0800 ---------------------------------------------------------------------- .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 74 +++++++-- .../hadoop/hive/llap/ChannelOutputStream.java | 141 +++++++++++++++++ .../hive/llap/LlapOutputFormatService.java | 155 ++++++++++++------- .../hadoop/hive/ql/exec/FileSinkOperator.java | 12 +- 4 files changed, 308 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index 88e2e55..98daab4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -37,6 +37,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -86,6 +88,7 @@ public class TestJdbcWithMiniLlap { private static Path 
kvDataFilePath; private static final String tmpDir = System.getProperty("test.tmp.dir"); + private static HiveConf conf = null; private Connection hs2Conn = null; @BeforeClass @@ -98,7 +101,7 @@ public class TestJdbcWithMiniLlap { System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); } - HiveConf conf = new HiveConf(); + conf = new HiveConf(); conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // Necessary for GetSplits()/LlapInputFormat, // the config generated for the query fragment needs to include the MapWork @@ -109,7 +112,7 @@ public class TestJdbcWithMiniLlap { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/llap-daemon-site.xml")); - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP, true); + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); kvDataFilePath = new Path(dataFileDir, "kv1.txt"); @@ -160,21 +163,54 @@ public class TestJdbcWithMiniLlap { stmt.close(); } - @Test - public void testLlapInputFormatEndToEnd() throws Exception { - createTestTable("testtab1"); + private static boolean timedOut = false; // NOTE(review): appears unused - TestTimerTask declares its own timedOut field that shadows this one + + private static class TestTimerTask extends TimerTask { // interrupts threadToInterrupt and records the timeout when the Timer fires + private volatile boolean timedOut = false; // volatile: set on the Timer thread in run(), read on the test thread via isTimedOut() + private Thread threadToInterrupt; + + public TestTimerTask(Thread threadToInterrupt) { + this.threadToInterrupt = threadToInterrupt; + } + + @Override + public void run() { + System.out.println("Test timed out!"); + timedOut = true; + threadToInterrupt.interrupt(); + } + + public boolean isTimedOut() { + return timedOut; + } + + public void setTimedOut(boolean timedOut) { + this.timedOut = timedOut; + } + + } + + private int getLlapIFRowCount(String query, int numSplits) throws Exception { // runs query 
through LlapInputFormat, reads the rows of every split and returns the total row count + // Add a timer task to stop this test if it has not finished in a reasonable amount of time.
+ Timer timer = new Timer(); + long delay = 30000; // milliseconds (30 s) + TestTimerTask timerTask = new TestTimerTask(Thread.currentThread()); + timer.schedule(timerTask, delay); + + // Setup LlapInputFormat String url = miniHS2.getJdbcURL(); String user = System.getProperty("user.name"); String pwd = user; - String query = "select * from testtab1 where under_col = 0"; LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query); - JobConf job = new JobConf(); - int numSplits = 1; + + // Get splits + JobConf job = new JobConf(conf); InputSplit[] splits = inputFormat.getSplits(job, numSplits); - assert(splits.length > 0); + assertTrue(splits.length > 0); + // Fetch rows from splits boolean first = true; int rowCount = 0; for (InputSplit split : splits) { @@ -198,6 +234,26 @@ public class TestJdbcWithMiniLlap { ++rowCount; } } + + timer.cancel(); // NOTE(review): skipped if anything above throws, leaving the Timer thread to interrupt this thread later; consider try/finally + assertFalse("Test timed out", timerTask.isTimedOut()); + + return rowCount; + } + + @Test + public void testLlapInputFormatEndToEnd() throws Exception { + createTestTable("testtab1"); + + int rowCount; + + String query = "select * from testtab1 where under_col = 0"; + rowCount = getLlapIFRowCount(query, 1); assertEquals(3, rowCount); + + // Try empty rows query + query = "select * from testtab1 where true = false"; + rowCount = getLlapIFRowCount(query, 1); + assertEquals(0, rowCount); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java new file mode 100644 index 0000000..e861791 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; + +import java.io.IOException; +import java.io.OutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OutputStream that buffers up to bufSize bytes in a ByteBuf and writes each full buffer (and any remainder on flush) to a Netty channel. close() flushes, closes the channel and releases the buffer; calling it again has no effect. + */ +public class ChannelOutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputStream.class); + + private ChannelHandlerContext chc; + private int bufSize; + private String id; + private ByteBuf buf; + private byte[] singleByte = new byte[1]; + private boolean closed = false; // set by close(); later write/flush calls throw IOException + + private ChannelFutureListener listener = new ChannelFutureListener() { // logs cancelled or failed channel operations for this stream's id + @Override + public void operationComplete(ChannelFuture future) { + if (future.isCancelled()) { + LOG.error(id + " was cancelled"); + } else if (!future.isSuccess()) { + 
LOG.error("Error on ID " + id, future.cause()); + } + } + }; + + public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) { + this.chc = chc; + this.id = id; + this.bufSize = bufSize; + this.buf = chc.alloc().buffer(bufSize); + } + + @Override + public void write(int b) throws IOException { + singleByte[0] =
(byte) b; + write(singleByte, 0, 1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int currentOffset = off; + int bytesRemaining = len; + if (closed) { throw new IOException("Already closed: " + id); } // check before buf is dereferenced (it is null after close) + while (bytesRemaining + buf.readableBytes() > bufSize) { // top the buffer up to exactly bufSize, which makes writeInternal send it + int iterationLen = bufSize - buf.readableBytes(); + writeInternal(b, currentOffset, iterationLen); + currentOffset += iterationLen; + bytesRemaining -= iterationLen; + } + + if (bytesRemaining > 0) { + writeInternal(b, currentOffset, bytesRemaining); + } + } + + @Override + public void flush() throws IOException { + if (closed) { throw new IOException("Already closed: " + id); } if (buf.isReadable()) { + writeToChannel(); + } + chc.flush(); + } + + @Override + public void close() throws IOException { + if (closed) { + return; // already closed: per the Closeable contract a repeated close() has no effect + } + + try { + flush(); + } catch (IOException err) { + LOG.error("Error flushing stream before close", err); + } + + try { + chc.close().addListener(listener).sync(); + } catch (InterruptedException err) { + Thread.currentThread().interrupt(); throw new IOException(err); // restore the interrupt status before wrapping + } finally { + buf.release(); + buf = null; + chc = null; + closed = true; + } + } + + private void writeToChannel() throws IOException { + if (closed) { + throw new IOException("Already closed: " + id); + } + + chc.write(buf.copy()).addListener(listener); // write a copy so buf can be cleared and reused right away + buf.clear(); + } + + private void writeInternal(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Already closed: " + id); + } + + buf.writeBytes(b, off, len); + if (buf.readableBytes() >= bufSize) { + 
writeToChannel(); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index a197d7b..b39f085 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hive.llap; import java.util.Map; import java.util.HashMap; import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.io.InputStream; import java.io.OutputStream; import org.slf4j.Logger; @@ -45,8 +42,22 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.concurrent.Future; + /** * @@ -57,18 +68,17 @@ public class LlapOutputFormatService { private static LlapOutputFormatService service; private final Map<String, RecordWriter> writers; - private final ServerSocket socket; private final HiveConf conf; - private final ExecutorService executor; private static final int WAIT_TIME = 5; + private static final int MAX_QUERY_ID_LENGTH = 256; // max frame length for the NUL-delimited writer ID read from each connection + + private EventLoopGroup eventLoopGroup; + private ServerBootstrap serverBootstrap; + private ChannelFuture listeningChannelFuture; private 
LlapOutputFormatService() throws IOException { writers = new HashMap<String, RecordWriter>(); conf =
new HiveConf(); - executor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LLAP output %d").build()); - socket = new ServerSocket( - conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); } public static LlapOutputFormatService get() throws IOException { @@ -80,52 +90,34 @@ public class LlapOutputFormatService { } public void start() throws IOException { - executor.submit(new Runnable() { - byte[] buffer = new byte[4096]; - @Override - public void run() { - while (true) { - Socket s = null; - try { - s = socket.accept(); - String id = readId(s); - LOG.debug("Received: "+id); - registerReader(s, id); - } catch (IOException io) { - if (s != null) { - try{ - s.close(); - } catch (IOException io2) { - // ignore - } - } - } - } - } - - private String readId(Socket s) throws IOException { - InputStream in = s.getInputStream(); - int idx = 0; - while((buffer[idx++] = (byte)in.read()) != '\0') {} - return new String(buffer,0,idx-1); - } - - private void registerReader(Socket s, String id) throws IOException { - synchronized(service) { - LOG.debug("registering socket for: "+id); - LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream()); - writers.put(id, writer); - service.notifyAll(); - } - } - } - ); + LOG.info("Starting LlapOutputFormatService"); + + int port = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + eventLoopGroup = new NioEventLoopGroup(1); // one event-loop thread, used for both accepting and serving connections + serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(eventLoopGroup); + serverBootstrap.channel(NioServerSocketChannel.class); + serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler()); + try { + LOG.info("LlapOutputFormatService: Binding to port " + port); + listeningChannelFuture = serverBootstrap.bind(port).sync(); + } catch (InterruptedException err) { + Thread.currentThread().interrupt(); throw new IOException("LlapOutputFormatService: Error binding to port " + port, err); 
// restore the interrupt status before wrapping + } } public void stop() throws IOException,
InterruptedException { - executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); - socket.close(); + LOG.info("Stopping LlapOutputFormatService"); + + if (listeningChannelFuture != null) { + listeningChannelFuture.channel().close().sync(); + listeningChannelFuture = null; + } else { + LOG.warn("LlapOutputFormatService does not appear to have a listening port to close."); + } + if (eventLoopGroup == null) { return; } // start() never created the event loop group, so there is nothing to shut down + Future terminationFuture = eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS); + terminationFuture.sync(); } public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException { @@ -139,4 +131,59 @@ public class LlapOutputFormatService { LOG.info("Returning writer for: "+id); return writer; } + + protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> { // registers a writer for each decoded ID string + @Override + public void channelRead0(ChannelHandlerContext ctx, String msg) { + String id = msg; + registerReader(ctx, id); + } + + private void registerReader(ChannelHandlerContext ctx, String id) { + synchronized(service) { + LOG.debug("registering socket for: "+id); + int bufSize = 128 * 1024; // configable?
+ OutputStream stream = new ChannelOutputStream(ctx, id, bufSize); + LlapRecordWriter writer = new LlapRecordWriter(stream); + writers.put(id, writer); + + // Add listener to handle any cleanup for when the connection is closed + ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id)); + + service.notifyAll(); + } + } + } + + protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener { // unregisters the writer for id when its channel closes + private String id; + + LlapOutputFormatChannelCloseListener(String id) { + this.id = id; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + RecordWriter writer = null; + + synchronized (service) { + writer = writers.remove(id); // NOTE(review): the removed writer is not closed here, so its ChannelOutputStream buffer may never be released - confirm + } + + if (writer == null) { + LOG.warn("Did not find a writer for ID " + id); + } + } + } + + protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> { // pipeline: NUL-delimited frame decoder -> String codec -> LlapOutputFormatServiceHandler + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()), + new StringDecoder(), + new StringEncoder(), + new LlapOutputFormatServiceHandler()); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 02439be..17f3895 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -203,17 +203,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements } } } - try { - if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties().
- get(hive_metastoreConstants.META_TABLE_STORAGE))) { - (new LlapOutputFormat()) - .getRecordWriter(null, - hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null) - .close(null); - } - } catch (IOException e) { - // ignored - } + try { for (int i = 0; i < updaters.length; i++) { if (updaters[i] != null) {
