TAJO-1181: Avro schema URL should support various protocols. (jinho) Closes #252
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/965cbd90 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/965cbd90 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/965cbd90 Branch: refs/heads/hbase_storage Commit: 965cbd90778e2c6872e04456e94a19dd2e4f27f5 Parents: c544ffc Author: jhkim <[email protected]> Authored: Thu Nov 20 10:29:20 2014 +0900 Committer: jhkim <[email protected]> Committed: Thu Nov 20 10:29:20 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/storage/avro/AvroUtil.java | 35 +++- .../java/org/apache/tajo/HttpFileServer.java | 84 +++++++++ .../org/apache/tajo/HttpFileServerHandler.java | 184 +++++++++++++++++++ .../tajo/HttpFileServerPipelineFactory.java | 54 ++++++ .../apache/tajo/storage/avro/TestAvroUtil.java | 108 +++++++++++ 6 files changed, 463 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b197457..42ab10a 100644 --- a/CHANGES +++ b/CHANGES @@ -63,6 +63,9 @@ Release 0.9.1 - unreleased BUG FIXES + TAJO-1181: Avro schema URL should support various protocols. + (jinho) + TAJO-1200: Invalid shuffle data of multiple worker in same server. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java index c15d20b..0d14c3d 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java @@ -18,21 +18,23 @@ package org.apache.tajo.storage.avro; -import java.io.IOException; - import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.storage.StorageConstants; +import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.StorageConstants; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; public class AvroUtil { public static Schema getAvroSchema(TableMeta meta, Configuration conf) throws IOException { - boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL); boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL); if (!isSchemaLiteral && !isSchemaUrl) { @@ -44,9 +46,32 @@ public class AvroUtil { } String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL); + if (schemaURL.toLowerCase().startsWith("http")) { + return getAvroSchemaFromHttp(schemaURL); + } else { + return getAvroSchemaFromFileSystem(schemaURL, conf); + } + } + + public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException { + InputStream inputStream = new URL(schemaURL).openStream(); + + try { + return new Schema.Parser().parse(inputStream); + } finally { + IOUtils.closeStream(inputStream); + } + } + + public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException { Path schemaPath = new Path(schemaURL); FileSystem fs = schemaPath.getFileSystem(conf); FSDataInputStream inputStream = fs.open(schemaPath); - return new Schema.Parser().parse(inputStream); + + try { + return new Schema.Parser().parse(inputStream); + } finally { + IOUtils.closeStream(inputStream); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java new file mode 100644 index 0000000..cf8a54e --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +public class HttpFileServer { + private final static Log LOG = LogFactory.getLog(HttpFileServer.class); + + private final InetSocketAddress addr; + private InetSocketAddress bindAddr; + private ServerBootstrap bootstrap = null; + private ChannelFactory factory = null; + private ChannelGroup channelGroup = null; + + public HttpFileServer(final InetSocketAddress addr) { + this.addr = addr; + this.factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), + 2); + + // Configure the server. + this.bootstrap = new ServerBootstrap(factory); + // Set up the event pipeline factory. + this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory()); + this.channelGroup = new DefaultChannelGroup(); + } + + public HttpFileServer(String bindaddr) { + this(NetUtils.createSocketAddr(bindaddr)); + } + + public void start() { + // Bind and start to accept incoming connections. + Channel channel = bootstrap.bind(addr); + channelGroup.add(channel); + this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); + LOG.info("HttpFileServer starts up (" + + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() + + ")"); + } + + public InetSocketAddress getBindAddress() { + return this.bindAddr; + } + + public void stop() { + ChannelGroupFuture future = channelGroup.close(); + future.awaitUninterruptibly(); + factory.releaseExternalResources(); + + LOG.info("HttpFileServer shutdown (" + + this.bindAddr.getAddress().getHostAddress() + ":" + + this.bindAddr.getPort() + ")"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java new file mode 100644 index 0000000..6c77317 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.frame.TooLongFrameException; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; +import org.jboss.netty.util.CharsetUtil; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; +import static org.jboss.netty.handler.codec.http.HttpMethod.GET; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6 + */ +public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + HttpRequest request = (HttpRequest) e.getMessage(); + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + final String path = sanitizeUri(request.getUri()); + if (path == null) { + sendError(ctx, FORBIDDEN); + return; + } + + File file = new File(path); + if (file.isHidden() || !file.exists()) { + sendError(ctx, NOT_FOUND); + return; + } + if (!file.isFile()) { + sendError(ctx, FORBIDDEN); + return; + } + + RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException fnfe) { + sendError(ctx, NOT_FOUND); + return; + } + long fileLength = raf.length(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentLength(response, fileLength); + setContentTypeHeader(response); + + Channel ch = e.getChannel(); + + // Write the initial line and the header. + ch.write(response); + + // Write the content. + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) != null) { + // Cannot use zero-copy with HTTPS. + writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); + } else { + // No encryption - use zero-copy. + final FileRegion region = + new DefaultFileRegion(raf.getChannel(), 0, fileLength); + writeFuture = ch.write(region); + writeFuture.addListener(new ChannelFutureProgressListener() { + public void operationComplete(ChannelFuture future) { + region.releaseExternalResources(); + } + + public void operationProgressed( + ChannelFuture future, long amount, long current, long total) { + System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); + } + }); + } + + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + Channel ch = e.getChannel(); + Throwable cause = e.getCause(); + if (cause instanceof TooLongFrameException) { + sendError(ctx, BAD_REQUEST); + return; + } + + cause.printStackTrace(); + if (ch.isConnected()) { + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + private static String sanitizeUri(String uri) { + // Decode the path. + try { + uri = URLDecoder.decode(uri, "UTF-8"); + } catch (UnsupportedEncodingException e) { + try { + uri = URLDecoder.decode(uri, "ISO-8859-1"); + } catch (UnsupportedEncodingException e1) { + throw new Error(); + } + } + + // Convert file separators. + uri = uri.replace('/', File.separatorChar); + + // Simplistic dumb security check. + // You will have to do something serious in the production environment. + if (uri.contains(File.separator + '.') || + uri.contains('.' + File.separator) || + uri.startsWith(".") || uri.endsWith(".")) { + return null; + } + + return uri; + } + + private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent(ChannelBuffers.copiedBuffer( + "Failure: " + status.toString() + "\r\n", + CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. + ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + } + + /** + * Sets the content type header for the HTTP Response + * + * @param response + * HTTP response + */ + private static void setContentTypeHeader(HttpResponse response) { + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java new file mode 100644 index 0000000..cecf93b --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; +import org.jboss.netty.handler.stream.ChunkedWriteHandler; + +import static org.jboss.netty.channel.Channels.pipeline; + +// Uncomment the following lines if you want HTTPS +//import javax.net.ssl.SSLEngine; +//import org.jboss.netty.example.securechat.SecureChatSslContextFactory; +//import org.jboss.netty.handler.ssl.SslHandler; + +//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6 +public class HttpFileServerPipelineFactory implements ChannelPipelineFactory { + public ChannelPipeline getPipeline() throws Exception { + // Create a default pipeline implementation. + ChannelPipeline pipeline = pipeline(); + + // Uncomment the following lines if you want HTTPS + //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); + //engine.setUseClientMode(false); + //pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + + pipeline.addLast("handler", new HttpFileServerHandler()); + return pipeline; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java new file mode 100644 index 0000000..6186e9e --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.avro; + +import org.apache.avro.Schema; +import org.apache.tajo.HttpFileServer; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.NetUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}. + */ +public class TestAvroUtil { + private Schema expected; + private URL schemaUrl; + + @Before + public void setUp() throws Exception { + schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc"); + assertNotNull(schemaUrl); + + File file = new File(schemaUrl.getPath()); + assertTrue(file.exists()); + + expected = new Schema.Parser().parse(file); + } + + @Test + public void testGetSchema() throws IOException, URISyntaxException { + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath()))); + Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath()); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath(); + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + } finally { + server.stop(); + } + assertEquals(expected, schema); + } + + @Test + public void testGetSchemaFromHttp() throws IOException, URISyntaxException { + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath()); + assertEquals(expected, schema); + } finally { + server.stop(); + } + } + + @Test + public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException { + Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf()); + + assertEquals(expected, schema); + } +}
