http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java index 9722959,0000000..95d0407 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java @@@ -1,223 -1,0 +1,223 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.*; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.CharsetDecoder; + +//Compatibility with Apache Hive +public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { + public static final byte[] trueBytes = "true".getBytes(); + public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); ++ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); + + private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { + return !val.isReadable() || nullBytes.equals(val); + } + + private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { + return val.readableBytes() > 0 && nullBytes.equals(val); + } + + @Override + public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException { + byte[] bytes; + int length = 0; + TajoDataTypes.DataType dataType = col.getDataType(); + + if (datum == null || datum instanceof NullDatum) { + switch (dataType.getType()) { + case CHAR: + case TEXT: + length = nullChars.length; + out.write(nullChars); + break; + default: + break; + } + return length; + } + + switch (dataType.getType()) { + case BOOLEAN: + out.write(datum.asBool() ? 
trueBytes : falseBytes); + length = trueBytes.length; + break; + case CHAR: + byte[] pad = new byte[dataType.getLength() - datum.size()]; + bytes = datum.asTextBytes(); + out.write(bytes); + out.write(pad); + length = bytes.length + pad.length; + break; + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case INET4: + case DATE: + case INTERVAL: + bytes = datum.asTextBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIME: + bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIMESTAMP: + bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case INET6: + case BLOB: + bytes = Base64.encodeBase64(datum.asByteArray(), false); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) datum; + byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); + length = protoBytes.length; + out.write(protoBytes, 0, protoBytes.length); + break; + case NULL_TYPE: + default: + break; + } + return length; + } + + @Override + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { + Datum datum; + TajoDataTypes.Type type = col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(buf, nullChars); + } else { + nullField = isNull(buf, nullChars); + } + + if (nullField) { + datum = NullDatum.get(); + } else { + switch (type) { + case BOOLEAN: + byte bool = buf.readByte(); + datum = DatumFactory.createBool(bool == 't' || bool == 'T'); + break; + case BIT: + datum = DatumFactory.createBit(Byte.parseByte( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); + break; + case CHAR: + 
datum = DatumFactory.createChar( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); + break; + case INT1: + case INT2: + datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); + break; + case INT4: + datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); + break; + case INT8: + datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); + break; + case FLOAT4: + datum = DatumFactory.createFloat4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case FLOAT8: + datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); + break; + case TEXT: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createText(bytes); + break; + } + case DATE: + datum = DatumFactory.createDate( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case TIME: + datum = DatumFactory.createTime( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case TIMESTAMP: + datum = DatumFactory.createTimestamp( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case INTERVAL: + datum = DatumFactory.createInterval( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + protobufJsonFormat.merge(bytes, builder); + datum = factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + break; + } + case INET4: + datum = DatumFactory.createInet4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case BLOB: { + byte[] bytes = new byte[buf.readableBytes()]; + 
buf.readBytes(bytes); + datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); + break; + } + default: + datum = NullDatum.get(); + break; + } + } + return datum; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java index 0000000,0000000..7ebfa79 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@@ -1,0 -1,0 +1,60 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import io.netty.buffer.ByteBuf; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.storage.Tuple; ++ ++import java.io.IOException; ++ ++/** ++ * Reads a text line and fills a Tuple with values ++ */ ++public abstract class TextLineDeserializer { ++ protected Schema schema; ++ protected TableMeta meta; ++ protected int [] targetColumnIndexes; ++ ++ public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { ++ this.schema = schema; ++ this.meta = meta; ++ this.targetColumnIndexes = targetColumnIndexes; ++ } ++ ++ /** ++ * Initialize SerDe ++ */ ++ public abstract void init(); ++ ++ /** ++ * It fills a tuple with a read fields in a given line. ++ * ++ * @param buf Read line ++ * @param output Tuple to be filled with read fields ++ * @throws java.io.IOException ++ */ ++ public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError; ++ ++ /** ++ * Release external resources ++ */ ++ public abstract void release(); ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java index 0000000,0000000..f0bae5e new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java @@@ -1,0 -1,0 +1,31 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
/**
 * Checked exception signalling that a text line could not be parsed.
 *
 * <p>Both constructors keep the originating {@link Throwable} as the cause, so the original
 * stack trace stays reachable through {@link #getCause()}.
 */
public class TextLineParsingError extends Exception {

  /**
   * Wraps {@code t}; the message is derived from it by {@link Exception#Exception(Throwable)}.
   *
   * @param t the underlying failure
   */
  public TextLineParsingError(Throwable t) {
    super(t);
  }

  /**
   * Builds the message {@code "<t.getMessage()>, Error line: <message>"} and records {@code t}
   * as the cause.
   *
   * @param message text appended after {@code ", Error line: "}
   * @param t       the underlying failure; must not be {@code null} because its message is read
   */
  public TextLineParsingError(String message, Throwable t) {
    // Pass the cause through as well as its message so the original stack trace is not lost.
    super(t.getMessage() + ", Error line: " + message, t);
  }

}
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import io.netty.buffer.ByteBuf; ++import org.apache.commons.lang.StringEscapeUtils; ++import org.apache.commons.lang.StringUtils; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.datum.NullDatum; ++import org.apache.tajo.storage.BufferPool; ++import org.apache.tajo.storage.StorageConstants; ++ ++/** ++ * Pluggable Text Line SerDe class ++ */ ++public abstract class TextLineSerDe { ++ ++ public TextLineSerDe() { ++ } ++ ++ public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); ++ ++ public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); ++ ++ public static ByteBuf getNullChars(TableMeta meta) { ++ byte[] nullCharByteArray = getNullCharsAsBytes(meta); ++ ++ ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); ++ nullChars.writeBytes(nullCharByteArray); ++ ++ return nullChars; ++ } ++ ++ public static byte [] getNullCharsAsBytes(TableMeta meta) { ++ byte [] nullChars; ++ ++ String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, ++ NullDatum.DEFAULT_TEXT)); ++ if (StringUtils.isEmpty(nullCharacters)) { ++ nullChars = NullDatum.get().asTextBytes(); ++ } else { ++ nullChars = nullCharacters.getBytes(); ++ } ++ ++ return nullChars; ++ } ++ ++} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java index 0000000,0000000..0c2761f new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java @@@ -1,0 -1,0 +1,45 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.text; ++ ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.storage.Tuple; ++ ++import java.io.IOException; ++import java.io.OutputStream; ++ ++/** ++ * Write a Tuple into single text formatted line ++ */ ++public abstract class TextLineSerializer { ++ protected Schema schema; ++ protected TableMeta meta; ++ ++ public TextLineSerializer(Schema schema, TableMeta meta) { ++ this.schema = schema; ++ this.meta = meta; ++ } ++ ++ public abstract void init(); ++ ++ public abstract int serialize(OutputStream out, Tuple input) throws IOException; ++ ++ public abstract void release(); ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index 088fda9,0000000..ff7fe13 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@@ -1,129 -1,0 +1,137 @@@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - package org.apache.tajo.storage; - - import org.apache.hadoop.fs.FileStatus; - import org.apache.hadoop.fs.FileSystem; - import org.apache.hadoop.fs.Path; - import org.apache.hadoop.fs.s3.S3FileSystem; - import org.apache.hadoop.hdfs.DFSConfigKeys; - import org.apache.tajo.catalog.CatalogUtil; - import org.apache.tajo.catalog.Schema; - import org.apache.tajo.catalog.TableMeta; - import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; - import org.apache.tajo.common.TajoDataTypes.Type; - import org.apache.tajo.conf.TajoConf; - import org.apache.tajo.datum.Datum; - import org.apache.tajo.datum.DatumFactory; - import org.apache.tajo.storage.fragment.Fragment; - import org.apache.tajo.storage.s3.InMemoryFileSystemStore; - import org.apache.tajo.storage.s3.SmallBlockS3FileSystem; - import org.junit.Test; - import org.junit.runner.RunWith; - import org.junit.runners.Parameterized; - - import java.io.IOException; - import java.net.URI; - import java.util.Arrays; - import java.util.Collection; - import java.util.List; - - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.assertTrue; - - @RunWith(Parameterized.class) - public class TestFileSystems { - - protected byte[] data = null; - - private static String TEST_PATH = "target/test-data/TestFileSystem"; - private TajoConf conf = null; - private FileStorageManager sm = null; - private FileSystem fs = null; - Path testDir; - - public TestFileSystems(FileSystem fs) throws IOException { - conf = new TajoConf(); - - if(fs instanceof S3FileSystem){ - conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10"); - fs.initialize(URI.create(fs.getScheme() + ":///"), conf); - } - this.fs = fs; - sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); - testDir = getTestDir(this.fs, TEST_PATH); - } - - public Path getTestDir(FileSystem fs, String dir) throws IOException { - Path path = 
new Path(dir); - if(fs.exists(path)) - fs.delete(path, true); - - fs.mkdirs(path); - - return fs.makeQualified(path); - } - - @Parameterized.Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())}, - }); - } - - @Test - public void testBlockSplit() throws IOException { - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT4); - schema.addColumn("name", Type.TEXT); - - TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); - - Tuple[] tuples = new Tuple[4]; - for (int i = 0; i < tuples.length; i++) { - tuples[i] = new VTuple(3); - tuples[i] - .put(new Datum[] { DatumFactory.createInt4(i), - DatumFactory.createInt4(i + 32), - DatumFactory.createText("name" + i) }); - } - - Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", - "table.csv"); - fs.mkdirs(path.getParent()); - - Appender appender = sm.getAppender(meta, schema, path); - appender.init(); - for (Tuple t : tuples) { - appender.addTuple(t); - } - appender.close(); - FileStatus fileStatus = fs.getFileStatus(path); - - List<Fragment> splits = sm.getSplits("table", meta, schema, path); - int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); - assertEquals(splitSize, splits.size()); - - for (Fragment fragment : splits) { - assertTrue(fragment.getLength() <= fileStatus.getBlockSize()); - } - } - } ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.tajo.storage; ++ ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.LocalFileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.tajo.catalog.CatalogUtil; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; ++import org.apache.tajo.common.TajoDataTypes.Type; ++import org.apache.tajo.conf.TajoConf; ++import org.apache.tajo.datum.Datum; ++import org.apache.tajo.datum.DatumFactory; ++import org.apache.tajo.storage.fragment.Fragment; ++import org.junit.After; ++import org.junit.Before; ++import org.junit.Test; ++import org.junit.runner.RunWith; ++import org.junit.runners.Parameterized; ++ ++import java.io.IOException; ++import java.net.URI; ++import java.util.Arrays; ++import java.util.Collection; ++import java.util.List; ++ ++import static org.junit.Assert.assertEquals; ++import static org.junit.Assert.assertTrue; ++ ++@RunWith(Parameterized.class) ++public class TestFileSystems { ++ ++ private static String TEST_PATH = "target/test-data/TestFileSystem"; ++ private TajoConf conf; ++ private FileStorageManager sm; ++ private FileSystem fs; ++ private Path testDir; ++ ++ public TestFileSystems(FileSystem fs) throws IOException { ++ this.fs = fs; ++ this.conf = new TajoConf(fs.getConf()); ++ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); ++ testDir = getTestDir(this.fs, TEST_PATH); ++ } ++ ++ public Path getTestDir(FileSystem fs, 
String dir) throws IOException { ++ Path path = new Path(dir); ++ if(fs.exists(path)) ++ fs.delete(path, true); ++ ++ fs.mkdirs(path); ++ ++ return fs.makeQualified(path); ++ } ++ ++ @Parameterized.Parameters ++ public static Collection<Object[]> generateParameters() throws IOException { ++ return Arrays.asList(new Object[][]{ ++ {FileSystem.getLocal(new TajoConf())}, ++ }); ++ } ++ ++ @Before ++ public void setup() throws IOException { ++ if (!(fs instanceof LocalFileSystem)) { ++ conf.set("fs.local.block.size", "10"); ++ fs.initialize(URI.create(fs.getScheme() + ":///"), conf); ++ fs.setConf(conf); ++ } ++ } ++ ++ @After ++ public void tearDown() throws IOException { ++ if (!(fs instanceof LocalFileSystem)) { ++ fs.setConf(new TajoConf()); ++ } ++ } ++ ++ @Test ++ public void testBlockSplit() throws IOException { ++ ++ Schema schema = new Schema(); ++ schema.addColumn("id", Type.INT4); ++ schema.addColumn("age", Type.INT4); ++ schema.addColumn("name", Type.TEXT); ++ ++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); ++ ++ Tuple[] tuples = new Tuple[4]; ++ for (int i = 0; i < tuples.length; i++) { ++ tuples[i] = new VTuple(3); ++ tuples[i] ++ .put(new Datum[]{DatumFactory.createInt4(i), ++ DatumFactory.createInt4(i + 32), ++ DatumFactory.createText("name" + i)}); ++ } ++ ++ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", ++ "table.csv"); ++ fs.mkdirs(path.getParent()); ++ ++ Appender appender = sm.getAppender(meta, schema, path); ++ appender.init(); ++ for (Tuple t : tuples) { ++ appender.addTuple(t); ++ } ++ appender.close(); ++ FileStatus fileStatus = fs.getFileStatus(path); ++ ++ List<Fragment> splits = sm.getSplits("table", meta, schema, path); ++ int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); ++ assertEquals(splitSize, splits.size()); ++ ++ for (Fragment fragment : splits) { ++ assertTrue(fragment.getLength() <= fileStatus.getBlockSize()); ++ } ++ } ++} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 4081a80,0000000..15998f2 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@@ -1,867 -1,0 +1,878 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestStorages { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestStorages"; + + private static String TEST_PROJECTION_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testProjection\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"int\" },\n" + + " { \"name\": \"age\", \"type\": \"long\" },\n" + + " { \"name\": \"score\", 
\"type\": \"float\" }\n" + + " ]\n" + + "}\n"; + + private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA = + "{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"org.apache.tajo\",\n" + + " \"name\": \"testNullHandlingTypes\",\n" + + " \"fields\": [\n" + + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + - " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" + ++ " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + ++ " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" + - " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" + - " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" + - " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" + ++ " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + ++ " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + ++ " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + ++ " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + ++ " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col12\", \"type\": \"null\" },\n" + - " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" + ++ " { \"name\": \"col11\", \"type\": \"null\" },\n" + ++ " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + + " ]\n" + + "}\n"; + + private StoreType storeType; + private boolean splitable; + private boolean statsable; + private boolean seekable; + private Path testDir; + private FileSystem fs; + + public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws 
IOException { + this.storeType = type; + this.splitable = splitable; + this.statsable = statsable; + this.seekable = seekable; + + conf = new TajoConf(); + + if (storeType == StoreType.RCFILE) { + conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100); + } + + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + //type, splitable, statsable, seekable + {StoreType.CSV, true, true, true}, + {StoreType.RAW, false, true, true}, + {StoreType.RCFILE, true, true, false}, + {StoreType.PARQUET, false, false, false}, + {StoreType.SEQUENCEFILE, true, true, false}, + {StoreType.AVRO, false, false, false}, + {StoreType.TEXTFILE, true, true, false}, ++ {StoreType.JSON, true, true, false}, + }); + } + + @Test + public void testSplitable() throws IOException { + if (splitable) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = (long) (Math.random() * fileLen) + 1; + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new 
FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testRCFileSplitable() throws IOException { + if (storeType == StoreType.RCFILE) { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Splitable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(2); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + appender.addTuple(vTuple); + } + appender.close(); + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + long randomNum = 122; // header size + + FileFragment[] tablets = new FileFragment[2]; + tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum); + tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum)); + + Scanner scanner = sm.getScanner(meta, schema, tablets[0], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + scanner = sm.getScanner(meta, 
schema, tablets[1], schema); + assertTrue(scanner.isSplittable()); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + } + + @Test + public void testProjection() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjection.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(i + 2)); + vTuple.put(2, DatumFactory.createFloat4(i + 3)); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen()); + + Schema target = new Schema(); + target.addColumn("age", Type.INT8); + target.addColumn("score", Type.FLOAT4); + Scanner scanner = sm.getScanner(meta, schema, fragment, target); + scanner.init(); + int tupleCnt = 0; + Tuple tuple; + while ((tuple = scanner.next()) != null) { + if (storeType == StoreType.RCFILE + || storeType == StoreType.CSV + || storeType == StoreType.PARQUET + || storeType == StoreType.SEQUENCEFILE + || storeType == StoreType.AVRO) { + assertTrue(tuple.get(0) == null); + } + assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); + assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); + tupleCnt++; + } + 
scanner.close(); + + assertEquals(tupleNum, tupleCnt); + } + + @Test + public void testVariousTypes() throws IOException { ++ boolean handleProtobuf = storeType != StoreType.JSON; ++ + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); ++ schema.addColumn("col2", Type.CHAR, 7); ++ schema.addColumn("col3", Type.INT2); ++ schema.addColumn("col4", Type.INT4); ++ schema.addColumn("col5", Type.INT8); ++ schema.addColumn("col6", Type.FLOAT4); ++ schema.addColumn("col7", Type.FLOAT8); ++ schema.addColumn("col8", Type.TEXT); ++ schema.addColumn("col9", Type.BLOB); ++ schema.addColumn("col10", Type.INET4); ++ schema.addColumn("col11", Type.NULL_TYPE); ++ if (handleProtobuf) { ++ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); ++ } + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + if (storeType == StoreType.AVRO) { - String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString(); ++ String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString(); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); + } + + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Path tablePath = new Path(testDir, "testVariousTypes.data"); + Appender appender = 
sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + - Tuple tuple = new VTuple(13); ++ Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0)); + tuple.put(new Datum[] { + DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) ++ NullDatum.get() + }); ++ if (handleProtobuf) { ++ tuple.put(11, factory.createDatum(queryid.getProto())); ++ } ++ + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = sm.getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + + @Test + public void testNullHandlingTypes() throws IOException { ++ boolean handleProtobuf = storeType != StoreType.JSON; ++ + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - 
schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); ++ schema.addColumn("col2", Type.CHAR, 7); ++ schema.addColumn("col3", Type.INT2); ++ schema.addColumn("col4", Type.INT4); ++ schema.addColumn("col5", Type.INT8); ++ schema.addColumn("col6", Type.FLOAT4); ++ schema.addColumn("col7", Type.FLOAT8); ++ schema.addColumn("col8", Type.TEXT); ++ schema.addColumn("col9", Type.BLOB); ++ schema.addColumn("col10", Type.INET4); ++ schema.addColumn("col11", Type.NULL_TYPE); ++ ++ if (handleProtobuf) { ++ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); ++ } + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); + meta.putOption(StorageConstants.TEXT_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (storeType == StoreType.AVRO) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple seedTuple = new VTuple(13); ++ int columnNum = 11 + (handleProtobuf ? 
1 : 0); ++ Tuple seedTuple = new VTuple(columnNum); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 - DatumFactory.createBit((byte) 0x99), // 1 + DatumFactory.createChar("hyunsik"), // 2 + DatumFactory.createInt2((short) 17), // 3 + DatumFactory.createInt4(59), // 4 + DatumFactory.createInt8(23l), // 5 + DatumFactory.createFloat4(77.9f), // 6 + DatumFactory.createFloat8(271.9f), // 7 + DatumFactory.createText("hyunsik"), // 8 + DatumFactory.createBlob("hyunsik".getBytes()),// 9 + DatumFactory.createInet4("192.168.0.1"), // 10 + NullDatum.get(), // 11 - factory.createDatum(queryid.getProto()) // 12 + }); + ++ if (handleProtobuf) { ++ seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12 ++ } ++ + // Making tuples with different null column positions + Tuple tuple; - for (int i = 0; i < 13; i++) { - tuple = new VTuple(13); - for (int j = 0; j < 13; j++) { ++ for (int i = 0; i < columnNum; i++) { ++ tuple = new VTuple(columnNum); ++ for (int j = 0; j < columnNum; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + int i = 0; + while ((retrieved = scanner.next()) != null) { - assertEquals(13, retrieved.size()); - for (int j = 0; j < 13; j++) { ++ assertEquals(columnNum, retrieved.size()); ++ for (int j = 0; j < columnNum; j++) { + if (i == j) { + assertEquals(NullDatum.get(), retrieved.get(j)); + } else { + assertEquals(seedTuple.get(j), retrieved.get(j)); + } + } + + i++; + } + scanner.close(); + } + + @Test + public void testRCFileTextSerializeDeserialize() throws IOException { + if(storeType != 
StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), 
status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testRCFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.RCFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = 
ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileTextSerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", 
Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + Tuple 
retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileBinarySerializeDeserialize() throws IOException { + if(storeType != StoreType.SEQUENCEFILE) return; + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.BIT); + schema.addColumn("col3", Type.CHAR, 7); + schema.addColumn("col4", Type.INT2); + schema.addColumn("col5", Type.INT4); + schema.addColumn("col6", Type.INT8); + schema.addColumn("col7", Type.FLOAT4); + schema.addColumn("col8", Type.FLOAT8); + schema.addColumn("col9", Type.TEXT); + schema.addColumn("col10", Type.BLOB); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", Type.NULL_TYPE); + schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); + + Path tablePath = new Path(testDir, "testVariousTypes.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + + Tuple tuple = new VTuple(13); + tuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createBit((byte) 0x99), + DatumFactory.createChar("jinho"), + 
DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + factory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testTime() throws IOException { + if (storeType == StoreType.CSV || storeType == StoreType.RAW) { + Schema schema = new Schema(); + schema.addColumn("col1", Type.DATE); + schema.addColumn("col2", Type.TIME); + schema.addColumn("col3", Type.TIMESTAMP); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + + Path tablePath = new Path(testDir, "testTime.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + Appender appender = sm.getAppender(meta, schema, tablePath); + 
appender.init(); + + Tuple tuple = new VTuple(3); + tuple.put(new Datum[]{ + DatumFactory.createDate("1980-04-01"), + DatumFactory.createTime("12:34:56"), + DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000)) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple retrieved; + while ((retrieved = scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.get(i)); + } + } + scanner.close(); + } + } + + @Test + public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List<Long> offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test" + i)); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + 
assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + while (scanner.next() != null) { + tupleCnt++; + } + + scanner.close(); + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java index 4f7ea1c,0000000..7b83894 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java @@@ -1,106 -1,0 +1,106 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.avro; + +import org.apache.avro.Schema; +import org.apache.tajo.HttpFileServer; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.NetUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; + +import static org.junit.Assert.*; + +/** + * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}. 
+ */ +public class TestAvroUtil { + private Schema expected; + private URL schemaUrl; + + @Before + public void setUp() throws Exception { - schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc"); ++ schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc"); + assertNotNull(schemaUrl); + + File file = new File(schemaUrl.getPath()); + assertTrue(file.exists()); + + expected = new Schema.Parser().parse(file); + } + + @Test + public void testGetSchema() throws IOException, URISyntaxException { + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath()))); + Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath()); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + assertEquals(expected, schema); + + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath(); + meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO); + meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url); + schema = AvroUtil.getAvroSchema(meta, new TajoConf()); + } finally { + server.stop(); + } + assertEquals(expected, schema); + } + + @Test + public void testGetSchemaFromHttp() throws IOException, URISyntaxException { + HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0")); + try { + server.start(); + InetSocketAddress addr = server.getBindAddress(); + + Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath()); + assertEquals(expected, schema); + } finally { + server.stop(); + } + } + + @Test + public void 
testGetSchemaFromFileSystem() throws IOException, URISyntaxException { + Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf()); + + assertEquals(expected, schema); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java index 0000000,0000000..bf7516f new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java @@@ -1,0 -1,0 +1,197 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.tajo.storage.json; ++ ++import io.netty.buffer.ByteBuf; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.io.IOUtils; ++import org.apache.hadoop.io.compress.DeflateCodec; ++import org.apache.tajo.catalog.CatalogUtil; ++import org.apache.tajo.catalog.Schema; ++import org.apache.tajo.catalog.TableMeta; ++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; ++import org.apache.tajo.common.TajoDataTypes.Type; ++import org.apache.tajo.conf.TajoConf; ++import org.apache.tajo.datum.DatumFactory; ++import org.apache.tajo.datum.NullDatum; ++import org.apache.tajo.storage.ByteBufInputChannel; ++import org.apache.tajo.storage.FileAppender; ++import org.apache.tajo.storage.StorageManager; ++import org.apache.tajo.storage.VTuple; ++import org.apache.tajo.storage.fragment.FileFragment; ++import org.apache.tajo.storage.text.ByteBufLineReader; ++import org.apache.tajo.storage.text.DelimitedLineReader; ++import org.apache.tajo.storage.text.DelimitedTextFile; ++import org.apache.tajo.util.CommonTestingUtil; ++import org.apache.tajo.util.FileUtil; ++import org.junit.Test; ++ ++import java.io.File; ++import java.io.FileInputStream; ++import java.io.IOException; ++import java.util.concurrent.atomic.AtomicInteger; ++ ++import static org.junit.Assert.*; ++ ++/** Tests ByteBufLineReader and DelimitedLineReader against files produced by the TEXTFILE appender and against the dataset/testLineText.txt resource. */ public class TestLineReader { ++ /* base directory handed to CommonTestingUtil.getTestDir by the appender-based tests */ private static String TEST_PATH = "target/test-data/TestLineReader"; ++ ++ /** Appends 10000 four-column tuples (last column null) to line.data, then reads the file through ByteBufLineReader and checks that the number of lines equals the tuple count and that the per-line byte counts add up to the file length. */ @Test ++ public void testByteBufLineReader() throws IOException { ++ TajoConf conf = new TajoConf(); ++ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); ++ FileSystem fs = testDir.getFileSystem(conf); ++ ++ Schema schema = new Schema(); ++ schema.addColumn("id", Type.INT4); ++ schema.addColumn("age", Type.INT8); ++ schema.addColumn("comment", Type.TEXT); ++ schema.addColumn("comment2", Type.TEXT); ++ ++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); ++ Path tablePath = 
new Path(testDir, "line.data"); ++ FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( ++ null, null, meta, schema, tablePath); ++ appender.enableStats(); ++ appender.init(); ++ int tupleNum = 10000; ++ VTuple vTuple; ++ ++ for (int i = 0; i < tupleNum; i++) { ++ vTuple = new VTuple(4); ++ vTuple.put(0, DatumFactory.createInt4(i + 1)); ++ vTuple.put(1, DatumFactory.createInt8(25l)); ++ vTuple.put(2, DatumFactory.createText("emiya muljomdao")); ++ vTuple.put(3, NullDatum.get()); ++ appender.addTuple(vTuple); ++ } ++ appender.close(); ++ ++ FileStatus status = fs.getFileStatus(tablePath); ++ ++ ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath)); ++ assertEquals(status.getLen(), channel.available()); ++ ByteBufLineReader reader = new ByteBufLineReader(channel); ++ assertEquals(status.getLen(), reader.available()); ++ ++ long totalRead = 0; ++ int i = 0; ++ /* readLineBuf reports the byte count of each line through this holder; the loop sums it */ AtomicInteger bytes = new AtomicInteger(); ++ for(;;){ ++ ByteBuf buf = reader.readLineBuf(bytes); ++ if(buf == null) break; ++ ++ totalRead += bytes.get(); ++ i++; ++ } ++ /* NOTE(review): reader.readBytes() is asserted after this cleanup call; confirm the counter stays readable once the reader is closed. */ IOUtils.cleanup(null, reader, channel, fs); ++ assertEquals(tupleNum, i); ++ assertEquals(status.getLen(), totalRead); ++ assertEquals(status.getLen(), reader.readBytes()); ++ } ++ ++ /** Writes the same tuples with the compression.codec option set to DeflateCodec, records the appender offset at the midpoint tuple as splitOffset, then reads a fragment spanning 0 to splitOffset through DelimitedLineReader and expects all tupleNum lines back (the inline remark in the body states that a compressed file is read to EOF). */ @Test ++ public void testLineDelimitedReader() throws IOException { ++ TajoConf conf = new TajoConf(); ++ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); ++ FileSystem fs = testDir.getFileSystem(conf); ++ ++ Schema schema = new Schema(); ++ schema.addColumn("id", Type.INT4); ++ schema.addColumn("age", Type.INT8); ++ schema.addColumn("comment", Type.TEXT); ++ schema.addColumn("comment2", Type.TEXT); ++ ++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); ++ meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); ++ ++ Path tablePath = new Path(testDir, "line1." 
+ DeflateCodec.class.getSimpleName()); ++ FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( ++ null, null, meta, schema, tablePath); ++ appender.enableStats(); ++ appender.init(); ++ int tupleNum = 10000; ++ VTuple vTuple; ++ ++ long splitOffset = 0; ++ for (int i = 0; i < tupleNum; i++) { ++ vTuple = new VTuple(4); ++ vTuple.put(0, DatumFactory.createInt4(i + 1)); ++ vTuple.put(1, DatumFactory.createInt8(25l)); ++ vTuple.put(2, DatumFactory.createText("emiya muljomdao")); ++ vTuple.put(3, NullDatum.get()); ++ appender.addTuple(vTuple); ++ ++ /* remember the appender offset reached at the midpoint tuple; it becomes the fragment length below */ if(i == (tupleNum / 2)){ ++ splitOffset = appender.getOffset(); ++ } ++ } ++ /* the path is extended with the extension reported by the appender before the file is reopened */ String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); ++ appender.close(); ++ ++ tablePath = tablePath.suffix(extension); ++ FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset); ++ DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF ++ assertTrue(reader.isCompressed()); ++ assertFalse(reader.isReadable()); ++ reader.init(); ++ assertTrue(reader.isReadable()); ++ ++ ++ int i = 0; ++ while(reader.isReadable()){ ++ ByteBuf buf = reader.readLine(); ++ if(buf == null) break; ++ i++; ++ } ++ ++ IOUtils.cleanup(null, reader, fs); ++ assertEquals(tupleNum, i); ++ ++ } ++ ++ /** Reads the dataset/testLineText.txt resource through ByteBufLineReader and checks the total bytes against the file length and the line count against the number of newline-separated pieces of the file text; the method name suggests the fixture lacks a trailing line terminator (TODO confirm). */ @Test ++ public void testByteBufLineReaderWithoutTerminating() throws IOException { ++ String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile(); ++ File file = new File(path); ++ String data = FileUtil.readTextFile(file); ++ ++ ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file)); ++ ++ assertEquals(file.length(), channel.available()); ++ ByteBufLineReader reader = new ByteBufLineReader(channel); ++ assertEquals(file.length(), reader.available()); ++ ++ long totalRead = 0; ++ int i = 0; ++ AtomicInteger bytes = new AtomicInteger(); ++ for(;;){ ++ ByteBuf buf = 
reader.readLineBuf(bytes); ++ if(buf == null) break; ++ totalRead += bytes.get(); ++ i++; ++ } ++ IOUtils.cleanup(null, reader); ++ assertEquals(file.length(), totalRead); ++ assertEquals(file.length(), reader.readBytes()); ++ assertEquals(data.split("\n").length, i); ++ } ++} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json index 0000000,0000000..8ee3408 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json @@@ -1,0 -1,0 +1,1 @@@ ++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt index 0000000,0000000..7403c26 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt @@@ -1,0 -1,0 +1,2 @@@ ++1|25|emiya muljomdao ++2|25|emiya muljomdao http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc index 0000000,0000000..d4250a9 new file mode 100644 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc @@@ -1,0 -1,0 +1,20 @@@ ++{ ++ "type": "record", ++ "namespace": "org.apache.tajo", ++ 
"name": "testVariousTypes", ++ "fields": [ ++ { "name": "col1", "type": "boolean" }, ++ { "name": "col2", "type": "string" }, ++ { "name": "col3", "type": "int" }, ++ { "name": "col4", "type": "int" }, ++ { "name": "col5", "type": "long" }, ++ { "name": "col6", "type": "float" }, ++ { "name": "col7", "type": "double" }, ++ { "name": "col8", "type": "string" }, ++ { "name": "col9", "type": "bytes" }, ++ { "name": "col10", "type": "bytes" }, ++ { "name": "col11", "type": "null" }, ++ { "name": "col12", "type": "bytes" } ++ ] ++} ++ http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index 6190d1a,0000000..737284b mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@@ -1,164 -1,0 +1,178 @@@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<configuration> + <!-- Storage configuration: file system implementation, storage manager classes, and the fragment, scanner, and appender class registered for each storage format name. --> <property> + <name>fs.s3.impl</name> + <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value> + </property> + + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <!--- Registered Scanner Handler --> + <!-- Comma-separated format names; each name has a matching tajo.storage.scanner-handler.NAME.class entry further down, except row (see the review note at the rowfile entry). --> <property> + <name>tajo.storage.scanner-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> ++ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <!--- Fragment Class Configurations --> + <!-- Every format listed here, including the newly added json, uses FileFragment. --> <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> ++ <name>tajo.storage.fragment.json.class</name> ++ <value>org.apache.tajo.storage.fragment.FileFragment</value> ++ </property> ++ <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + 
<value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <!-- json maps to the same DelimitedTextFileScanner class as textfile. --> <property> ++ <name>tajo.storage.scanner-handler.json.class</name> ++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> ++ </property> ++ ++ <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <!-- NOTE(review): this key uses rowfile while the handler lists and the fragment key use row; confirm which name the lookup expects. --> <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <!--- Appender Handler --> + <!-- NOTE(review): json was added to the scanner-handler list and a tajo.storage.appender-handler.json.class entry is defined below, but json is not in this list; confirm whether it should be registered here as well. --> <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + 
</property> + + <!-- json maps to the same DelimitedTextFileAppender class as textfile. --> <property> ++ <name>tajo.storage.appender-handler.json.class</name> ++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> ++ </property> ++ ++ <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <!-- NOTE(review): keyed as rowfile, whereas the appender-handler list names row; confirm the expected key. --> <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> +</configuration>
