http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java deleted file mode 100644 index b742e6d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ /dev/null @@ -1,311 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; - -import org.apache.tajo.datum.*; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.UnsafeUtil; - -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.charset.Charset; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public abstract class UnSafeTuple implements Tuple { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - private DirectBuffer bb; - private int relativePos; - private int length; - private DataType [] types; - - protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { - this.bb = (DirectBuffer) bb; - this.relativePos = relativePos; - this.length = length; - this.types = types; - } - - void set(ByteBuffer bb, DataType [] types) { - set(bb, 0, bb.limit(), types); - } - - @Override - public int size() { - return types.length; - } - - public ByteBuffer nioBuffer() { - return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice(); - } - - public HeapTuple toHeapTuple() { - byte [] bytes = new byte[length]; - UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length); - return new HeapTuple(bytes, types); - } - - public void copyFrom(UnSafeTuple tuple) { - Preconditions.checkNotNull(tuple); - - ((ByteBuffer) bb).clear(); - if (length < tuple.length) { - UnsafeUtil.free((ByteBuffer) bb); - bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder()); - this.relativePos = 0; - this.length = tuple.length; - } - - ((ByteBuffer) bb).put(tuple.nioBuffer()); - } - - private int getFieldOffset(int fieldId) { - return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); - } - - public long getFieldAddr(int fieldId) { - int fieldOffset = getFieldOffset(fieldId); - if (fieldOffset == -1) { - throw new RuntimeException("Invalid Field Access: " + fieldId); - } - return bb.address() + relativePos + fieldOffset; - } - - @Override - public boolean contains(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isNull(int fieldid) { - return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isNotNull(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public void clear() { - // nothing to do - } - - @Override - public void put(int fieldId, Datum value) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Datum)."); - } - - @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Datum [])."); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple)."); - } - - @Override - public void put(Datum[] values) { - throw new UnsupportedException("UnSafeTuple does not support put(Datum [])."); - } - - @Override - public Datum get(int fieldId) { - if (isNull(fieldId)) { - return NullDatum.get(); - } - - switch (types[fieldId].getType()) { - case BOOLEAN: - return DatumFactory.createBool(getBool(fieldId)); - case INT1: - case INT2: - return DatumFactory.createInt2(getInt2(fieldId)); - case INT4: - return DatumFactory.createInt4(getInt4(fieldId)); - case INT8: - return DatumFactory.createInt8(getInt4(fieldId)); - case FLOAT4: - return DatumFactory.createFloat4(getFloat4(fieldId)); - case FLOAT8: - return DatumFactory.createFloat8(getFloat8(fieldId)); - case TEXT: - return DatumFactory.createText(getText(fieldId)); - case TIMESTAMP: - return DatumFactory.createTimestamp(getInt8(fieldId)); - case DATE: - return DatumFactory.createDate(getInt4(fieldId)); - case TIME: - return DatumFactory.createTime(getInt8(fieldId)); - case INTERVAL: - return getInterval(fieldId); - case INET4: - return DatumFactory.createInet4(getInt4(fieldId)); - case PROTOBUF: - return getProtobufDatum(fieldId); - default: - throw new UnsupportedException("Unknown type: " + types[fieldId]); - } - } - - @Override - public void setOffset(long offset) { - } - - @Override - public long getOffset() { - return 0; - } - - @Override - public boolean getBool(int fieldId) { - return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01; - } - - @Override - public byte getByte(int fieldId) { - return UNSAFE.getByte(getFieldAddr(fieldId)); - } - - @Override - public char getChar(int fieldId) { - return UNSAFE.getChar(getFieldAddr(fieldId)); - } - - @Override - public byte[] getBytes(int fieldId) { - long pos = getFieldAddr(fieldId); - int len = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return bytes; - } - - @Override - public short getInt2(int fieldId) { - long addr = getFieldAddr(fieldId); - return UNSAFE.getShort(addr); - } - - @Override - public int getInt4(int fieldId) { - return UNSAFE.getInt(getFieldAddr(fieldId)); - } - - @Override - public long getInt8(int fieldId) { - return UNSAFE.getLong(getFieldAddr(fieldId)); - } - - @Override - public float getFloat4(int fieldId) { - return UNSAFE.getFloat(getFieldAddr(fieldId)); - } - - @Override - public double getFloat8(int fieldId) { - return UNSAFE.getDouble(getFieldAddr(fieldId)); - } - - @Override - public String getText(int fieldId) { - long pos = getFieldAddr(fieldId); - int len = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return new String(bytes); - } - - public IntervalDatum getInterval(int fieldId) { - long pos = getFieldAddr(fieldId); - int months = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - long millisecs = UNSAFE.getLong(pos); - return new IntervalDatum(months, millisecs); - } - - @Override - public Datum getProtobufDatum(int fieldId) { - byte [] bytes = getBytes(fieldId); - - ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); - Message.Builder builder = factory.newBuilder(); - try { - builder.mergeFrom(bytes); - } catch (InvalidProtocolBufferException e) { - return NullDatum.get(); - } - - return new ProtobufDatum(builder.build()); - } - - @Override - public char[] getUnicodeChars(int fieldId) { - long pos = getFieldAddr(fieldId); - int len = UNSAFE.getInt(pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - return toHeapTuple(); - } - - @Override - public Datum[] getValues() { - Datum [] datums = new Datum[size()]; - for (int i = 0; i < size(); i++) { - if (contains(i)) { - datums[i] = get(i); - } else { - datums[i] = NullDatum.get(); - } - } - return datums; - } - - @Override - public String toString() { - return VTuple.toDisplayString(getValues()); - } - - public abstract void release(); -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java deleted file mode 100644 index 73e1e2f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java +++ /dev/null @@ -1,99 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.primitives.Longs; -import com.google.common.primitives.UnsignedLongs; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; - -import java.nio.ByteOrder; - -/** - * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator. - */ -public class UnSafeTupleBytesComparator { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - static final boolean littleEndian = - ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); - - public static int compare(long ptr1, long ptr2) { - int lstrLen = UNSAFE.getInt(ptr1); - int rstrLen = UNSAFE.getInt(ptr2); - - ptr1 += SizeOf.SIZE_OF_INT; - ptr2 += SizeOf.SIZE_OF_INT; - - int minLength = Math.min(lstrLen, rstrLen); - int minWords = minLength / Longs.BYTES; - - /* - * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a - * time is no slower than comparing 4 bytes at a time even on 32-bit. - * On the other hand, it is substantially faster on 64-bit. - */ - for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) { - long lw = UNSAFE.getLong(ptr1); - long rw = UNSAFE.getLong(ptr2); - long diff = lw ^ rw; - - if (diff != 0) { - if (!littleEndian) { - return UnsignedLongs.compare(lw, rw); - } - - // Use binary search - int n = 0; - int y; - int x = (int) diff; - if (x == 0) { - x = (int) (diff >>> 32); - n = 32; - } - - y = x << 16; - if (y == 0) { - n += 16; - } else { - x = y; - } - - y = x << 8; - if (y == 0) { - n += 8; - } - return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); - } - - ptr1 += SizeOf.SIZE_OF_LONG; - ptr2 += SizeOf.SIZE_OF_LONG; - } - - // The epilogue to cover the last (minLength % 8) elements. - for (int i = minWords * Longs.BYTES; i < minLength; i++) { - int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++); - if (result != 0) { - return result; - } - } - return lstrLen - rstrLen; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java deleted file mode 100644 index 51dbb29..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import java.nio.ByteBuffer; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class ZeroCopyTuple extends UnSafeTuple { - - public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) { - super.set(bb, relativePos, length, types); - } - - @Override - public void release() { - // nothing to do - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/proto/IndexProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto deleted file mode 100644 index f5c8a08..0000000 --- a/tajo-storage/src/main/proto/IndexProtos.proto +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.tajo.index"; -option java_outer_classname = "IndexProtos"; -option optimize_for = SPEED; -option java_generic_services = false; -option java_generate_equals_and_hash = true; - -import "CatalogProtos.proto"; - -message TupleComparatorProto { - required SchemaProto schema = 1; - repeated SortSpecProto sortSpecs = 2; - repeated TupleComparatorSpecProto compSpecs = 3; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml deleted file mode 100644 index e861b7d..0000000 --- a/tajo-storage/src/main/resources/storage-default.xml +++ /dev/null @@ -1,175 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<configuration> - <property> - <name>tajo.storage.manager.maxReadBytes</name> - <value>8388608</value> - <description></description> - </property> - - <property> - <name>tajo.storage.manager.concurrency.perDisk</name> - <value>1</value> - <description></description> - </property> - - <!--- Registered Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler</name> - <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value> - </property> - - <!--- Fragment Class Configurations --> - <property> - <name>tajo.storage.fragment.textfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.csv.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.json.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.raw.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.rcfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.row.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.parquet.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.sequencefile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.avro.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - - <!--- Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.json.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroScanner</value> - </property> - - <!--- Appender Handler --> - <property> - <name>tajo.storage.appender-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> - </property> - - <property> - <name>tajo.storage.appender-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.json.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroAppender</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java deleted file mode 100644 index cf8a54e..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; - -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -public class HttpFileServer { - private final static Log LOG = LogFactory.getLog(HttpFileServer.class); - - private final InetSocketAddress addr; - private InetSocketAddress bindAddr; - private ServerBootstrap bootstrap = null; - private ChannelFactory factory = null; - private ChannelGroup channelGroup = null; - - public HttpFileServer(final InetSocketAddress addr) { - this.addr = addr; - this.factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - 2); - - // Configure the server. - this.bootstrap = new ServerBootstrap(factory); - // Set up the event pipeline factory. - this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory()); - this.channelGroup = new DefaultChannelGroup(); - } - - public HttpFileServer(String bindaddr) { - this(NetUtils.createSocketAddr(bindaddr)); - } - - public void start() { - // Bind and start to accept incoming connections. - Channel channel = bootstrap.bind(addr); - channelGroup.add(channel); - this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); - LOG.info("HttpFileServer starts up (" - + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() - + ")"); - } - - public InetSocketAddress getBindAddress() { - return this.bindAddr; - } - - public void stop() { - ChannelGroupFuture future = channelGroup.close(); - future.awaitUninterruptibly(); - factory.releaseExternalResources(); - - LOG.info("HttpFileServer shutdown (" - + this.bindAddr.getAddress().getHostAddress() + ":" - + this.bindAddr.getPort() + ")"); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java deleted file mode 100644 index 6c77317..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.RandomAccessFile; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; - -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -/** - * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6 - */ -public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; - } - - final String path = sanitizeUri(request.getUri()); - if (path == null) { - sendError(ctx, FORBIDDEN); - return; - } - - File file = new File(path); - if (file.isHidden() || !file.exists()) { - sendError(ctx, NOT_FOUND); - return; - } - if (!file.isFile()) { - sendError(ctx, FORBIDDEN); - return; - } - - RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } catch (FileNotFoundException fnfe) { - sendError(ctx, NOT_FOUND); - return; - } - long fileLength = raf.length(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentLength(response, fileLength); - setContentTypeHeader(response); - - Channel ch = e.getChannel(); - - // Write the initial line and the header. - ch.write(response); - - // Write the content. - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { - // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); - } else { - // No encryption - use zero-copy. - final FileRegion region = - new DefaultFileRegion(raf.getChannel(), 0, fileLength); - writeFuture = ch.write(region); - writeFuture.addListener(new ChannelFutureProgressListener() { - public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); - } - - public void operationProgressed( - ChannelFuture future, long amount, long current, long total) { - System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); - } - }); - } - - // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - private static String sanitizeUri(String uri) { - // Decode the path. - try { - uri = URLDecoder.decode(uri, "UTF-8"); - } catch (UnsupportedEncodingException e) { - try { - uri = URLDecoder.decode(uri, "ISO-8859-1"); - } catch (UnsupportedEncodingException e1) { - throw new Error(); - } - } - - // Convert file separators. - uri = uri.replace('/', File.separatorChar); - - // Simplistic dumb security check. - // You will have to do something serious in the production environment. - if (uri.contains(File.separator + '.') || - uri.contains('.' + File.separator) || - uri.startsWith(".") || uri.endsWith(".")) { - return null; - } - - return uri; - } - - private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", - CharsetUtil.UTF_8)); - - // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Sets the content type header for the HTTP Response - * - * @param response - * HTTP response - */ - private static void setContentTypeHeader(HttpResponse response) { - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java deleted file mode 100644 index cecf93b..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpChunkAggregator; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; - -import static org.jboss.netty.channel.Channels.pipeline; - -// Uncomment the following lines if you want HTTPS -//import javax.net.ssl.SSLEngine; -//import org.jboss.netty.example.securechat.SecureChatSslContextFactory; -//import org.jboss.netty.handler.ssl.SslHandler; - -//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6 -public class HttpFileServerPipelineFactory implements ChannelPipelineFactory { - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); - - // Uncomment the following lines if you want HTTPS - //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); - //engine.setUseClientMode(false); - //pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - - pipeline.addLast("handler", new HttpFileServerHandler()); - return pipeline; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java deleted file mode 100644 index fd5a63e..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.*; -import org.apache.hadoop.io.compress.zlib.ZlibFactory; -import org.apache.hadoop.util.NativeCodeLoader; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.sequencefile.SequenceFileScanner; -import org.apache.tajo.storage.text.DelimitedTextFile; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestCompressionStorages { - private TajoConf conf; - private static String TEST_PATH = "target/test-data/TestCompressionStorages"; - - private StoreType storeType; - private Path testDir; - private FileSystem fs; - - public TestCompressionStorages(StoreType type) throws IOException { - this.storeType = type; - conf = new TajoConf(); - - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][]{ - {StoreType.CSV}, - {StoreType.RCFILE}, - {StoreType.SEQUENCEFILE}, - {StoreType.TEXTFILE} - }); - } - - @Test - public void testDeflateCodecCompressionData() throws IOException { - storageCompressionTest(storeType, DeflateCodec.class); - } - - @Test - public void testGzipCodecCompressionData() throws IOException { - if (storeType == StoreType.RCFILE) { - if( ZlibFactory.isNativeZlibLoaded(conf)) { - storageCompressionTest(storeType, GzipCodec.class); - } - } else if (storeType == StoreType.SEQUENCEFILE) { - if( ZlibFactory.isNativeZlibLoaded(conf)) { - storageCompressionTest(storeType, GzipCodec.class); - } - } else { - storageCompressionTest(storeType, GzipCodec.class); - } - } - - @Test - public void testSnappyCodecCompressionData() throws IOException { - if (SnappyCodec.isNativeCodeLoaded()) { - storageCompressionTest(storeType, SnappyCodec.class); - } - } - - @Test - public void testLz4CodecCompressionData() throws IOException { - if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) - storageCompressionTest(storeType, Lz4Codec.class); - } - - private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.FLOAT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - meta.putOption("compression.codec", codec.getCanonicalName()); - meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name()); - meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName()); - meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName()); - - String fileName = "Compression_" + codec.getSimpleName(); - Path tablePath = new Path(testDir, fileName); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - - appender.init(); - - String extension = ""; - if (appender instanceof CSVFile.CSVAppender) { - extension = ((CSVFile.CSVAppender) appender).getExtension(); - } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { - extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); - } - - int tupleNum = 100000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(3); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createFloat4((float) i)); - vTuple.put(2, DatumFactory.createText(String.valueOf(i))); - appender.addTuple(vTuple); - } - appender.close(); - - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - tablePath = tablePath.suffix(extension); - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - FileFragment[] tablets = new FileFragment[1]; - tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); - - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); - - if (StoreType.CSV == storeType) { - if (SplittableCompressionCodec.class.isAssignableFrom(codec)) { - assertTrue(scanner.isSplittable()); - } else { - assertFalse(scanner.isSplittable()); - } - } - scanner.init(); - - if (storeType == StoreType.SEQUENCEFILE) { - assertTrue(scanner instanceof SequenceFileScanner); - Writable key = ((SequenceFileScanner) scanner).getKey(); - assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); - } - - int tupleCnt = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { - tupleCnt++; - } - scanner.close(); - assertEquals(tupleNum, tupleCnt); - assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); - assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java deleted file mode 100644 index 93fb12b..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.FileUtil; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URL; - -import static org.junit.Assert.*; - -public class TestDelimitedTextFile { - - private static Schema schema = new Schema(); - - private static Tuple baseTuple = new VTuple(10); - - static { - schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.CHAR, 7); - schema.addColumn("col3", Type.INT2); - schema.addColumn("col4", Type.INT4); - schema.addColumn("col5", Type.INT8); - schema.addColumn("col6", Type.FLOAT4); - schema.addColumn("col7", Type.FLOAT8); - schema.addColumn("col8", Type.TEXT); - schema.addColumn("col9", Type.BLOB); - schema.addColumn("col10", Type.INET4); - - baseTuple.put(new Datum[] { - DatumFactory.createBool(true), // 0 - DatumFactory.createChar("hyunsik"), // 1 - DatumFactory.createInt2((short) 17), // 2 - DatumFactory.createInt4(59), // 3 - DatumFactory.createInt8(23l), // 4 - DatumFactory.createFloat4(77.9f), // 5 - DatumFactory.createFloat8(271.9d), // 6 - DatumFactory.createText("hyunsik"), // 7 - DatumFactory.createBlob("hyunsik".getBytes()),// 8 - DatumFactory.createInet4("192.168.0.1"), // 9 - }); - } - - public static Path getResourcePath(String path, String suffix) { - URL resultBaseURL = ClassLoader.getSystemResource(path); - return new Path(resultBaseURL.toString(), suffix); - } - - public static Path getResultPath(Class clazz, String fileName) { - return new Path (getResourcePath("results", clazz.getSimpleName()), fileName); - } - - public static String getResultText(Class clazz, String fileName) throws IOException { - FileSystem localFS = FileSystem.getLocal(new Configuration()); - Path path = getResultPath(clazz, fileName); - Preconditions.checkState(localFS.exists(path) && localFS.isFile(path)); - return FileUtil.readTextFile(new File(path.toUri())); - } - - private static final FileFragment getFileFragment(String fileName) throws IOException { - TajoConf conf = new TajoConf(); - Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName); - FileSystem fs = FileSystem.getLocal(conf); - FileStatus status = fs.getFileStatus(tablePath); - return new FileFragment("table", tablePath, 0, status.getLen()); - } - - @Test - public void testIgnoreAllErrors() throws IOException { - TajoConf conf = new TajoConf(); - - TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); - meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); - FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - Tuple tuple; - int i = 0; - while ((tuple = scanner.next()) != null) { - assertEquals(baseTuple, tuple); - i++; - } - assertEquals(3, i); - scanner.close(); - } - - @Test - public void testIgnoreOneErrorTolerance() throws IOException { - - - TajoConf conf = new TajoConf(); - - TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); - meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); - FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - assertNotNull(scanner.next()); - assertNotNull(scanner.next()); - try { - scanner.next(); - } catch (IOException ioe) { - System.out.println(ioe); - return; - } finally { - scanner.close(); - } - fail(); - } - - @Test - public void testNoErrorTolerance() throws IOException { - TajoConf conf = new TajoConf(); - TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); - meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); - FileFragment fragment = getFileFragment("testErrorTolerance2.json"); - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); - scanner.init(); - - try { - scanner.next(); - } catch (IOException ioe) { - return; - } finally { - scanner.close(); - } - fail(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java deleted file mode 100644 index bec0daf..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.??See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.??The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.??You may obtain a copy of the License at - * - *?????http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class TestFileSystems { - - private static String TEST_PATH = "target/test-data/TestFileSystem"; - private Configuration conf; - private StorageManager sm; - private FileSystem fs; - private Path testDir; - - public TestFileSystems(FileSystem fs) throws IOException { - this.fs = fs; - this.conf = fs.getConf(); - this.testDir = getTestDir(this.fs, TEST_PATH); - this.sm = StorageManager.getStorageManager(new TajoConf(this.conf)); - } - - public Path getTestDir(FileSystem fs, String dir) throws IOException { - Path path = new Path(dir); - if (fs.exists(path)) - fs.delete(path, true); - - fs.mkdirs(path); - - return fs.makeQualified(path); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() throws IOException { - return Arrays.asList(new Object[][]{ - {FileSystem.getLocal(new TajoConf())}, - }); - } - - @Before - public void setup() throws IOException { - if (!(fs instanceof LocalFileSystem)) { - conf.set("fs.local.block.size", "10"); - fs.initialize(URI.create(fs.getScheme() + ":///"), conf); - fs.setConf(conf); - } - } - - @After - public void tearDown() throws IOException { - if (!(fs instanceof LocalFileSystem)) { - fs.setConf(new TajoConf()); - } - } - - @Test - public void testBlockSplit() throws IOException { - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Tuple[] tuples = new Tuple[4]; - for (int i = 0; i < tuples.length; i++) { - tuples[i] = new VTuple(3); - tuples[i] - .put(new Datum[]{DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 32), - DatumFactory.createText("name" + i)}); - } - - Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", - "table.csv"); - fs.mkdirs(path.getParent()); - - Appender appender = sm.getAppender(meta, schema, path); - appender.init(); - for (Tuple t : tuples) { - appender.addTuple(t); - } - appender.close(); - FileStatus fileStatus = fs.getFileStatus(path); - - List<FileFragment> splits = sm.getSplits("table", meta, schema, path); - int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); - assertEquals(splitSize, splits.size()); - - for (FileFragment fragment : splits) { - assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java deleted file mode 100644 index 387fed5..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestFrameTuple { - private Tuple tuple1; - private Tuple tuple2; - - @Before - public void setUp() throws Exception { - tuple1 = new VTuple(11); - tuple1.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar('9'), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1") - }); - - tuple2 = new VTuple(11); - tuple2.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), - DatumFactory.createChar('9'), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9f), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1") - }); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public final void testFrameTuple() { - Tuple frame = new FrameTuple(tuple1, tuple2); - assertEquals(22, frame.size()); - for (int i = 0; i < 22; i++) { - assertTrue(frame.contains(i)); - } - - assertEquals(DatumFactory.createInt8(23l), frame.get(5)); - assertEquals(DatumFactory.createInt8(23l), frame.get(16)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java deleted file mode 100644 index c6149f7..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.util.BytesUtils; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestLazyTuple { - - Schema schema; - byte[][] textRow; - byte[] nullbytes; - SerializerDeserializer serde; - - @Before - public void setUp() { - nullbytes = "\\N".getBytes(); - - schema = new Schema(); - schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); - schema.addColumn("col2", TajoDataTypes.Type.BIT); - schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7); - schema.addColumn("col4", TajoDataTypes.Type.INT2); - schema.addColumn("col5", TajoDataTypes.Type.INT4); - schema.addColumn("col6", TajoDataTypes.Type.INT8); - schema.addColumn("col7", TajoDataTypes.Type.FLOAT4); - schema.addColumn("col8", TajoDataTypes.Type.FLOAT8); - schema.addColumn("col9", TajoDataTypes.Type.TEXT); - schema.addColumn("col10", TajoDataTypes.Type.BLOB); - schema.addColumn("col11", TajoDataTypes.Type.INET4); - schema.addColumn("col12", TajoDataTypes.Type.INT4); - schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE); - - StringBuilder sb = new StringBuilder(); - sb.append(DatumFactory.createBool(true)).append('|'); - sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|'); - sb.append(DatumFactory.createChar("str")).append('|'); - sb.append(DatumFactory.createInt2((short) 17)).append('|'); - sb.append(DatumFactory.createInt4(59)).append('|'); - sb.append(DatumFactory.createInt8(23l)).append('|'); - sb.append(DatumFactory.createFloat4(77.9f)).append('|'); - sb.append(DatumFactory.createFloat8(271.9f)).append('|'); - sb.append(DatumFactory.createText("str2")).append('|'); - sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|'); - sb.append(DatumFactory.createInet4("192.168.0.1")).append('|'); - sb.append(new String(nullbytes)).append('|'); - sb.append(NullDatum.get()); - textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|'); - serde = new TextSerializerDeserializer(); - } - - @Test - public void testGetDatum() { - - LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde); - assertEquals(DatumFactory.createBool(true), t1.get(0)); - assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1)); - assertEquals(DatumFactory.createChar("str"), t1.get(2)); - assertEquals(DatumFactory.createInt2((short) 17), t1.get(3)); - assertEquals(DatumFactory.createInt4(59), t1.get(4)); - assertEquals(DatumFactory.createInt8(23l), t1.get(5)); - assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6)); - assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7)); - assertEquals(DatumFactory.createText("str2"), t1.get(8)); - assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9)); - assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10)); - assertEquals(NullDatum.get(), t1.get(11)); - assertEquals(NullDatum.get(), t1.get(12)); - } - - @Test - public void testContain() { - int colNum = schema.size(); - - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - t1.put(0, DatumFactory.createInt4(1)); - t1.put(3, DatumFactory.createInt4(1)); - t1.put(7, DatumFactory.createInt4(1)); - - assertTrue(t1.contains(0)); - assertFalse(t1.contains(1)); - assertFalse(t1.contains(2)); - assertTrue(t1.contains(3)); - assertFalse(t1.contains(4)); - assertFalse(t1.contains(5)); - assertFalse(t1.contains(6)); - assertTrue(t1.contains(7)); - assertFalse(t1.contains(8)); - assertFalse(t1.contains(9)); - assertFalse(t1.contains(10)); - assertFalse(t1.contains(11)); - assertFalse(t1.contains(12)); - } - - @Test - public void testPut() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - t1.put(0, DatumFactory.createText("str")); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(11, DatumFactory.createFloat4(0.76f)); - - assertTrue(t1.contains(0)); - assertTrue(t1.contains(1)); - - assertEquals(t1.getText(0), "str"); - assertEquals(t1.get(1).asInt4(), 2); - assertTrue(t1.get(11).asFloat4() == 0.76f); - } - - @Test - public void testEquals() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - - assertEquals(t1, t2); - - Tuple t3 = new VTuple(colNum); - t3.put(0, DatumFactory.createInt4(1)); - t3.put(1, DatumFactory.createInt4(2)); - t3.put(3, DatumFactory.createInt4(2)); - assertEquals(t1, t3); - assertEquals(t2, t3); - - LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1); - assertNotSame(t1, t4); - } - - @Test - public void testHashCode() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("str")); - - t2.put(0, DatumFactory.createInt4(1)); - t2.put(1, DatumFactory.createInt4(2)); - t2.put(3, DatumFactory.createInt4(2)); - t2.put(4, DatumFactory.createText("str")); - - assertEquals(t1.hashCode(), t2.hashCode()); - - Tuple t3 = new VTuple(colNum); - t3.put(0, DatumFactory.createInt4(1)); - t3.put(1, DatumFactory.createInt4(2)); - t3.put(3, DatumFactory.createInt4(2)); - t3.put(4, DatumFactory.createText("str")); - assertEquals(t1.hashCode(), t3.hashCode()); - assertEquals(t2.hashCode(), t3.hashCode()); - - Tuple t4 = new VTuple(5); - t4.put(0, DatumFactory.createInt4(1)); - t4.put(1, DatumFactory.createInt4(2)); - t4.put(4, DatumFactory.createInt4(2)); - - assertNotSame(t1.hashCode(), t4.hashCode()); - } - - @Test - public void testPutTuple() { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(2, DatumFactory.createInt4(3)); - - - Schema schema2 = new Schema(); - schema2.addColumn("col1", TajoDataTypes.Type.INT8); - schema2.addColumn("col2", TajoDataTypes.Type.INT8); - - LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1); - t2.put(0, DatumFactory.createInt4(4)); - t2.put(1, DatumFactory.createInt4(5)); - - t1.put(3, t2); - - for (int i = 0; i < 5; i++) { - assertEquals(i + 1, t1.get(i).asInt4()); - } - } - - @Test - public void testInvalidNumber() { - byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|'); - Schema schema = new Schema(); - schema.addColumn("col1", TajoDataTypes.Type.INT2); - schema.addColumn("col2", TajoDataTypes.Type.INT4); - schema.addColumn("col3", TajoDataTypes.Type.INT8); - schema.addColumn("col4", TajoDataTypes.Type.FLOAT4); - schema.addColumn("col5", TajoDataTypes.Type.FLOAT8); - - LazyTuple tuple = new LazyTuple(schema, bytes, 0); - assertEquals(bytes.length, tuple.size()); - - for (int i = 0; i < tuple.size(); i++){ - assertEquals(NullDatum.get(), tuple.get(i)); - } - } - - @Test - public void testClone() throws CloneNotSupportedException { - int colNum = schema.size(); - LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); - - t1.put(0, DatumFactory.createInt4(1)); - t1.put(1, DatumFactory.createInt4(2)); - t1.put(3, DatumFactory.createInt4(2)); - t1.put(4, DatumFactory.createText("str")); - - LazyTuple t2 = (LazyTuple) t1.clone(); - assertNotSame(t1, t2); - assertEquals(t1, t2); - - assertSame(t1.get(4), t2.get(4)); - - t1.clear(); - assertFalse(t1.equals(t2)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java deleted file mode 100644 index bfaba04..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.text.ByteBufLineReader; -import org.apache.tajo.storage.text.DelimitedLineReader; -import org.apache.tajo.storage.text.DelimitedTextFile; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.FileUtil; -import org.junit.Test; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - -public class TestLineReader { - private static String TEST_PATH = "target/test-data/TestLineReader"; - - @Test - public void testByteBufLineReader() throws IOException { - TajoConf conf = new TajoConf(); - Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); - FileSystem fs = testDir.getFileSystem(conf); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("comment", Type.TEXT); - schema.addColumn("comment2", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); - Path tablePath = new Path(testDir, "line.data"); - FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("emiya muljomdao")); - vTuple.put(3, NullDatum.get()); - appender.addTuple(vTuple); - } - appender.close(); - - FileStatus status = fs.getFileStatus(tablePath); - - ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath)); - ByteBufLineReader reader = new ByteBufLineReader(channel); - - long totalRead = 0; - int i = 0; - AtomicInteger bytes = new AtomicInteger(); - for(;;){ - ByteBuf buf = reader.readLineBuf(bytes); - totalRead += bytes.get(); - if(buf == null) break; - i++; - } - IOUtils.cleanup(null, reader, channel, fs); - assertEquals(tupleNum, i); - assertEquals(status.getLen(), totalRead); - assertEquals(status.getLen(), reader.readBytes()); - } - - @Test - public void testLineDelimitedReader() throws IOException { - TajoConf conf = new TajoConf(); - Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); - FileSystem fs = testDir.getFileSystem(conf); - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - schema.addColumn("comment", Type.TEXT); - schema.addColumn("comment2", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); - meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); - - Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName()); - FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); - appender.enableStats(); - appender.init(); - int tupleNum = 10000; - VTuple vTuple; - - long splitOffset = 0; - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - vTuple.put(2, DatumFactory.createText("emiya muljomdao")); - vTuple.put(3, NullDatum.get()); - appender.addTuple(vTuple); - - if(i == (tupleNum / 2)){ - splitOffset = appender.getOffset(); - } - } - String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); - appender.close(); - - tablePath = tablePath.suffix(extension); - FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset); - DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF - assertTrue(reader.isCompressed()); - assertFalse(reader.isReadable()); - reader.init(); - assertTrue(reader.isReadable()); - - - int i = 0; - while(reader.isReadable()){ - ByteBuf buf = reader.readLine(); - if(buf == null) break; - i++; - } - - IOUtils.cleanup(null, reader, fs); - assertEquals(tupleNum, i); - - } - - @Test - public void testByteBufLineReaderWithoutTerminating() throws IOException { - String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile(); - File file = new File(path); - String data = FileUtil.readTextFile(file); - - ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file)); - ByteBufLineReader reader = new ByteBufLineReader(channel); - - long totalRead = 0; - int i = 0; - AtomicInteger bytes = new AtomicInteger(); - for(;;){ - ByteBuf buf = reader.readLineBuf(bytes); - totalRead += bytes.get(); - if(buf == null) break; - i++; - } - IOUtils.cleanup(null, reader); - assertEquals(file.length(), totalRead); - assertEquals(file.length(), reader.readBytes()); - assertEquals(data.split("\n").length, i); - } - - @Test - public void testCRLFLine() throws IOException { - TajoConf conf = new TajoConf(); - Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt"); - - FileSystem fs = testFile.getFileSystem(conf); - FSDataOutputStream outputStream = fs.create(testFile, true); - outputStream.write("0\r\n1\r\n".getBytes()); - outputStream.flush(); - IOUtils.closeStream(outputStream); - - ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile)); - ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2)); - FileStatus status = fs.getFileStatus(testFile); - - long totalRead = 0; - int i = 0; - AtomicInteger bytes = new AtomicInteger(); - for(;;){ - ByteBuf buf = reader.readLineBuf(bytes); - totalRead += bytes.get(); - if(buf == null) break; - String row = buf.toString(Charset.defaultCharset()); - assertEquals(i, Integer.parseInt(row)); - i++; - } - IOUtils.cleanup(null, reader); - assertEquals(status.getLen(), totalRead); - assertEquals(status.getLen(), reader.readBytes()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java deleted file mode 100644 index e6714b5..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.TUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestMergeScanner { - private TajoConf conf; - StorageManager sm; - private static String TEST_PATH = "target/test-data/TestMergeScanner"; - - private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA = - "{\n" + - " \"type\": \"record\",\n" + - " \"namespace\": \"org.apache.tajo\",\n" + - " \"name\": \"testMultipleFiles\",\n" + - " \"fields\": [\n" + - " { \"name\": \"id\", \"type\": \"int\" },\n" + - " { \"name\": \"file\", \"type\": \"string\" },\n" + - " { \"name\": \"name\", \"type\": \"string\" },\n" + - " { \"name\": \"age\", \"type\": \"long\" }\n" + - " ]\n" + - "}\n"; - - private Path testDir; - private StoreType storeType; - private FileSystem fs; - - public TestMergeScanner(StoreType storeType) { - this.storeType = storeType; - } - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - {StoreType.CSV}, - {StoreType.RAW}, - {StoreType.RCFILE}, - {StoreType.PARQUET}, - {StoreType.SEQUENCEFILE}, - {StoreType.AVRO}, - // RowFile requires Byte-buffer read support, so we omitted RowFile. - //{StoreType.ROWFILE}, - }); - } - - @Before - public void setup() throws Exception { - conf = new TajoConf(); - conf.setVar(ConfVars.ROOT_DIR, TEST_PATH); - conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro"); - testDir = CommonTestingUtil.getTestDir(TEST_PATH); - fs = testDir.getFileSystem(conf); - sm = StorageManager.getStorageManager(conf, testDir); - } - - @Test - public void testMultipleFiles() throws IOException { - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("file", Type.TEXT); - schema.addColumn("name", Type.TEXT); - schema.addColumn("age", Type.INT8); - - KeyValueSet options = new KeyValueSet(); - TableMeta meta = CatalogUtil.newTableMeta(storeType, options); - meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - if (storeType == StoreType.AVRO) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_MULTIPLE_FILES_AVRO_SCHEMA); - } - - Path table1Path = new Path(testDir, storeType + "_1.data"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path); - appender1.enableStats(); - appender1.init(); - int tupleNum = 10000; - VTuple vTuple; - - for(int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createText("hyunsik")); - vTuple.put(2, DatumFactory.createText("jihoon")); - vTuple.put(3, DatumFactory.createInt8(25l)); - appender1.addTuple(vTuple); - } - appender1.close(); - - TableStats stat1 = appender1.getStats(); - if (stat1 != null) { - assertEquals(tupleNum, stat1.getNumRows().longValue()); - } - - Path table2Path = new Path(testDir, storeType + "_2.data"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path); - appender2.enableStats(); - appender2.init(); - - for(int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(4); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createText("hyunsik")); - vTuple.put(2, DatumFactory.createText("jihoon")); - vTuple.put(3, DatumFactory.createInt8(25l)); - appender2.addTuple(vTuple); - } - appender2.close(); - - TableStats stat2 = appender2.getStats(); - if (stat2 != null) { - assertEquals(tupleNum, stat2.getNumRows().longValue()); - } - - - FileStatus status1 = fs.getFileStatus(table1Path); - FileStatus status2 = fs.getFileStatus(table2Path); - FileFragment[] fragment = new FileFragment[2]; - fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen()); - fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen()); - - Schema targetSchema = new Schema(); - targetSchema.addColumn(schema.getColumn(0)); - targetSchema.addColumn(schema.getColumn(2)); - - Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema); - assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable()); - - scanner.init(); - int totalCounts = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { - totalCounts++; - if (isProjectableStorage(meta.getStoreType())) { - assertNotNull(tuple.get(0)); - assertNull(tuple.get(1)); - assertNotNull(tuple.get(2)); - assertNull(tuple.get(3)); - } - } - scanner.close(); - - assertEquals(tupleNum * 2, totalCounts); - } - - private static boolean isProjectableStorage(StoreType type) { - switch (type) { - case RCFILE: - case PARQUET: - case SEQUENCEFILE: - case CSV: - case AVRO: - return true; - default: - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java deleted file mode 100644 index 12ea551..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.util.CharsetUtil; -import org.apache.tajo.storage.text.FieldSplitProcessor; -import org.apache.tajo.storage.text.LineSplitProcessor; -import org.junit.Test; - -import java.io.IOException; - -import static io.netty.util.ReferenceCountUtil.releaseLater; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestSplitProcessor { - - @Test - public void testFieldSplitProcessor() throws IOException { - String data = "abc||de"; - final ByteBuf buf = releaseLater( - Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); - - final int len = buf.readableBytes(); - FieldSplitProcessor processor = new FieldSplitProcessor('|'); - - assertEquals(3, buf.forEachByte(0, len, processor)); - assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(-1, buf.forEachByte(5, len - 5, processor)); - - } - - @Test - public void testLineSplitProcessor() throws IOException { - String data = "abc\r\n\n"; - final ByteBuf buf = releaseLater( - Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); - - final int len = buf.readableBytes(); - LineSplitProcessor processor = new LineSplitProcessor(); - - //find CR - assertEquals(3, buf.forEachByte(0, len, processor)); - - // find CRLF - assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(buf.getByte(4), '\n'); - // need to skip LF - assertTrue(processor.isPrevCharCR()); - - // find LF - assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero - } -}
