Repository: tajo
Updated Branches:
  refs/heads/hbase_storage 87c957e43 -> dfd7f996d
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java new file mode 100644 index 0000000..517e00e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.junit.Test; +import parquet.schema.MessageType; +import parquet.schema.MessageTypeParser; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TajoSchemaConverter}. + */ +public class TestSchemaConverter { + private static final String ALL_PARQUET_SCHEMA = + "message table_schema {\n" + + " optional boolean myboolean;\n" + + " optional int32 myint;\n" + + " optional int64 mylong;\n" + + " optional float myfloat;\n" + + " optional double mydouble;\n" + + " optional binary mybytes;\n" + + " optional binary mystring (UTF8);\n" + + " optional fixed_len_byte_array(1) myfixed;\n" + + "}\n"; + + private static final String CONVERTED_ALL_PARQUET_SCHEMA = + "message table_schema {\n" + + " optional boolean myboolean;\n" + + " optional int32 mybit;\n" + + " optional binary mychar (UTF8);\n" + + " optional int32 myint2;\n" + + " optional int32 myint4;\n" + + " optional int64 myint8;\n" + + " optional float myfloat4;\n" + + " optional double myfloat8;\n" + + " optional binary mytext (UTF8);\n" + + " optional binary myblob;\n" + + // NULL_TYPE fields are not encoded. 
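+      // The 13-column Tajo schema built below therefore converts to the
+      // 12 fields listed here; "mynull" has no Parquet counterpart.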
+ " optional binary myinet4;\n" + + " optional binary myprotobuf;\n" + + "}\n"; + + private Schema createAllTypesSchema() { + List<Column> columns = new ArrayList<Column>(); + columns.add(new Column("myboolean", Type.BOOLEAN)); + columns.add(new Column("mybit", Type.BIT)); + columns.add(new Column("mychar", Type.CHAR)); + columns.add(new Column("myint2", Type.INT2)); + columns.add(new Column("myint4", Type.INT4)); + columns.add(new Column("myint8", Type.INT8)); + columns.add(new Column("myfloat4", Type.FLOAT4)); + columns.add(new Column("myfloat8", Type.FLOAT8)); + columns.add(new Column("mytext", Type.TEXT)); + columns.add(new Column("myblob", Type.BLOB)); + columns.add(new Column("mynull", Type.NULL_TYPE)); + columns.add(new Column("myinet4", Type.INET4)); + columns.add(new Column("myprotobuf", Type.PROTOBUF)); + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private Schema createAllTypesConvertedSchema() { + List<Column> columns = new ArrayList<Column>(); + columns.add(new Column("myboolean", Type.BOOLEAN)); + columns.add(new Column("myint", Type.INT4)); + columns.add(new Column("mylong", Type.INT8)); + columns.add(new Column("myfloat", Type.FLOAT4)); + columns.add(new Column("mydouble", Type.FLOAT8)); + columns.add(new Column("mybytes", Type.BLOB)); + columns.add(new Column("mystring", Type.TEXT)); + columns.add(new Column("myfixed", Type.BLOB)); + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private void testTajoToParquetConversion( + Schema tajoSchema, String schemaString) throws Exception { + TajoSchemaConverter converter = new TajoSchemaConverter(); + MessageType schema = converter.convert(tajoSchema); + MessageType expected = MessageTypeParser.parseMessageType(schemaString); + assertEquals("converting " + schema + " to " + schemaString, + expected.toString(), schema.toString()); + } + + private void testParquetToTajoConversion( + Schema tajoSchema, String schemaString) throws Exception { + TajoSchemaConverter converter = new TajoSchemaConverter(); + Schema schema = converter.convert( + MessageTypeParser.parseMessageType(schemaString)); + assertEquals("converting " + schemaString + " to " + tajoSchema, + tajoSchema.toString(), schema.toString()); + } + + @Test + public void testAllTypesToParquet() throws Exception { + Schema schema = createAllTypesSchema(); + testTajoToParquetConversion(schema, CONVERTED_ALL_PARQUET_SCHEMA); + } + + @Test + public void testAllTypesToTajo() throws Exception { + Schema schema = createAllTypesConvertedSchema(); + testParquetToTajoConversion(schema, ALL_PARQUET_SCHEMA); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java new file mode 100644 index 0000000..7b09937 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/INode.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3.Block; +import org.apache.hadoop.io.IOUtils; + +import java.io.*; + +/** + * Holds file metadata including type (regular file, or directory), + * and the list of blocks that are pointers to the data. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class INode { + + enum FileType { + DIRECTORY, FILE + } + + public static final FileType[] FILE_TYPES = { + FileType.DIRECTORY, + FileType.FILE + }; + + public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null); + + private FileType fileType; + private Block[] blocks; + + public INode(FileType fileType, Block[] blocks) { + this.fileType = fileType; + if (isDirectory() && blocks != null) { + throw new IllegalArgumentException("A directory cannot contain blocks."); + } + this.blocks = blocks; + } + + public Block[] getBlocks() { + return blocks; + } + + public FileType getFileType() { + return fileType; + } + + public boolean isDirectory() { + return fileType == FileType.DIRECTORY; + } + + public boolean isFile() { + return fileType == FileType.FILE; + } + + public long getSerializedLength() { + return 1L + (blocks == null ?
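+    // Layout: one file-type byte, plus for files a 4-byte block count
+    // and 16 bytes per block (two longs: id and length); matches serialize() below.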
0 : 4 + blocks.length * 16); + } + + + public InputStream serialize() throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bytes); + try { + out.writeByte(fileType.ordinal()); + if (isFile()) { + out.writeInt(blocks.length); + for (int i = 0; i < blocks.length; i++) { + out.writeLong(blocks[i].getId()); + out.writeLong(blocks[i].getLength()); + } + } + out.close(); + out = null; + } finally { + IOUtils.closeStream(out); + } + return new ByteArrayInputStream(bytes.toByteArray()); + } + + public static INode deserialize(InputStream in) throws IOException { + if (in == null) { + return null; + } + DataInputStream dataIn = new DataInputStream(in); + FileType fileType = INode.FILE_TYPES[dataIn.readByte()]; + switch (fileType) { + case DIRECTORY: + in.close(); + return INode.DIRECTORY_INODE; + case FILE: + int numBlocks = dataIn.readInt(); + Block[] blocks = new Block[numBlocks]; + for (int i = 0; i < numBlocks; i++) { + long id = dataIn.readLong(); + long length = dataIn.readLong(); + blocks[i] = new Block(id, length); + } + in.close(); + return new INode(fileType, blocks); + default: + throw new IllegalArgumentException("Cannot deserialize inode."); + } + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java new file mode 100644 index 0000000..f3ac7b6 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.Block; +import org.apache.hadoop.fs.s3.INode; +import org.apache.hadoop.fs.s3.FileSystemStore; +import org.apache.hadoop.fs.s3.S3FileSystem; +import org.apache.tajo.common.exception.NotImplementedException; + +import java.io.*; +import java.net.URI; +import java.util.*; + +/** + * A stub implementation of {@link FileSystemStore} for testing + * {@link S3FileSystem} without actually connecting to S3. 
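+ * <p>INodes are held in a sorted map keyed by absolute Path, and block
+ * contents in a map from block id to byte array, so a whole test run
+ * stays on the heap.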
+ */ +public class InMemoryFileSystemStore implements FileSystemStore { + + private Configuration conf; + private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>(); + private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>(); + + @Override + public void initialize(URI uri, Configuration conf) { + this.conf = conf; + } + + @Override + public String getVersion() throws IOException { + return "0"; + } + + @Override + public void deleteINode(Path path) throws IOException { + inodes.remove(normalize(path)); + } + + @Override + public void deleteBlock(Block block) throws IOException { + blocks.remove(block.getId()); + } + + @Override + public boolean inodeExists(Path path) throws IOException { + return inodes.containsKey(normalize(path)); + } + + @Override + public boolean blockExists(long blockId) throws IOException { + return blocks.containsKey(blockId); + } + + @Override + public INode retrieveINode(Path path) throws IOException { + return inodes.get(normalize(path)); + } + + @Override + public File retrieveBlock(Block block, long byteRangeStart) throws IOException { + byte[] data = blocks.get(block.getId()); + File file = createTempFile(); + BufferedOutputStream out = null; + try { + out = new BufferedOutputStream(new FileOutputStream(file)); + out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart); + } finally { + if (out != null) { + out.close(); + } + } + return file; + } + + private File createTempFile() throws IOException { + File dir = new File(conf.get("fs.s3.buffer.dir")); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Cannot create S3 buffer directory: " + dir); + } + File result = File.createTempFile("test-", ".tmp", dir); + result.deleteOnExit(); + return result; + } + + @Override + public Set<Path> listSubPaths(Path path) throws IOException { + Path normalizedPath = normalize(path); + // This is inefficient but more than adequate for testing purposes. + Set<Path> subPaths = new LinkedHashSet<Path>(); + for (Path p : inodes.tailMap(normalizedPath).keySet()) { + if (normalizedPath.equals(p.getParent())) { + subPaths.add(p); + } + } + return subPaths; + } + + @Override + public Set<Path> listDeepSubPaths(Path path) throws IOException { + Path normalizedPath = normalize(path); + String pathString = normalizedPath.toUri().getPath(); + if (!pathString.endsWith("/")) { + pathString += "/"; + } + // This is inefficient but more than adequate for testing purposes. 
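+    // tailMap() starts the scan at the normalized path itself; the
+    // startsWith() test below keeps every descendant, at any depth.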
+ Set<Path> subPaths = new LinkedHashSet<Path>(); + for (Path p : inodes.tailMap(normalizedPath).keySet()) { + if (p.toUri().getPath().startsWith(pathString)) { + subPaths.add(p); + } + } + return subPaths; + } + + @Override + public void storeINode(Path path, INode inode) throws IOException { + inodes.put(normalize(path), inode); + } + + @Override + public void storeBlock(Block block, File file) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[8192]; + int numRead; + BufferedInputStream in = null; + try { + in = new BufferedInputStream(new FileInputStream(file)); + while ((numRead = in.read(buf)) >= 0) { + out.write(buf, 0, numRead); + } + } finally { + if (in != null) { + in.close(); + } + } + blocks.put(block.getId(), out.toByteArray()); + } + + private Path normalize(Path path) { + if (!path.isAbsolute()) { + throw new IllegalArgumentException("Path must be absolute: " + path); + } + return new Path(path.toUri().getPath()); + } + + @Override + public void purge() throws IOException { + inodes.clear(); + blocks.clear(); + } + + @Override + public void dump() throws IOException { + throw new NotImplementedException(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java new file mode 100644 index 0000000..d4034b9 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.s3; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.Block; +import org.apache.hadoop.fs.s3.FileSystemStore; +import org.apache.hadoop.fs.s3.INode; +import org.apache.hadoop.util.Progressable; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class S3OutputStream extends OutputStream { + + private Configuration conf; + + private int bufferSize; + + private FileSystemStore store; + + private Path path; + + private long blockSize; + + private File backupFile; + + private OutputStream backupStream; + + private Random r = new Random(); + + private boolean closed; + + private int pos = 0; + + private long filePos = 0; + + private int bytesWrittenToBlock = 0; + + private byte[] outBuf; + + private List<Block> blocks = new ArrayList<Block>(); + + private Block nextBlock; + + private static final Log LOG = + LogFactory.getLog(S3OutputStream.class.getName()); + + + public S3OutputStream(Configuration conf, FileSystemStore store, + Path path, long blockSize, Progressable progress, + int buffersize) throws IOException { + + this.conf = conf; + this.store = store; + this.path = path; + this.blockSize = blockSize; + this.backupFile = newBackupFile(); + this.backupStream = new FileOutputStream(backupFile); + this.bufferSize = buffersize; + this.outBuf = new byte[bufferSize]; + + } + + private File newBackupFile() throws IOException { + File dir = new File(conf.get("fs.s3.buffer.dir")); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Cannot create S3 buffer directory: " + dir); + } + File result = File.createTempFile("output-", ".tmp", dir); + result.deleteOnExit(); + return result; + } + + public long getPos() throws IOException { + return filePos; + } + + @Override + public synchronized void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) { + flush(); + } + outBuf[pos++] = (byte) b; + filePos++; + } + + @Override + public synchronized void write(byte b[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + while (len > 0) { + int remaining = bufferSize - pos; + int toWrite = Math.min(remaining, len); + System.arraycopy(b, off, outBuf, pos, toWrite); + pos += toWrite; + off += toWrite; + len -= toWrite; + filePos += toWrite; + + if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) { + flush(); + } + } + } + + @Override + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + if (bytesWrittenToBlock + pos >= blockSize) { + flushData((int) blockSize - bytesWrittenToBlock); + } + if (bytesWrittenToBlock == blockSize) { + endBlock(); + } + flushData(pos); + } + + private synchronized void flushData(int maxPos) throws IOException { + int workingPos = Math.min(pos, maxPos); + + if (workingPos > 0) { + // + // To the local block backup, write just the bytes + // + backupStream.write(outBuf, 0, workingPos); + + // + // Track position + // +
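+      // After advancing the block counter, compact the buffer: shift any
+      // unflushed bytes to the front so pos again counts from zero.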
bytesWrittenToBlock += workingPos; + System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos); + pos -= workingPos; + } + } + + private synchronized void endBlock() throws IOException { + // + // Done with local copy + // + backupStream.close(); + + // + // Send it to S3 + // + // TODO: Use passed in Progressable to report progress. + nextBlockOutputStream(); + store.storeBlock(nextBlock, backupFile); + Block[] arr = new Block[blocks.size()]; + arr = blocks.toArray(arr); + store.storeINode(path, new INode(INode.FILE_TYPES[1], arr)); + + // + // Delete local backup, start new one + // + boolean b = backupFile.delete(); + if (!b) { + LOG.warn("Ignoring failed delete"); + } + backupFile = newBackupFile(); + backupStream = new FileOutputStream(backupFile); + bytesWrittenToBlock = 0; + } + + private synchronized void nextBlockOutputStream() throws IOException { + long blockId = r.nextLong(); + while (store.blockExists(blockId)) { + blockId = r.nextLong(); + } + nextBlock = new Block(blockId, bytesWrittenToBlock); + blocks.add(nextBlock); + bytesWrittenToBlock = 0; + } + + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + flush(); + if (filePos == 0 || bytesWrittenToBlock != 0) { + endBlock(); + } + + backupStream.close(); + boolean b = backupFile.delete(); + if (!b) { + LOG.warn("Ignoring failed delete"); + } + + super.close(); + + closed = true; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java new file mode 100644 index 0000000..cb97a74 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.s3; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3.Block; +import org.apache.hadoop.fs.s3.INode; +import org.apache.hadoop.fs.s3.FileSystemStore; +import org.apache.hadoop.fs.s3.S3FileSystem; +import org.apache.hadoop.util.Progressable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +public class SmallBlockS3FileSystem extends S3FileSystem { + + private URI uri; + + private FileSystemStore store; + + private Path workingDir; + + static class Holder { + private static InMemoryFileSystemStore s; + + public synchronized static FileSystemStore get() { + if(s != null) { + return s; + } + s = new InMemoryFileSystemStore(); + return s; + } + + public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) { + s = inMemoryFileSystemStore; + } + } + + public SmallBlockS3FileSystem() { + } + + + public SmallBlockS3FileSystem( + InMemoryFileSystemStore inMemoryFileSystemStore) { + Holder.set(inMemoryFileSystemStore); + this.store = inMemoryFileSystemStore; + } + + @Override + public URI getUri() { + return uri; + } + @Override + public long getDefaultBlockSize() { + return 10; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + if (store == null) { + store = Holder.get(); + } + store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + this.workingDir = + new Path("/user", System.getProperty("user.name")).makeQualified(this); + } + @Override + public boolean isFile(Path path) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(path)); + if (inode == null) { + return false; + } + return inode.isFile(); + } + + private INode checkFile(Path path) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(path)); + if (inode == null) { + throw new IOException("No such file."); + } + if (inode.isDirectory()) { + throw new IOException("Path " + path + " is a directory."); + } + return inode; + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + Path absolutePath = makeAbsolute(f); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + throw new FileNotFoundException("File " + f + " does not exist."); + } + if (inode.isFile()) { + return new FileStatus[] { + new S3FileStatus(f.makeQualified(this), inode) + }; + } + ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); + for (Path p : store.listSubPaths(absolutePath)) { + ret.add(getFileStatus(p.makeQualified(this))); + } + return ret.toArray(new FileStatus[0]); + } + @Override + public FSDataOutputStream create(Path file, FsPermission permission, + boolean overwrite, int bufferSize, + short replication, long blockSize, Progressable progress) + throws IOException { + + INode inode = store.retrieveINode(makeAbsolute(file)); + if (inode != null) { + if (overwrite) { + delete(file, true); + } else { + throw new IOException("File already exists: " + file); + } + } else { + Path parent = file.getParent(); + if (parent != null) { + if (!mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent.toString()); + } + } + } + return new FSDataOutputStream + (new S3OutputStream(getConf(), store, 
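+        // blockSize here comes from the FileSystem#create caller; with
+        // getDefaultBlockSize() above returning 10, writes roll over
+        // blocks almost immediately.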
makeAbsolute(file), + blockSize, progress, bufferSize), + statistics); + } + @Override + public boolean mkdirs(Path path, FsPermission permission) throws IOException { + Path absolutePath = makeAbsolute(path); + List<Path> paths = new ArrayList<Path>(); + do { + paths.add(0, absolutePath); + absolutePath = absolutePath.getParent(); + } while (absolutePath != null); + + boolean result = true; + for (Path p : paths) { + result &= mkdir(p); + } + return result; + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + Path absoluteSrc = makeAbsolute(src); + INode srcINode = store.retrieveINode(absoluteSrc); + if (srcINode == null) { + // src path doesn't exist + return false; + } + Path absoluteDst = makeAbsolute(dst); + INode dstINode = store.retrieveINode(absoluteDst); + if (dstINode != null && dstINode.isDirectory()) { + absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); + dstINode = store.retrieveINode(absoluteDst); + } + if (dstINode != null) { + // dst path already exists - can't overwrite + return false; + } + Path dstParent = absoluteDst.getParent(); + if (dstParent != null) { + INode dstParentINode = store.retrieveINode(dstParent); + if (dstParentINode == null || dstParentINode.isFile()) { + // dst parent doesn't exist or is a file + return false; + } + } + return renameRecursive(absoluteSrc, absoluteDst); + } + + private boolean renameRecursive(Path src, Path dst) throws IOException { + INode srcINode = store.retrieveINode(src); + store.storeINode(dst, srcINode); + store.deleteINode(src); + if (srcINode.isDirectory()) { + for (Path oldSrc : store.listDeepSubPaths(src)) { + INode inode = store.retrieveINode(oldSrc); + if (inode == null) { + return false; + } + String oldSrcPath = oldSrc.toUri().getPath(); + String srcPath = src.toUri().getPath(); + String dstPath = dst.toUri().getPath(); + Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); + store.storeINode(newDst, inode); + store.deleteINode(oldSrc); + } + } + return true; + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + Path absolutePath = makeAbsolute(path); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + return false; + } + if (inode.isFile()) { + store.deleteINode(absolutePath); + for (Block block: inode.getBlocks()) { + store.deleteBlock(block); + } + } else { + FileStatus[] contents = null; + try { + contents = listStatus(absolutePath); + } catch(FileNotFoundException fnfe) { + return false; + } + + if ((contents.length !=0) && (!recursive)) { + throw new IOException("Directory " + path.toString() + + " is not empty."); + } + for (FileStatus p:contents) { + if (!delete(p.getPath(), recursive)) { + return false; + } + } + store.deleteINode(absolutePath); + } + return true; + } + + /** + * FileStatus for S3 file systems. 
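+ * Resolves the INode for the absolute path and wraps it in an
+ * S3FileStatus; a missing path raises FileNotFoundException.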
+ */ + @Override + public FileStatus getFileStatus(Path f) throws IOException { + INode inode = store.retrieveINode(makeAbsolute(f)); + if (inode == null) { + throw new FileNotFoundException(f + ": No such file or directory."); + } + return new S3FileStatus(f.makeQualified(this), inode); + } + private boolean mkdir(Path path) throws IOException { + Path absolutePath = makeAbsolute(path); + INode inode = store.retrieveINode(absolutePath); + if (inode == null) { + store.storeINode(absolutePath, INode.DIRECTORY_INODE); + } else if (inode.isFile()) { + throw new IOException(String.format( + "Can't make directory for path %s since it is a file.", + absolutePath)); + } + return true; + } + private Path makeAbsolute(Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workingDir, path); + } + + private static class S3FileStatus extends FileStatus { + + S3FileStatus(Path f, INode inode) throws IOException { + super(findLength(inode), inode.isDirectory(), 1, + findBlocksize(inode), 0, f); + } + + private static long findLength(INode inode) { + if (!inode.isDirectory()) { + long length = 0L; + for (Block block : inode.getBlocks()) { + length += block.getLength(); + } + return length; + } + return 0; + } + + private static long findBlocksize(INode inode) { + final Block[] ret = inode.getBlocks(); + return ret == null ? 0L : ret[0].getLength(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml new file mode 100644 index 0000000..6190d1a --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -0,0 +1,164 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<configuration> + <property> + <name>fs.s3.impl</name> + <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value> + </property> + + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + 
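+  <!-- Formats listed in tajo.storage.appender-handler are bound to their
+       appender implementations by the matching *.class keys. -->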
</property> + + <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc b/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc new file mode 100644 index 0000000..611b97f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/testVariousTypes.avsc @@ -0,0 +1,21 @@ +{ + "type": "record", + "namespace": "org.apache.tajo", + "name": "testVariousTypes", + "fields": [ + { "name": "col1", "type": "boolean" }, + { "name": "col2", "type": "int" }, + { "name": "col3", "type": "string" }, + { "name": "col4", "type": "int" }, + { "name": "col5", "type": "int" }, + { "name": "col6", "type": "long" }, + { "name": "col7", "type": "float" }, + { "name": "col8", "type": "double" }, + { "name": "col9", "type": "string" }, + { "name": "col10", "type": "bytes" }, + { "name": "col11", "type": "bytes" }, + { "name": "col12", "type": "null" }, + { "name": "col13", "type": "bytes" } + ] +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-yarn-pullserver/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/pom.xml b/tajo-yarn-pullserver/pom.xml index a7644a1..3daec5c 100644 --- a/tajo-yarn-pullserver/pom.xml +++ b/tajo-yarn-pullserver/pom.xml @@ -57,7 +57,12 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> <scope>provided</scope> </dependency> <dependency>
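The test classes above snap together through Hadoop's ordinary FileSystem machinery: storage-default.xml maps the s3 scheme to SmallBlockS3FileSystem, whose 10-byte default block size pushes even tiny writes through S3OutputStream's block-rollover path and into InMemoryFileSystemStore. A minimal sketch of that wiring, assuming the classes from this commit are on the classpath; the class name S3StubExample, the s3://test URI, and the sample path are illustrative only, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    // Hypothetical driver, not part of this commit.
    public class S3StubExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Route the s3 scheme to the small-block test file system.
        conf.set("fs.s3.impl", "org.apache.tajo.storage.s3.SmallBlockS3FileSystem");
        // Both S3OutputStream and InMemoryFileSystemStore stage data here.
        conf.set("fs.s3.buffer.dir", System.getProperty("java.io.tmpdir"));

        FileSystem fs = FileSystem.get(URI.create("s3://test"), conf);
        Path file = new Path("/data/sample.txt");

        // 25 bytes against a 10-byte block size: the stream rolls over into
        // a new block mid-write, and close() stores the final partial block.
        FSDataOutputStream out = fs.create(file);
        out.write("twenty-five bytes of data".getBytes("UTF-8"));
        out.close();

        // S3FileStatus sums the INode's block lengths: prints 25.
        System.out.println(fs.getFileStatus(file).getLen());
      }
    }

Because the no-argument constructor pulls the store from a JVM-wide Holder, tests that need isolation can instead inject their own InMemoryFileSystemStore through the one-argument constructor, which also swaps it into the shared Holder.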
