http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml deleted file mode 100644 index 85e5f41..0000000 --- a/tajo-storage/src/main/resources/storage-default.xml +++ /dev/null @@ -1,180 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> - -<configuration> - <property> - <name>tajo.storage.manager.maxReadBytes</name> - <value>8388608</value> - <description></description> - </property> - - <property> - <name>tajo.storage.manager.concurrency.perDisk</name> - <value>1</value> - <description></description> - </property> - - <!--- Registered Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> - </property> - - <!--- Fragment Class Configurations --> - <property> - <name>tajo.storage.fragment.textfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.csv.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.raw.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.rcfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.row.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.parquet.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.sequencefile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.avro.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.hbase.class</name> - <value>org.apache.tajo.storage.hbase.HBaseFragment</value> - </property> - - <!--- Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> - </property> - - <property> - 
<name>tajo.storage.scanner-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.hbase.class</name> - <value>org.apache.tajo.storage.hbase.HBaseScanner</value> - </property> - - <!--- Appender Handler --> - <property> - <name>tajo.storage.appender-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> - </property> - - <property> - <name>tajo.storage.appender-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> - </property> - - 
<property> - <name>tajo.storage.appender-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.hbase.class</name> - <value>org.apache.tajo.storage.hbase.HFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.hfile.class</name> - <value>org.apache.tajo.storage.hbase.HFileAppender</value> - </property> -</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java deleted file mode 100644 index cf8a54e..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; - -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -public class HttpFileServer { - private final static Log LOG = LogFactory.getLog(HttpFileServer.class); - - private final InetSocketAddress addr; - private InetSocketAddress bindAddr; - private ServerBootstrap bootstrap = null; - private ChannelFactory factory = null; - private ChannelGroup channelGroup = null; - - public HttpFileServer(final InetSocketAddress addr) { - this.addr = addr; - this.factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - 2); - - // Configure the server. - this.bootstrap = new ServerBootstrap(factory); - // Set up the event pipeline factory. - this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory()); - this.channelGroup = new DefaultChannelGroup(); - } - - public HttpFileServer(String bindaddr) { - this(NetUtils.createSocketAddr(bindaddr)); - } - - public void start() { - // Bind and start to accept incoming connections. 
- Channel channel = bootstrap.bind(addr); - channelGroup.add(channel); - this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); - LOG.info("HttpFileServer starts up (" - + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() - + ")"); - } - - public InetSocketAddress getBindAddress() { - return this.bindAddr; - } - - public void stop() { - ChannelGroupFuture future = channelGroup.close(); - future.awaitUninterruptibly(); - factory.releaseExternalResources(); - - LOG.info("HttpFileServer shutdown (" - + this.bindAddr.getAddress().getHostAddress() + ":" - + this.bindAddr.getPort() + ")"); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java deleted file mode 100644 index 6c77317..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo; - -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.RandomAccessFile; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; - -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -/** - * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6 - */ -public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; - } - - final String path = sanitizeUri(request.getUri()); - if (path == null) { - sendError(ctx, FORBIDDEN); - return; - } - - File file = new File(path); - if (file.isHidden() || !file.exists()) { - sendError(ctx, NOT_FOUND); - return; - } - if (!file.isFile()) { - sendError(ctx, FORBIDDEN); - return; - } - - RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } catch 
(FileNotFoundException fnfe) { - sendError(ctx, NOT_FOUND); - return; - } - long fileLength = raf.length(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentLength(response, fileLength); - setContentTypeHeader(response); - - Channel ch = e.getChannel(); - - // Write the initial line and the header. - ch.write(response); - - // Write the content. - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { - // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); - } else { - // No encryption - use zero-copy. - final FileRegion region = - new DefaultFileRegion(raf.getChannel(), 0, fileLength); - writeFuture = ch.write(region); - writeFuture.addListener(new ChannelFutureProgressListener() { - public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); - } - - public void operationProgressed( - ChannelFuture future, long amount, long current, long total) { - System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); - } - }); - } - - // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - private static String sanitizeUri(String uri) { - // Decode the path. 
- try { - uri = URLDecoder.decode(uri, "UTF-8"); - } catch (UnsupportedEncodingException e) { - try { - uri = URLDecoder.decode(uri, "ISO-8859-1"); - } catch (UnsupportedEncodingException e1) { - throw new Error(); - } - } - - // Convert file separators. - uri = uri.replace('/', File.separatorChar); - - // Simplistic dumb security check. - // You will have to do something serious in the production environment. - if (uri.contains(File.separator + '.') || - uri.contains('.' + File.separator) || - uri.startsWith(".") || uri.endsWith(".")) { - return null; - } - - return uri; - } - - private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", - CharsetUtil.UTF_8)); - - // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Sets the content type header for the HTTP Response - * - * @param response - * HTTP response - */ - private static void setContentTypeHeader(HttpResponse response) { - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java deleted file mode 100644 index cecf93b..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpChunkAggregator; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; - -import static org.jboss.netty.channel.Channels.pipeline; - -// Uncomment the following lines if you want HTTPS -//import javax.net.ssl.SSLEngine; -//import org.jboss.netty.example.securechat.SecureChatSslContextFactory; -//import org.jboss.netty.handler.ssl.SslHandler; - -//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6 -public class HttpFileServerPipelineFactory implements ChannelPipelineFactory { - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. 
- ChannelPipeline pipeline = pipeline(); - - // Uncomment the following lines if you want HTTPS - //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); - //engine.setUseClientMode(false); - //pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - - pipeline.addLast("handler", new HttpFileServerHandler()); - return pipeline; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java deleted file mode 100644 index ea46fa7..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.*; -import org.apache.hadoop.io.compress.zlib.ZlibFactory; -import org.apache.hadoop.util.NativeCodeLoader; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.sequencefile.SequenceFileScanner; -import org.apache.tajo.storage.text.DelimitedTextFile; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestCompressionStorages { - private TajoConf conf; - private static String TEST_PATH = "target/test-data/TestCompressionStorages"; - - private StoreType storeType; - private Path testDir; - private FileSystem fs; - - public TestCompressionStorages(StoreType type) throws IOException { - this.storeType = type; - conf = new TajoConf(); - - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][]{ - {StoreType.CSV}, - {StoreType.RCFILE}, - {StoreType.SEQUENCEFILE}, - {StoreType.TEXTFILE} - }); - } - - @Test - public void 
testDeflateCodecCompressionData() throws IOException { - storageCompressionTest(storeType, DeflateCodec.class); - } - - @Test - public void testGzipCodecCompressionData() throws IOException { - if (storeType == StoreType.RCFILE) { - if( ZlibFactory.isNativeZlibLoaded(conf)) { - storageCompressionTest(storeType, GzipCodec.class); - } - } else if (storeType == StoreType.SEQUENCEFILE) { - if( ZlibFactory.isNativeZlibLoaded(conf)) { - storageCompressionTest(storeType, GzipCodec.class); - } - } else { - storageCompressionTest(storeType, GzipCodec.class); - } - } - - @Test - public void testSnappyCodecCompressionData() throws IOException { - if (SnappyCodec.isNativeCodeLoaded()) { - storageCompressionTest(storeType, SnappyCodec.class); - } - } - - @Test - public void testLz4CodecCompressionData() throws IOException { - if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) - storageCompressionTest(storeType, Lz4Codec.class); - } - - private void storageCompressionTest(StoreType storeType, Class<? 
extends CompressionCodec> codec) throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.FLOAT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - meta.putOption("compression.codec", codec.getCanonicalName()); - meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name()); - meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName()); - meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName()); - - String fileName = "Compression_" + codec.getSimpleName(); - Path tablePath = new Path(testDir, fileName); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - - appender.init(); - - String extension = ""; - if (appender instanceof CSVFile.CSVAppender) { - extension = ((CSVFile.CSVAppender) appender).getExtension(); - } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { - extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); - } - - int tupleNum = 100000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(3); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createFloat4((float) i)); - vTuple.put(2, DatumFactory.createText(String.valueOf(i))); - appender.addTuple(vTuple); - } - appender.close(); - - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - tablePath = tablePath.suffix(extension); - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment[] tablets = new FileFragment[1]; - tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); - - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema); - - if (StoreType.CSV == storeType) { - if 
(SplittableCompressionCodec.class.isAssignableFrom(codec)) { - assertTrue(scanner.isSplittable()); - } else { - assertFalse(scanner.isSplittable()); - } - } - scanner.init(); - - if (storeType == StoreType.SEQUENCEFILE) { - assertTrue(scanner instanceof SequenceFileScanner); - Writable key = ((SequenceFileScanner) scanner).getKey(); - assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); - } - - int tupleCnt = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { - tupleCnt++; - } - scanner.close(); - assertEquals(tupleNum, tupleCnt); - assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java deleted file mode 100644 index 17a8da7..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3.S3FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.s3.InMemoryFileSystemStore; -import org.apache.tajo.storage.s3.SmallBlockS3FileSystem; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class TestFileSystems { - - protected byte[] data = null; - - private static String TEST_PATH = "target/test-data/TestFileSystem"; - private TajoConf conf = null; - private FileStorageManager sm = null; - private FileSystem fs = null; - Path testDir; - - public TestFileSystems(FileSystem fs) throws IOException { - conf = new TajoConf(); - - if(fs instanceof S3FileSystem){ 
- conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10"); - fs.initialize(URI.create(fs.getScheme() + ":///"), conf); - } - this.fs = fs; - sm = StorageManager.getFileStorageManager(conf); - testDir = getTestDir(this.fs, TEST_PATH); - } - - public Path getTestDir(FileSystem fs, String dir) throws IOException { - Path path = new Path(dir); - if(fs.exists(path)) - fs.delete(path, true); - - fs.mkdirs(path); - - return fs.makeQualified(path); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())}, - }); - } - - @Test - public void testBlockSplit() throws IOException { - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Tuple[] tuples = new Tuple[4]; - for (int i = 0; i < tuples.length; i++) { - tuples[i] = new VTuple(3); - tuples[i] - .put(new Datum[] { DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 32), - DatumFactory.createText("name" + i) }); - } - - Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", - "table.csv"); - fs.mkdirs(path.getParent()); - - Appender appender = sm.getAppender(meta, schema, path); - appender.init(); - for (Tuple t : tuples) { - appender.addTuple(t); - } - appender.close(); - FileStatus fileStatus = fs.getFileStatus(path); - - List<Fragment> splits = sm.getSplits("table", meta, schema, path); - int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); - assertEquals(splitSize, splits.size()); - - for (Fragment fragment : splits) { - assertTrue(fragment.getLength() <= fileStatus.getBlockSize()); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java deleted file mode 100644 index 387fed5..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestFrameTuple { - private Tuple tuple1; - private Tuple tuple2; - - @Before - public void setUp() throws Exception { - tuple1 = new VTuple(11); - tuple1.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar('9'), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1") - }); - - tuple2 = new VTuple(11); - tuple2.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar('9'), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1") - }); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public final void testFrameTuple() { - Tuple frame = new FrameTuple(tuple1, tuple2); - assertEquals(22, frame.size()); - for (int i = 0; i < 22; i++) { - assertTrue(frame.contains(i)); - } - - assertEquals(DatumFactory.createInt8(23l), frame.get(5)); - assertEquals(DatumFactory.createInt8(23l), frame.get(16)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21)); - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java deleted file mode 100644 index c6149f7..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.util.BytesUtils; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestLazyTuple { - - Schema schema; - byte[][] textRow; - byte[] nullbytes; - SerializerDeserializer serde; - - @Before - public void setUp() { - nullbytes = "\\N".getBytes(); - - schema = new Schema(); - schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); - schema.addColumn("col2", TajoDataTypes.Type.BIT); - schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7); - schema.addColumn("col4", TajoDataTypes.Type.INT2); - schema.addColumn("col5", TajoDataTypes.Type.INT4); - schema.addColumn("col6", TajoDataTypes.Type.INT8); - schema.addColumn("col7", TajoDataTypes.Type.FLOAT4); - schema.addColumn("col8", TajoDataTypes.Type.FLOAT8); - schema.addColumn("col9", TajoDataTypes.Type.TEXT); - schema.addColumn("col10", TajoDataTypes.Type.BLOB); - schema.addColumn("col11", TajoDataTypes.Type.INET4); - schema.addColumn("col12", TajoDataTypes.Type.INT4); - schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE); - - StringBuilder sb = new StringBuilder(); - sb.append(DatumFactory.createBool(true)).append('|'); - sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|'); - sb.append(DatumFactory.createChar("str")).append('|'); - sb.append(DatumFactory.createInt2((short) 17)).append('|'); - sb.append(DatumFactory.createInt4(59)).append('|'); - sb.append(DatumFactory.createInt8(23l)).append('|'); - sb.append(DatumFactory.createFloat4(77.9f)).append('|'); - sb.append(DatumFactory.createFloat8(271.9f)).append('|'); - sb.append(DatumFactory.createText("str2")).append('|'); - sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|'); - sb.append(DatumFactory.createInet4("192.168.0.1")).append('|'); 
- sb.append(new String(nullbytes)).append('|'); - sb.append(NullDatum.get()); - textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|'); - serde = new TextSerializerDeserializer(); - } - - @Test - public void testGetDatum() { - - LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde); - assertEquals(DatumFactory.createBool(true), t1.get(0)); - assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1)); - assertEquals(DatumFactory.createChar("str"), t1.get(2)); - assertEquals(DatumFactory.createInt2((short) 17), t1.get(3)); - assertEquals(DatumFactory.createInt4(59), t1.get(4)); - assertEquals(DatumFactory.createInt8(23l), t1.get(5)); - assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6)); - assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7)); - assertEquals(DatumFactory.createText("str2"), t1.get(8)); - assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10)); - assertEquals(NullDatum.get(), t1.get(11)); - assertEquals(NullDatum.get(), t1.get(12)); - } - - @Test - public void testContain() { - int colNum = schema.size(); - - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - t1.put(0, DatumFactory.createInt4(1)); - t1.put(3, DatumFactory.createInt4(1)); - t1.put(7, DatumFactory.createInt4(1)); - - assertTrue(t1.contains(0)); - assertFalse(t1.contains(1)); - assertFalse(t1.contains(2)); - assertTrue(t1.contains(3)); - assertFalse(t1.contains(4)); - assertFalse(t1.contains(5)); - assertFalse(t1.contains(6)); - assertTrue(t1.contains(7)); - assertFalse(t1.contains(8)); - assertFalse(t1.contains(9)); - assertFalse(t1.contains(10)); - assertFalse(t1.contains(11)); - assertFalse(t1.contains(12)); - } - - @Test - public void testPut() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - t1.put(0, DatumFactory.createText("str")); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(11, 
DatumFactory.createFloat4(0.76f)); - - assertTrue(t1.contains(0)); - assertTrue(t1.contains(1)); - - assertEquals(t1.getText(0), "str"); - assertEquals(t1.get(1).asInt4(), 2); - assertTrue(t1.get(11).asFloat4() == 0.76f); - } - - @Test - public void testEquals() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - - assertEquals(t1, t2); - - Tuple t3 = new VTuple(colNum); - t3.put(0, DatumFactory.createInt4(1)); - t3.put(1, DatumFactory.createInt4(2)); - t3.put(3, DatumFactory.createInt4(2)); - assertEquals(t1, t3); - assertEquals(t2, t3); - - LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1); - assertNotSame(t1, t4); - } - - @Test - public void testHashCode() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("str")); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - t2.put(4, DatumFactory.createText("str")); - - assertEquals(t1.hashCode(), t2.hashCode()); - - Tuple t3 = new VTuple(colNum); - t3.put(0, DatumFactory.createInt4(1)); - t3.put(1, DatumFactory.createInt4(2)); - t3.put(3, DatumFactory.createInt4(2)); - t3.put(4, DatumFactory.createText("str")); - assertEquals(t1.hashCode(), t3.hashCode()); - assertEquals(t2.hashCode(), t3.hashCode()); - - Tuple t4 = new VTuple(5); - t4.put(0, DatumFactory.createInt4(1)); - t4.put(1, DatumFactory.createInt4(2)); - 
t4.put(4, DatumFactory.createInt4(2)); - - assertNotSame(t1.hashCode(), t4.hashCode()); - } - - @Test - public void testPutTuple() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(2, DatumFactory.createInt4(3)); - - - Schema schema2 = new Schema(); - schema2.addColumn("col1", TajoDataTypes.Type.INT8); - schema2.addColumn("col2", TajoDataTypes.Type.INT8); - - LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1); - t2.put(0, DatumFactory.createInt4(4)); - t2.put(1, DatumFactory.createInt4(5)); - - t1.put(3, t2); - - for (int i = 0; i < 5; i++) { - assertEquals(i + 1, t1.get(i).asInt4()); - } - } - - @Test - public void testInvalidNumber() { - byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|'); - Schema schema = new Schema(); - schema.addColumn("col1", TajoDataTypes.Type.INT2); - schema.addColumn("col2", TajoDataTypes.Type.INT4); - schema.addColumn("col3", TajoDataTypes.Type.INT8); - schema.addColumn("col4", TajoDataTypes.Type.FLOAT4); - schema.addColumn("col5", TajoDataTypes.Type.FLOAT8); - - LazyTuple tuple = new LazyTuple(schema, bytes, 0); - assertEquals(bytes.length, tuple.size()); - - for (int i = 0; i < tuple.size(); i++){ - assertEquals(NullDatum.get(), tuple.get(i)); - } - } - - @Test - public void testClone() throws CloneNotSupportedException { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("str")); - - LazyTuple t2 = (LazyTuple) t1.clone(); - assertNotSame(t1, t2); - assertEquals(t1, t2); - - assertSame(t1.get(4), t2.get(4)); - - t1.clear(); - assertFalse(t1.equals(t2)); - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java deleted file mode 100644 index 1a4bdba..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.text.ByteBufLineReader; -import org.apache.tajo.storage.text.DelimitedTextFile; -import org.apache.tajo.storage.text.DelimitedLineReader; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - -public class TestLineReader { - private static String TEST_PATH = "target/test-data/TestLineReader"; - - @Test - public void testByteBufLineReader() throws IOException { - TajoConf conf = new TajoConf(); - Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); - FileSystem fs = testDir.getFileSystem(conf); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("comment", Type.TEXT); - schema.addColumn("comment2", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); - Path tablePath = new Path(testDir, "line.data"); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, 
DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("emiya muljomdao")); - vTuple.put(3, NullDatum.get()); - appender.addTuple(vTuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - - ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath)); - assertEquals(status.getLen(), channel.available()); - ByteBufLineReader reader = new ByteBufLineReader(channel); - assertEquals(status.getLen(), reader.available()); - - long totalRead = 0; - int i = 0; - AtomicInteger bytes = new AtomicInteger(); - for(;;){ - ByteBuf buf = reader.readLineBuf(bytes); - if(buf == null) break; - - totalRead += bytes.get(); - i++; - } - IOUtils.cleanup(null, reader, channel, fs); - assertEquals(tupleNum, i); - assertEquals(status.getLen(), totalRead); - assertEquals(status.getLen(), reader.readBytes()); - } - - @Test - public void testLineDelimitedReader() throws IOException { - TajoConf conf = new TajoConf(); - Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); - FileSystem fs = testDir.getFileSystem(conf); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("comment", Type.TEXT); - schema.addColumn("comment2", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); - meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); - - Path tablePath = new Path(testDir, "line1." 
+ DeflateCodec.class.getSimpleName()); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - long splitOffset = 0; - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("emiya muljomdao")); - vTuple.put(3, NullDatum.get()); - appender.addTuple(vTuple); - - if(i == (tupleNum / 2)){ - splitOffset = appender.getOffset(); - } - } - String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); - appender.close(); - - tablePath = tablePath.suffix(extension); - FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset); - DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF - assertTrue(reader.isCompressed()); - assertFalse(reader.isReadable()); - reader.init(); - assertTrue(reader.isReadable()); - - - int i = 0; - while(reader.isReadable()){ - ByteBuf buf = reader.readLine(); - if(buf == null) break; - i++; - } - - IOUtils.cleanup(null, reader, fs); - assertEquals(tupleNum, i); - - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java deleted file mode 100644 index cc4aa51..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.TUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestMergeScanner { - private TajoConf conf; - StorageManager sm; - private static String TEST_PATH = 
"target/test-data/TestMergeScanner"; - - private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA = - "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testMultipleFiles\",\n" + - " \"fields\": [\n" + - " { \"name\": \"id\", \"type\": \"int\" },\n" + - " { \"name\": \"file\", \"type\": \"string\" },\n" + - " { \"name\": \"name\", \"type\": \"string\" },\n" + - " { \"name\": \"age\", \"type\": \"long\" }\n" + - " ]\n" + - "}\n"; - - private Path testDir; - private StoreType storeType; - private FileSystem fs; - - public TestMergeScanner(StoreType storeType) { - this.storeType = storeType; - } - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - {StoreType.CSV}, - {StoreType.RAW}, - {StoreType.RCFILE}, - {StoreType.PARQUET}, - {StoreType.SEQUENCEFILE}, - {StoreType.AVRO}, - // RowFile requires Byte-buffer read support, so we omitted RowFile. - //{StoreType.ROWFILE}, - }); - } - - @Before - public void setup() throws Exception { - conf = new TajoConf(); - conf.setVar(ConfVars.ROOT_DIR, TEST_PATH); - conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro"); - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - sm = StorageManager.getFileStorageManager(conf, testDir); - } - - @Test - public void testMultipleFiles() throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("file", Type.TEXT); - schema.addColumn("name", Type.TEXT); - schema.addColumn("age", Type.INT8); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - if (storeType == StoreType.AVRO) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_MULTIPLE_FILES_AVRO_SCHEMA); - } - - Path table1Path = new Path(testDir, storeType + "_1.data"); - Appender 
appender1 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table1Path); - appender1.enableStats(); - appender1.init(); - int tupleNum = 10000; - VTuple vTuple; - - for(int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createText("hyunsik")); - vTuple.put(2, DatumFactory.createText("jihoon")); - vTuple.put(3, DatumFactory.createInt8(25l)); - appender1.addTuple(vTuple); - } - appender1.close(); - - TableStats stat1 = appender1.getStats(); - if (stat1 != null) { - assertEquals(tupleNum, stat1.getNumRows().longValue()); - } - - Path table2Path = new Path(testDir, storeType + "_2.data"); - Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table2Path); - appender2.enableStats(); - appender2.init(); - - for(int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createText("hyunsik")); - vTuple.put(2, DatumFactory.createText("jihoon")); - vTuple.put(3, DatumFactory.createInt8(25l)); - appender2.addTuple(vTuple); - } - appender2.close(); - - TableStats stat2 = appender2.getStats(); - if (stat2 != null) { - assertEquals(tupleNum, stat2.getNumRows().longValue()); - } - - - FileStatus status1 = fs.getFileStatus(table1Path); - FileStatus status2 = fs.getFileStatus(table2Path); - Fragment[] fragment = new Fragment[2]; - fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen()); - fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen()); - - Schema targetSchema = new Schema(); - targetSchema.addColumn(schema.getColumn(0)); - targetSchema.addColumn(schema.getColumn(2)); - - Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema); - assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable()); - - scanner.init(); - int totalCounts = 0; - Tuple tuple; - while ((tuple 
= scanner.next()) != null) { - totalCounts++; - if (isProjectableStorage(meta.getStoreType())) { - assertNotNull(tuple.get(0)); - assertNull(tuple.get(1)); - assertNotNull(tuple.get(2)); - assertNull(tuple.get(3)); - } - } - scanner.close(); - - assertEquals(tupleNum * 2, totalCounts); - } - - private static boolean isProjectableStorage(StoreType type) { - switch (type) { - case RCFILE: - case PARQUET: - case SEQUENCEFILE: - case CSV: - case AVRO: - return true; - default: - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java deleted file mode 100644 index 12ea551..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.CharsetUtil; -import org.apache.tajo.storage.text.FieldSplitProcessor; -import org.apache.tajo.storage.text.LineSplitProcessor; -import org.junit.Test; - -import java.io.IOException; - -import static io.netty.util.ReferenceCountUtil.releaseLater; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestSplitProcessor { - - @Test - public void testFieldSplitProcessor() throws IOException { - String data = "abc||de"; - final ByteBuf buf = releaseLater( - Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); - - final int len = buf.readableBytes(); - FieldSplitProcessor processor = new FieldSplitProcessor('|'); - - assertEquals(3, buf.forEachByte(0, len, processor)); - assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(-1, buf.forEachByte(5, len - 5, processor)); - - } - - @Test - public void testLineSplitProcessor() throws IOException { - String data = "abc\r\n\n"; - final ByteBuf buf = releaseLater( - Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); - - final int len = buf.readableBytes(); - LineSplitProcessor processor = new LineSplitProcessor(); - - //find CR - assertEquals(3, buf.forEachByte(0, len, processor)); - - // find CRLF - assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(buf.getByte(4), '\n'); - // need to skip LF - assertTrue(processor.isPrevCharCR()); - - // find LF - assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java deleted 
// file mode 100644 index 13aeef6..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java +++ /dev/null @@ -1,203 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

import static org.junit.Assert.*;

/**
 * Tests for the file storage manager: a CSV append/scan round trip on the test directory, and
 * split generation against a {@link MiniDFSCluster} with block metadata disabled and enabled.
 */
public class TestStorageManager {
  /** Root directory (relative to the working directory) for all data written by these tests. */
  private static final String TEST_PATH = "target/test-data/TestStorageManager";

  private TajoConf conf;
  StorageManager sm = null;
  private Path testDir;
  private FileSystem fs;

  /** Creates the test directory and a storage manager bound to it before each test. */
  @Before
  public void setUp() throws Exception {
    conf = new TajoConf();
    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
    fs = testDir.getFileSystem(conf);
    sm = StorageManager.getFileStorageManager(conf, testDir);
  }

  @After
  public void tearDown() throws Exception {
    // Nothing to release here: each test cleans up the resources it opens itself.
  }

  /**
   * Writes four tuples to a CSV table through an appender, then reads the table back with a file
   * scanner and checks that four tuples are returned.
   */
  @Test
  public final void testGetScannerAndAppender() throws IOException {
    Schema schema = createTestSchema();
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);

    Tuple[] tuples = new Tuple[4];
    for (int i = 0; i < tuples.length; i++) {
      tuples[i] = new VTuple(3);
      tuples[i].put(new Datum[] {
          DatumFactory.createInt4(i),
          DatumFactory.createInt4(i + 32),
          DatumFactory.createText("name" + i)});
    }

    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
    fs.mkdirs(path.getParent());

    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, path);
    appender.init();
    try {
      for (Tuple t : tuples) {
        appender.addTuple(t);
      }
    } finally {
      // Close even when a write fails so the underlying file handle is not leaked.
      appender.close();
    }

    Scanner scanner = StorageManager.getFileStorageManager(conf).getFileScanner(meta, schema, path);
    scanner.init();
    int count = 0;
    try {
      while (scanner.next() != null) {
        count++;
      }
    } finally {
      // The scanner was previously never closed.
      scanner.close();
    }
    assertEquals(4, count);
  }

  /**
   * Creates ten single-file partition directories on a one-datanode mini cluster with
   * {@code DFS_HDFS_BLOCKS_METADATA_ENABLED} set to {@code false}, and checks the splits
   * produced for all partitions and for the first half of them.
   */
  @Test
  public void testGetSplit() throws Exception {
    final Configuration hdfsConf = new HdfsConfiguration();
    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
        .numDataNodes(1).build();

    int testCount = 10;
    Path tablePath = new Path("/testGetSplit");
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();

      // Create test partitions
      List<Path> partitions = Lists.newArrayList();
      for (int i = 0; i < testCount; i++) {
        Path tmpFile = new Path(tablePath, String.valueOf(i));
        DFSTestUtil.createFile(dfs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADL);
        partitions.add(tmpFile);
      }

      assertTrue(dfs.exists(tablePath));
      FileStorageManager fileSm =
          StorageManager.getFileStorageManager(new TajoConf(hdfsConf), tablePath);

      Schema schema = createTestSchema();
      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);

      List<Fragment> splits = Lists.newArrayList();
      // Get FileFragments in partition batch
      splits.addAll(fileSm.getSplits("data", meta, schema,
          partitions.toArray(new Path[partitions.size()])));
      assertEquals(testCount, splits.size());
      // -1 is unknown volumeId
      assertEquals(-1, ((FileFragment) splits.get(0)).getDiskIds()[0]);

      splits.clear();
      splits.addAll(fileSm.getSplits("data", meta, schema,
          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
      assertEquals(testCount / 2, splits.size());
      assertEquals(1, splits.get(0).getHosts().length);
      assertEquals(-1, ((FileFragment) splits.get(0)).getDiskIds()[0]);
      dfs.close();
    } finally {
      cluster.shutdown();
      // File.delete() alone cannot remove a non-empty directory, so remove the tree.
      deleteRecursively(new File(testDataPath));
    }
  }

  /**
   * Creates ten files under one table directory on a two-datanode mini cluster with
   * {@code DFS_HDFS_BLOCKS_METADATA_ENABLED} set to {@code true}, and checks that every split
   * reports two hosts and two disk ids, the first of which is not the unknown id (-1).
   */
  @Test
  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
    final Configuration hdfsConf = new HdfsConfiguration();
    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
        .numDataNodes(2).build();

    int testCount = 10;
    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();

      // Create test files
      for (int i = 0; i < testCount; i++) {
        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
        DFSTestUtil.createFile(dfs, tmpFile, 10, (short) 2, 0xDEADDEADL);
      }
      assertTrue(dfs.exists(tablePath));
      FileStorageManager fileSm =
          StorageManager.getFileStorageManager(new TajoConf(hdfsConf), tablePath);

      Schema schema = createTestSchema();
      TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);

      List<Fragment> splits = Lists.newArrayList();
      splits.addAll(fileSm.getSplits("data", meta, schema, tablePath));

      assertEquals(testCount, splits.size());
      assertEquals(2, splits.get(0).getHosts().length);
      assertEquals(2, ((FileFragment) splits.get(0)).getDiskIds().length);
      assertNotEquals(-1, ((FileFragment) splits.get(0)).getDiskIds()[0]);
      dfs.close();
    } finally {
      cluster.shutdown();
      // File.delete() alone cannot remove a non-empty directory, so remove the tree.
      deleteRecursively(new File(testDataPath));
    }
  }

  /** Builds the three-column (id INT4, age INT4, name TEXT) schema shared by all tests. */
  private static Schema createTestSchema() {
    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("age", Type.INT4);
    schema.addColumn("name", Type.TEXT);
    return schema;
  }

  /**
   * Deletes {@code file} and, if it is a directory, everything beneath it. Best-effort cleanup:
   * entries that cannot be deleted are left in place without failing the test.
   */
  private static void deleteRecursively(File file) {
    File[] children = file.listFiles(); // null when 'file' is not a directory or cannot be read
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    file.delete();
  }
}
