http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java deleted file mode 100644 index 16c4faa..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.SchemaObject; -import org.apache.tajo.catalog.statistics.TableStats; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Scanner Interface - */ -public interface Scanner extends SchemaObject, Closeable { - - void init() throws IOException; - - /** - * It returns one tuple at each call. - * - * @return retrieve null if the scanner has no more tuples. - * Otherwise it returns one tuple. - * - * @throws IOException if internal I/O error occurs during next method - */ - Tuple next() throws IOException; - - /** - * Reset the cursor. After executed, the scanner - * will retrieve the first tuple. - * - * @throws IOException if internal I/O error occurs during reset method - */ - void reset() throws IOException; - - /** - * Close scanner - * - * @throws IOException if internal I/O error occurs during close method - */ - void close() throws IOException; - - - /** - * It returns if the projection is executed in the underlying scanner layer. - * - * @return true if this scanner can project the given columns. - */ - boolean isProjectable(); - - /** - * Set target columns - * @param targets columns to be projected - */ - void setTarget(Column [] targets); - - /** - * It returns if the selection is executed in the underlying scanner layer. - * - * @return true if this scanner can filter tuples against a given condition. - */ - boolean isSelectable(); - - /** - * Set a search condition - * @param expr to be searched - * - * TODO - to be changed Object type - */ - void setSearchCondition(Object expr); - - /** - * It returns if the file is splittable. - * - * @return true if this scanner can split the a file. - */ - boolean isSplittable(); - - /** - * How much of the input has the Scanner consumed - * @return progress from <code>0.0</code> to <code>1.0</code>. - */ - float getProgress(); - - TableStats getInputStats(); -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java deleted file mode 100644 index 894e7ee..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import java.io.IOException; - -public interface SeekableScanner extends Scanner { - - public abstract long getNextOffset() throws IOException; - - public abstract void seek(long offset) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java deleted file mode 100644 index 564a9f5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; - -import java.io.IOException; -import java.io.OutputStream; - -@Deprecated -public interface SerializerDeserializer { - - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException; - - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java deleted file mode 100644 index 3579674..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.InputStream; - -public class SplitLineReader extends LineReader { - public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) { - super(in, recordDelimiterBytes); - } - - public SplitLineReader(InputStream in, Configuration conf, - byte[] recordDelimiterBytes) throws IOException { - super(in, conf, recordDelimiterBytes); - } - - public boolean needAdditionalRecordAfterSplit() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java deleted file mode 100644 index cc85c1d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.IOException; - -public abstract class Storage { - protected final Configuration conf; - - public Storage(final Configuration conf) { - this.conf = conf; - } - - public Configuration getConf() { - return this.conf; - } - - public abstract Appender getAppender(TableMeta meta, Path path) - throws IOException; - - public abstract Scanner openScanner(Schema schema, FileFragment[] tablets) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java deleted file mode 100644 index 220eb6c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java +++ /dev/null @@ -1,812 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.FileUtil; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.URI; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * StorageManager - */ -public class StorageManager { - private final Log LOG = LogFactory.getLog(StorageManager.class); - - protected final TajoConf conf; - protected final FileSystem fs; - protected final Path tableBaseDir; - protected final boolean blocksMetadataEnabled; - private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); - - private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); - - /** - * Cache of scanner handlers for each storage type. - */ - protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Scanner>>(); - - /** - * Cache of appender handlers for each storage type. - */ - protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends FileAppender>>(); - - /** - * Cache of constructors for each class. Pins the classes so they - * can't be garbage collected until ReflectionUtils can be collected. - */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = - new ConcurrentHashMap<Class<?>, Constructor<?>>(); - - private StorageManager(TajoConf conf) throws IOException { - this.conf = conf; - this.tableBaseDir = TajoConf.getWarehouseDir(conf); - this.fs = tableBaseDir.getFileSystem(conf); - this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - if (!this.blocksMetadataEnabled) - LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); - } - - public static StorageManager getStorageManager(TajoConf conf) throws IOException { - return getStorageManager(conf, null); - } - - public static synchronized StorageManager getStorageManager ( - TajoConf conf, Path warehouseDir) throws IOException { - - URI uri; - TajoConf localConf = new TajoConf(conf); - if (warehouseDir != null) { - localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString()); - } - - uri = TajoConf.getWarehouseDir(localConf).toUri(); - - String key = "file".equals(uri.getScheme()) ? "file" : uri.toString(); - - if(storageManagers.containsKey(key)) { - StorageManager sm = storageManagers.get(key); - return sm; - } else { - StorageManager storageManager = new StorageManager(localConf); - storageManagers.put(key, storageManager); - return storageManager; - } - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) - throws IOException { - FileSystem fs = path.getFileSystem(conf); - FileStatus status = fs.getFileStatus(path); - return getFileScanner(meta, schema, path, status); - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) - throws IOException { - FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getScanner(meta, schema, fragment); - } - - public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema); - } - - public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target); - } - - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { - return getScanner(meta, schema, fragment, schema); - } - - public FileSystem getFileSystem() { - return this.fs; - } - - public Path getWarehouseDir() { - return this.tableBaseDir; - } - - public void delete(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - fs.delete(tablePath, true); - } - - public boolean exists(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - return fileSystem.exists(path); - } - - /** - * This method deletes only data contained in the given path. - * - * @param path The path in which data are deleted. - * @throws IOException - */ - public void deleteData(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - FileStatus[] fileLists = fileSystem.listStatus(path); - for (FileStatus status : fileLists) { - fileSystem.delete(status.getPath(), true); - } - } - - public Path getTablePath(String tableName) { - return new Path(tableBaseDir, tableName); - } - - public Appender getAppender(TableMeta meta, Schema schema, Path path) - throws IOException { - Appender appender; - - Class<? extends FileAppender> appenderClass; - - String handlerName = meta.getStoreType().name().toLowerCase(); - appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); - if (appenderClass == null) { - appenderClass = conf.getClass( - String.format("tajo.storage.appender-handler.%s.class", - meta.getStoreType().name().toLowerCase()), null, - FileAppender.class); - APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); - } - - if (appenderClass == null) { - throw new IOException("Unknown Storage Type: " + meta.getStoreType()); - } - - appender = newAppenderInstance(appenderClass, conf, meta, schema, path); - - return appender; - } - - public TableMeta getTableMeta(Path tablePath) throws IOException { - TableMeta meta; - - FileSystem fs = tablePath.getFileSystem(conf); - Path tableMetaPath = new Path(tablePath, ".meta"); - if (!fs.exists(tableMetaPath)) { - throw new FileNotFoundException(".meta file not found in " + tablePath.toString()); - } - - FSDataInputStream tableMetaIn = fs.open(tableMetaPath); - - CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn, - CatalogProtos.TableProto.getDefaultInstance()); - meta = new TableMeta(tableProto); - - return meta; - } - - public FileFragment[] split(String tableName) throws IOException { - Path tablePath = new Path(tableBaseDir, tableName); - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, long fragmentSize) throws IOException { - Path tablePath = new Path(tableBaseDir, tableName); - return split(tableName, tablePath, fragmentSize); - } - - public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen()); - listTablets.add(tablet); - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public FileFragment[] split(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, Path tablePath) throws IOException { - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - private FileFragment[] split(String tableName, Path tablePath, long size) - throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, - Path tablePath, long size) - throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public long calculateSize(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - long totalSize = 0; - - if (fs.exists(tablePath)) { - totalSize = fs.getContentSummary(tablePath).getLength(); - } - - return totalSize; - } - - ///////////////////////////////////////////////////////////////////////////// - // FileInputFormat Area - ///////////////////////////////////////////////////////////////////////////// - - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; - - /** - * Proxy PathFilter that accepts a path only if all filters given in the - * constructor do. Used by the listPaths() to apply the built-in - * hiddenFileFilter together with a user provided one (if any). - */ - private static class MultiPathFilter implements PathFilter { - private List<PathFilter> filters; - - public MultiPathFilter(List<PathFilter> filters) { - this.filters = filters; - } - - public boolean accept(Path path) { - for (PathFilter filter : filters) { - if (!filter.accept(path)) { - return false; - } - } - return true; - } - } - - /** - * List input directories. - * Subclasses may override to, e.g., select only files matching a regular - * expression. - * - * @return array of FileStatus objects - * @throws IOException if zero items. - */ - protected List<FileStatus> listStatus(Path... dirs) throws IOException { - List<FileStatus> result = new ArrayList<FileStatus>(); - if (dirs.length == 0) { - throw new IOException("No input paths specified in job"); - } - - List<IOException> errors = new ArrayList<IOException>(); - - // creates a MultiPathFilter with the hiddenFileFilter and the - // user provided one (if any). - List<PathFilter> filters = new ArrayList<PathFilter>(); - filters.add(hiddenFileFilter); - - PathFilter inputFilter = new MultiPathFilter(filters); - - for (int i = 0; i < dirs.length; ++i) { - Path p = dirs[i]; - - FileSystem fs = p.getFileSystem(conf); - FileStatus[] matches = fs.globStatus(p, inputFilter); - if (matches == null) { - errors.add(new IOException("Input path does not exist: " + p)); - } else if (matches.length == 0) { - errors.add(new IOException("Input Pattern " + p + " matches 0 files")); - } else { - for (FileStatus globStat : matches) { - if (globStat.isDirectory()) { - for (FileStatus stat : fs.listStatus(globStat.getPath(), - inputFilter)) { - result.add(stat); - } - } else { - result.add(globStat); - } - } - } - } - - if (!errors.isEmpty()) { - throw new InvalidInputException(errors); - } - LOG.info("Total input paths to process : " + result.size()); - return result; - } - - /** - * Is the given filename splitable? Usually, true, but if the file is - * stream compressed, it will not be. - * <p/> - * <code>FileInputFormat</code> implementations can override this and return - * <code>false</code> to ensure that individual input files are never split-up - * so that Mappers process entire files. - * - * - * @param path the file name to check - * @param status get the file length - * @return is this file isSplittable? - */ - protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { - Scanner scanner = getFileScanner(meta, schema, path, status); - boolean split = scanner.isSplittable(); - scanner.close(); - return split; - } - - private static final double SPLIT_SLOP = 1.1; // 10% slop - - protected int getBlockIndex(BlockLocation[] blkLocations, - long offset) { - for (int i = 0; i < blkLocations.length; i++) { - // is the offset inside this block? - if ((blkLocations[i].getOffset() <= offset) && - (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1; - throw new IllegalArgumentException("Offset " + offset + - " is outside of file (0.." + - fileLength + ")"); - } - - /** - * A factory that makes the split for this class. It can be overridden - * by sub-classes to make sub-types - */ - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { - return new FileFragment(fragmentId, file, start, length); - } - - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, - String[] hosts) { - return new FileFragment(fragmentId, file, start, length, hosts); - } - - protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) - throws IOException { - return new FileFragment(fragmentId, file, blockLocation); - } - - // for Non Splittable. eg, compressed gzip TextFile - protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, - BlockLocation[] blkLocations) throws IOException { - - Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); - for (BlockLocation blockLocation : blkLocations) { - for (String host : blockLocation.getHosts()) { - if (hostsBlockMap.containsKey(host)) { - hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); - } else { - hostsBlockMap.put(host, 1); - } - } - } - - List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { - - @Override - public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); - - String[] hosts = new String[blkLocations[0].getHosts().length]; - - for (int i = 0; i < hosts.length; i++) { - Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); - hosts[i] = entry.getKey(); - } - return new FileFragment(fragmentId, file, start, length, hosts); - } - - /** - * Get the minimum split size - * - * @return the minimum number of bytes that can be in a split - */ - public long getMinSplitSize() { - return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); - } - - /** - * Get Disk Ids by Volume Bytes - */ - private int[] getDiskIds(VolumeId[] volumeIds) { - int[] diskIds = new int[volumeIds.length]; - for (int i = 0; i < volumeIds.length; i++) { - int diskId = -1; - if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { - diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); - } - diskIds[i] = diskId; - } - return diskIds; - } - - /** - * Generate the map of host and make them into Volume Ids. - * - */ - private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { - Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); - for (FileFragment frag : frags) { - String[] hosts = frag.getHosts(); - int[] diskIds = frag.getDiskIds(); - for (int i = 0; i < hosts.length; i++) { - Set<Integer> volumeList = volumeMap.get(hosts[i]); - if (volumeList == null) { - volumeList = new HashSet<Integer>(); - volumeMap.put(hosts[i], volumeList); - } - - if (diskIds.length > 0 && diskIds[i] > -1) { - volumeList.add(diskIds[i]); - } - } - } - - return volumeMap; - } - /** - * Generate the list of files and make them into FileSplits. - * - * @throws IOException - */ - public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) - throws IOException { - // generate splits' - - List<FileFragment> splits = Lists.newArrayList(); - List<FileFragment> volumeSplits = Lists.newArrayList(); - List<BlockLocation> blockLocations = Lists.newArrayList(); - - for (Path p : inputs) { - FileSystem fs = p.getFileSystem(conf); - - ArrayList<FileStatus> files = Lists.newArrayList(); - if (fs.isFile(p)) { - files.addAll(Lists.newArrayList(fs.getFileStatus(p))); - } else { - files.addAll(listStatus(p)); - } - - int previousSplitSize = splits.size(); - for (FileStatus file : files) { - Path path = file.getPath(); - long length = file.getLen(); - if (length > 0) { - // Get locations of blocks of file - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, schema, path, file); - if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { - - if (splittable) { - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - blockLocations.addAll(Arrays.asList(blkLocations)); - - } else { // Non splittable - long blockSize = blkLocations[0].getLength(); - if (blockSize >= length) { - blockLocations.addAll(Arrays.asList(blkLocations)); - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - } else { - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - - } else { - if (splittable) { - - long minSize = Math.max(getMinSplitSize(), 1); - - long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one - long splitSize = Math.max(minSize, blockSize); - long bytesRemaining = length; - - // for s3 - while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); - bytesRemaining -= splitSize; - } - if (bytesRemaining > 0) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); - } - } else { // Non splittable - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - } else { - //for zero length files - splits.add(makeSplit(tableName, path, 0, length)); - } - } - if(LOG.isDebugEnabled()){ - LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); - } - } - - // Combine original fileFragments with new VolumeId information - setVolumeMeta(volumeSplits, blockLocations); - splits.addAll(volumeSplits); - LOG.info("Total # of splits: " + splits.size()); - return splits; - } - - private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations) - throws IOException { - - int locationSize = blockLocations.size(); - int splitSize = splits.size(); - if (locationSize == 0 || splitSize == 0) return; - - if (locationSize != splitSize) { - // splits and locations don't match up - LOG.warn("Number of block locations not equal to number of splits: " - + "#locations=" + locationSize - + " #splits=" + splitSize); - return; - } - - DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); - int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); - int blockLocationIdx = 0; - - Iterator<FileFragment> iter = splits.iterator(); - while (locationSize > blockLocationIdx) { - - int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); - List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); - //BlockStorageLocation containing additional volume location information for each replica of each block. - BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); - - for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { - iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); - blockLocationIdx++; - } - } - LOG.info("# of splits with volumeId " + splitSize); - } - - private static class InvalidInputException extends IOException { - List<IOException> errors; - public InvalidInputException(List<IOException> errors) { - this.errors = errors; - } - - @Override - public String getMessage(){ - StringBuffer sb = new StringBuffer(); - int messageLimit = Math.min(errors.size(), 10); - for (int i = 0; i < messageLimit ; i ++) { - sb.append(errors.get(i).getMessage()).append("\n"); - } - - if(messageLimit < errors.size()) - sb.append("skipped .....").append("\n"); - - return sb.toString(); - } - } - - private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - FileFragment.class - }; - - private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Path.class - }; - - /** - * create a scanner instance. - */ - public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, - Fragment fragment) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - /** - * create a scanner instance. - */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema, - Path path) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, schema, meta, path}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { - String handlerName = storeType.name().toLowerCase(); - Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); - if (scannerClass == null) { - scannerClass = conf.getClass( - String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class); - SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); - } - - if (scannerClass == null) { - throw new IOException("Unknown Storage Type: " + storeType.name()); - } - - return scannerClass; - } - - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - if (fragment instanceof FileFragment) { - FileFragment fileFragment = (FileFragment)fragment; - if (fileFragment.getEndKey() == 0) { - Scanner scanner = new NullScanner(conf, schema, meta, fileFragment); - scanner.setTarget(target.toArray()); - - return scanner; - } - } - - Scanner scanner; - - Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); - scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); - if (scanner.isProjectable()) { - scanner.setTarget(target.toArray()); - } - - return scanner; - } - - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, null).getScanner(meta, schema, fragment, target); - } - - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { - - FileSystem fs = path.getFileSystem(conf); - FileStatus status = fs.getFileStatus(path); - FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - - return getSeekableScanner(conf, meta, schema, fragment, schema); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java deleted file mode 100644 index f998ebf..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.KeyValueSet; -import parquet.hadoop.ParquetOutputFormat; -import sun.nio.ch.DirectBuffer; - -import java.io.DataInput; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class StorageUtil extends StorageConstants { - public static int getRowByteSize(Schema schema) { - int sum = 0; - for(Column col : schema.getColumns()) { - sum += StorageUtil.getColByteSize(col); - } - - return sum; - } - - public static int getColByteSize(Column col) { - switch (col.getDataType().getType()) { - case BOOLEAN: - return 1; - case CHAR: - return 1; - case BIT: - return 1; - case INT2: - return 2; - case INT4: - return 4; - case INT8: - return 8; - case FLOAT4: - return 4; - case FLOAT8: - return 8; - case INET4: - return 4; - case INET6: - return 32; - case TEXT: - return 256; - case BLOB: - return 256; - case DATE: - return 4; - case TIME: - return 8; - case TIMESTAMP: - return 8; - default: - return 0; - } - } - - public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException { - FileSystem fs = tableroot.getFileSystem(conf); - FSDataOutputStream out = fs.create(new Path(tableroot, ".meta")); - FileUtil.writeProto(out, meta.getProto()); - out.flush(); - out.close(); - } - - public static Path concatPath(String parent, String...childs) { - return concatPath(new Path(parent), childs); - } - - public static Path concatPath(Path parent, String...childs) { - StringBuilder sb = new StringBuilder(); - - for(int i=0; i < childs.length; i++) { - sb.append(childs[i]); - if(i < childs.length - 1) - sb.append("/"); - } - - return new Path(parent, sb.toString()); - } - - static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; - static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*"; - - /** - * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*". - * - * This method finds the maximum sequence number from existing data files through the above patterns. - * If it cannot find any matched file or the maximum number, it will return -1. - * - * @param fs - * @param path - * @param recursive - * @return The maximum sequence number - * @throws IOException - */ - public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException { - if (!fs.isDirectory(path)) { - return -1; - } - - FileStatus[] files = fs.listStatus(path); - - if (files == null || files.length == 0) { - return -1; - } - - int maxValue = -1; - List<Path> fileNamePatternMatchedList = new ArrayList<Path>(); - - for (FileStatus eachFile: files) { - // In the case of partition table, return largest value within all partition dirs. - if (eachFile.isDirectory() && recursive) { - int value = getMaxFileSequence(fs, eachFile.getPath(), recursive); - if (value > maxValue) { - maxValue = value; - } - } else { - if (eachFile.getPath().getName().matches(fileNamePatternV08) || - eachFile.getPath().getName().matches(fileNamePatternV09)) { - fileNamePatternMatchedList.add(eachFile.getPath()); - } - } - } - - if (fileNamePatternMatchedList.isEmpty()) { - return maxValue; - } - Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); - String pathName = lastFile.getName(); - - // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> - // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> - String[] pathTokens = pathName.split("-"); - if (pathTokens.length == 3) { - return -1; - } else if(pathTokens.length == 4) { - return Integer.parseInt(pathTokens[3]); - } else { - return -1; - } - } - - public static void closeBuffer(ByteBuffer buffer) { - if (buffer != null) { - if (buffer.isDirect()) { - ((DirectBuffer) buffer).cleaner().clean(); - } else { - buffer.clear(); - } - } - } - - public static int readFully(InputStream is, byte[] buffer, int offset, int length) - throws IOException { - int nread = 0; - while (nread < length) { - int nbytes = is.read(buffer, offset + nread, length - nread); - if (nbytes < 0) { - return nread > 0 ? nread : nbytes; - } - nread += nbytes; - } - return nread; - } - - /** - * Similar to readFully(). Skips bytes in a loop. - * @param in The DataInput to skip bytes from - * @param len number of bytes to skip. - * @throws java.io.IOException if it could not skip requested number of bytes - * for any reason (including EOF) - */ - public static void skipFully(DataInput in, int len) throws IOException { - int amt = len; - while (amt > 0) { - long ret = in.skipBytes(amt); - if (ret == 0) { - // skip may return 0 even if we're not at EOF. Luckily, we can - // use the read() method to figure out if we're at the end. - int b = in.readByte(); - if (b == -1) { - throw new EOFException( "Premature EOF from inputStream after " + - "skipping " + (len - amt) + " byte(s)."); - } - ret = 1; - } - amt -= ret; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java deleted file mode 100644 index a2c08de..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.statistics.ColumnStats; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; - -/** - * This class is not thread-safe. - */ -public class TableStatistics { - private static final Log LOG = LogFactory.getLog(TableStatistics.class); - private Schema schema; - private Tuple minValues; - private Tuple maxValues; - private long [] numNulls; - private long numRows = 0; - private long numBytes = 0; - - private boolean [] comparable; - - public TableStatistics(Schema schema) { - this.schema = schema; - minValues = new VTuple(schema.size()); - maxValues = new VTuple(schema.size()); - - numNulls = new long[schema.size()]; - comparable = new boolean[schema.size()]; - - DataType type; - for (int i = 0; i < schema.size(); i++) { - type = schema.getColumn(i).getDataType(); - if (type.getType() == Type.PROTOBUF) { - comparable[i] = false; - } else { - comparable[i] = true; - } - } - } - - public Schema getSchema() { - return this.schema; - } - - public void incrementRow() { - numRows++; - } - - public long getNumRows() { - return this.numRows; - } - - public void setNumBytes(long bytes) { - this.numBytes = bytes; - } - - public long getNumBytes() { - return this.numBytes; - } - - public void analyzeField(int idx, Datum datum) { - if (datum instanceof NullDatum) { - numNulls[idx]++; - return; - } - - if (comparable[idx]) { - if (!maxValues.contains(idx) || - maxValues.get(idx).compareTo(datum) < 0) { - maxValues.put(idx, datum); - } - if (!minValues.contains(idx) || - minValues.get(idx).compareTo(datum) > 0) { - minValues.put(idx, datum); - } - } - } - - public TableStats getTableStat() { - TableStats stat = new TableStats(); - - ColumnStats columnStats; - for (int i = 0; i < schema.size(); i++) { - columnStats = new ColumnStats(schema.getColumn(i)); - columnStats.setNumNulls(numNulls[i]); - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { - columnStats.setMinValue(minValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } - if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { - columnStats.setMaxValue(maxValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } - stat.addColumnStat(columnStats); - } - - stat.setNumRows(this.numRows); - stat.setNumBytes(this.numBytes); - - return stat; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java deleted file mode 100644 index ab8816b..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.protobuf.Message; -import org.apache.commons.codec.binary.Base64; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.*; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.NumberUtil; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.TimeZone; - -// Compatibility with Apache Hive -@Deprecated -public class TextSerializerDeserializer implements SerializerDeserializer { - public static final byte[] trueBytes = "true".getBytes(); - public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - - @Override - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException { - - byte[] bytes; - int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); - - if (datum == null || datum instanceof NullDatum) { - switch (dataType.getType()) { - case CHAR: - case TEXT: - length = nullCharacters.length; - out.write(nullCharacters); - break; - default: - break; - } - return length; - } - - switch (dataType.getType()) { - case BOOLEAN: - out.write(datum.asBool() ? trueBytes : falseBytes); - length = trueBytes.length; - break; - case CHAR: - byte[] pad = new byte[dataType.getLength() - datum.size()]; - bytes = datum.asTextBytes(); - out.write(bytes); - out.write(pad); - length = bytes.length + pad.length; - break; - case TEXT: - case BIT: - case INT2: - case INT4: - case INT8: - case FLOAT4: - case FLOAT8: - case INET4: - case DATE: - case INTERVAL: - bytes = datum.asTextBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIME: - bytes = ((TimeDatum)datum).asChars(TimeZone.getDefault(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIMESTAMP: - bytes = ((TimestampDatum)datum).asChars(TimeZone.getDefault(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case INET6: - case BLOB: - bytes = Base64.encodeBase64(datum.asByteArray(), false); - length = bytes.length; - out.write(bytes, 0, length); - break; - case PROTOBUF: - ProtobufDatum protobuf = (ProtobufDatum) datum; - byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); - length = protoBytes.length; - out.write(protoBytes, 0, protoBytes.length); - break; - case NULL_TYPE: - default: - break; - } - return length; - } - - @Override - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { - - Datum datum; - switch (col.getDataType().getType()) { - case BOOLEAN: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T'); - break; - case BIT: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length))); - break; - case CHAR: - datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createChar(new String(bytes, offset, length).trim()); - break; - case INT1: - case INT2: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, offset, length)); - break; - case INT4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, length)); - break; - case INT8: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt8(new String(bytes, offset, length)); - break; - case FLOAT4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createFloat4(new String(bytes, offset, length)); - break; - case FLOAT8: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, length)); - break; - case TEXT: { - byte[] chars = new byte[length]; - System.arraycopy(bytes, offset, chars, 0, length); - datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createText(chars); - break; - } - case DATE: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createDate(new String(bytes, offset, length)); - break; - case TIME: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createTime(new String(bytes, offset, length)); - break; - case TIMESTAMP: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createTimestamp(new String(bytes, offset, length)); - break; - case INTERVAL: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInterval(new String(bytes, offset, length)); - break; - case PROTOBUF: { - if (isNull(bytes, offset, length, nullCharacters)) { - datum = NullDatum.get(); - } else { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); - Message.Builder builder = factory.newBuilder(); - try { - byte[] protoBytes = new byte[length]; - System.arraycopy(bytes, offset, protoBytes, 0, length); - protobufJsonFormat.merge(protoBytes, builder); - datum = factory.createDatum(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - break; - } - case INET4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInet4(new String(bytes, offset, length)); - break; - case BLOB: { - if (isNull(bytes, offset, length, nullCharacters)) { - datum = NullDatum.get(); - } else { - byte[] blob = new byte[length]; - System.arraycopy(bytes, offset, blob, 0, length); - datum = DatumFactory.createBlob(Base64.decodeBase64(blob)); - } - break; - } - default: - datum = NullDatum.get(); - break; - } - return datum; - } - - private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) { - return length == 0 || ((length == nullBytes.length) - && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length)); - } - - private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) { - return length > 0 && length == nullBytes.length - && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java deleted file mode 100644 index 8dffd8d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ /dev/null @@ -1,32 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.common.ProtoObject; - -import java.util.Comparator; - -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { - - public abstract int compare(Tuple o1, Tuple o2); - - public abstract boolean isAscendingFirstKey(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java deleted file mode 100644 index e824b99..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; - -import java.util.Comparator; - -/** - * It represents a pair of start and end tuples. - */ -public class TupleRange implements Comparable<TupleRange>, Cloneable { - private Tuple start; - private Tuple end; - private final TupleComparator comp; - - public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) { - this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); - // if there is only one value, start == end - this.start = start; - this.end = end; - } - - public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) { - Schema schema = new Schema(); - for (SortSpec spec : sortSpecs) { - schema.addColumn(spec.getSortKey()); - } - - return schema; - } - - public void setStart(Tuple tuple) { - this.start = tuple; - } - - public final Tuple getStart() { - return this.start; - } - - public void setEnd(Tuple tuple) { - this.end = tuple; - } - - public final Tuple getEnd() { - return this.end; - } - - public String toString() { - return "[" + this.start + ", " + this.end + ")"; - } - - @Override - public int hashCode() { - return Objects.hashCode(start, end); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TupleRange) { - TupleRange other = (TupleRange) obj; - return this.start.equals(other.start) && this.end.equals(other.end); - } else { - return false; - } - } - - @Override - public int compareTo(TupleRange o) { - // TODO - should handle overlap - int cmpVal = comp.compare(this.start, o.start); - if (cmpVal != 0) { - return cmpVal; - } else { - return comp.compare(this.end, o.end); - } - } - - public static class DescendingTupleRangeComparator - implements Comparator<TupleRange> { - - @Override - public int compare(TupleRange left, TupleRange right) { - return right.compareTo(left); - } - } - - public TupleRange clone() throws CloneNotSupportedException { - TupleRange newRange = (TupleRange) super.clone(); - newRange.setStart(start.clone()); - newRange.setEnd(end.clone()); - return newRange; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java deleted file mode 100644 index ad19101..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.annotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) -public @interface ForSplitableStore { -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java deleted file mode 100644 index 6af8da0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.avro; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.FileAppender; -import org.apache.tajo.storage.TableStatistics; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -/** - * FileAppender for writing to Avro files. - */ -public class AvroAppender extends FileAppender { - private TableStatistics stats; - private Schema avroSchema; - private List<Schema.Field> avroFields; - private DataFileWriter<GenericRecord> dataFileWriter; - - /** - * Creates a new AvroAppender. - * - * @param conf Configuration properties. - * @param schema The table schema. - * @param meta The table metadata. - * @param path The path of the Parquet file to write to. - */ - public AvroAppender(Configuration conf, - org.apache.tajo.catalog.Schema schema, - TableMeta meta, Path path) throws IOException { - super(conf, schema, meta, path); - } - - /** - * Initializes the Appender. - */ - public void init() throws IOException { - FileSystem fs = path.getFileSystem(conf); - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - FSDataOutputStream outputStream = fs.create(path); - - avroSchema = AvroUtil.getAvroSchema(meta, conf); - avroFields = avroSchema.getFields(); - - DatumWriter<GenericRecord> datumWriter = - new GenericDatumWriter<GenericRecord>(avroSchema); - dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); - dataFileWriter.create(avroSchema, outputStream); - - if (enabledStats) { - this.stats = new TableStatistics(schema); - } - super.init(); - } - - /** - * Gets the current offset. Tracking offsets is currenly not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - - private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { - if (tuple.get(i) instanceof NullDatum) { - return null; - } - switch (avroType) { - case NULL: - return null; - case BOOLEAN: - return tuple.getBool(i); - case INT: - return tuple.getInt4(i); - case LONG: - return tuple.getInt8(i); - case FLOAT: - return tuple.getFloat4(i); - case DOUBLE: - return tuple.getFloat8(i); - case BYTES: - case FIXED: - return ByteBuffer.wrap(tuple.getBytes(i)); - case STRING: - return tuple.getText(i); - default: - throw new RuntimeException("Unknown primitive type."); - } - } - - /** - * Write a Tuple to the Avro file. - * - * @param tuple The Tuple to write. - */ - @Override - public void addTuple(Tuple tuple) throws IOException { - GenericRecord record = new GenericData.Record(avroSchema); - for (int i = 0; i < schema.size(); ++i) { - Column column = schema.getColumn(i); - if (enabledStats) { - stats.analyzeField(i, tuple.get(i)); - } - Object value; - Schema.Field avroField = avroFields.get(i); - Schema.Type avroType = avroField.schema().getType(); - switch (avroType) { - case NULL: - case BOOLEAN: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BYTES: - case STRING: - case FIXED: - value = getPrimitive(tuple, i, avroType); - break; - case RECORD: - throw new RuntimeException("Avro RECORD not supported."); - case ENUM: - throw new RuntimeException("Avro ENUM not supported."); - case MAP: - throw new RuntimeException("Avro MAP not supported."); - case UNION: - List<Schema> schemas = avroField.schema().getTypes(); - if (schemas.size() != 2) { - throw new RuntimeException("Avro UNION not supported."); - } - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - value = getPrimitive(tuple, i, schemas.get(1).getType()); - } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { - value = getPrimitive(tuple, i, schemas.get(0).getType()); - } else { - throw new RuntimeException("Avro UNION not supported."); - } - break; - default: - throw new RuntimeException("Unknown type: " + avroType); - } - record.put(i, value); - } - dataFileWriter.append(record); - - if (enabledStats) { - stats.incrementRow(); - } - } - - /** - * Flushes the current state of the file. - */ - @Override - public void flush() throws IOException { - dataFileWriter.flush(); - } - - /** - * Closes the Appender. - */ - @Override - public void close() throws IOException { - dataFileWriter.close(); - } - - /** - * If table statistics is enabled, retrieve the table statistics. - * - * @return Table statistics if enabled or null otherwise. - */ - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java deleted file mode 100644 index 816ae25..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.avro; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericFixed; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.FsInput; -import org.apache.avro.util.Utf8; -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.datum.*; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * FileScanner for reading Avro files - */ -public class AvroScanner extends FileScanner { - private Schema avroSchema; - private List<Schema.Field> avroFields; - private DataFileReader<GenericRecord> dataFileReader; - private int[] projectionMap; - - /** - * Creates a new AvroScanner. - * - * @param conf - * @param schema - * @param meta - * @param fragment - */ - public AvroScanner(Configuration conf, - final org.apache.tajo.catalog.Schema schema, - final TableMeta meta, final FileFragment fragment) { - super(conf, schema, meta, fragment); - } - - /** - * Initializes the AvroScanner. - */ - @Override - public void init() throws IOException { - if (targets == null) { - targets = schema.toArray(); - } - prepareProjection(targets); - - avroSchema = AvroUtil.getAvroSchema(meta, conf); - avroFields = avroSchema.getFields(); - - DatumReader<GenericRecord> datumReader = - new GenericDatumReader<GenericRecord>(avroSchema); - SeekableInput input = new FsInput(fragment.getPath(), conf); - dataFileReader = new DataFileReader<GenericRecord>(input, datumReader); - super.init(); - } - - private void prepareProjection(Column[] targets) { - projectionMap = new int[targets.length]; - for (int i = 0; i < targets.length; ++i) { - projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - } - - private static String fromAvroString(Object value) { - if (value instanceof Utf8) { - Utf8 utf8 = (Utf8)value; - return utf8.toString(); - } - return value.toString(); - } - - private static Schema getNonNull(Schema schema) { - if (!schema.getType().equals(Schema.Type.UNION)) { - return schema; - } - List<Schema> schemas = schema.getTypes(); - if (schemas.size() != 2) { - return schema; - } - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - return schemas.get(1); - } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { - return schemas.get(0); - } else { - return schema; - } - } - - private Datum convertInt(Object value, TajoDataTypes.Type tajoType) { - int intValue = (Integer)value; - switch (tajoType) { - case BIT: - return DatumFactory.createBit((byte)(intValue & 0xff)); - case INT2: - return DatumFactory.createInt2((short)intValue); - default: - return DatumFactory.createInt4(intValue); - } - } - - private Datum convertBytes(Object value, TajoDataTypes.Type tajoType, - DataType dataType) { - ByteBuffer buffer = (ByteBuffer)value; - byte[] bytes = new byte[buffer.capacity()]; - buffer.get(bytes, 0, bytes.length); - switch (tajoType) { - case INET4: - return DatumFactory.createInet4(bytes); - case PROTOBUF: - try { - ProtobufDatumFactory factory = - ProtobufDatumFactory.get(dataType.getCode()); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(bytes); - return factory.createDatum(builder); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - default: - return new BlobDatum(bytes); - } - } - - private Datum convertString(Object value, TajoDataTypes.Type tajoType) { - switch (tajoType) { - case CHAR: - return DatumFactory.createChar(fromAvroString(value)); - default: - return DatumFactory.createText(fromAvroString(value)); - } - } - - /** - * Reads the next Tuple from the Avro file. - * - * @return The next Tuple from the Avro file or null if end of file is - * reached. - */ - @Override - public Tuple next() throws IOException { - if (!dataFileReader.hasNext()) { - return null; - } - - Tuple tuple = new VTuple(schema.size()); - GenericRecord record = dataFileReader.next(); - for (int i = 0; i < projectionMap.length; ++i) { - int columnIndex = projectionMap[i]; - Object value = record.get(columnIndex); - if (value == null) { - tuple.put(columnIndex, NullDatum.get()); - continue; - } - - // Get Avro type. - Schema.Field avroField = avroFields.get(columnIndex); - Schema nonNullAvroSchema = getNonNull(avroField.schema()); - Schema.Type avroType = nonNullAvroSchema.getType(); - - // Get Tajo type. - Column column = schema.getColumn(columnIndex); - DataType dataType = column.getDataType(); - TajoDataTypes.Type tajoType = dataType.getType(); - switch (avroType) { - case NULL: - tuple.put(columnIndex, NullDatum.get()); - break; - case BOOLEAN: - tuple.put(columnIndex, DatumFactory.createBool((Boolean)value)); - break; - case INT: - tuple.put(columnIndex, convertInt(value, tajoType)); - break; - case LONG: - tuple.put(columnIndex, DatumFactory.createInt8((Long)value)); - break; - case FLOAT: - tuple.put(columnIndex, DatumFactory.createFloat4((Float)value)); - break; - case DOUBLE: - tuple.put(columnIndex, DatumFactory.createFloat8((Double)value)); - break; - case BYTES: - tuple.put(columnIndex, convertBytes(value, tajoType, dataType)); - break; - case STRING: - tuple.put(columnIndex, convertString(value, tajoType)); - break; - case RECORD: - throw new RuntimeException("Avro RECORD not supported."); - case ENUM: - throw new RuntimeException("Avro ENUM not supported."); - case MAP: - throw new RuntimeException("Avro MAP not supported."); - case UNION: - throw new RuntimeException("Avro UNION not supported."); - case FIXED: - tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes())); - break; - default: - throw new RuntimeException("Unknown type."); - } - } - return tuple; - } - - /** - * Resets the scanner - */ - @Override - public void reset() throws IOException { - } - - /** - * Closes the scanner. - */ - @Override - public void close() throws IOException { - if (dataFileReader != null) { - dataFileReader.close(); - } - } - - /** - * Returns whether this scanner is projectable. - * - * @return true - */ - @Override - public boolean isProjectable() { - return true; - } - - /** - * Returns whether this scanner is selectable. - * - * @return false - */ - @Override - public boolean isSelectable() { - return false; - } - - /** - * Returns whether this scanner is splittable. - * - * @return false - */ - @Override - public boolean isSplittable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java deleted file mode 100644 index 0d14c3d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.avro; - -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.StorageConstants; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; - -public class AvroUtil { - public static Schema getAvroSchema(TableMeta meta, Configuration conf) - throws IOException { - - boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL); - boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL); - if (!isSchemaLiteral && !isSchemaUrl) { - throw new RuntimeException("No Avro schema for table."); - } - if (isSchemaLiteral) { - String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL); - return new Schema.Parser().parse(schema); - } - - String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL); - if (schemaURL.toLowerCase().startsWith("http")) { - return getAvroSchemaFromHttp(schemaURL); - } else { - return getAvroSchemaFromFileSystem(schemaURL, conf); - } - } - - public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException { - InputStream inputStream = new URL(schemaURL).openStream(); - - try { - return new Schema.Parser().parse(inputStream); - } finally { - IOUtils.closeStream(inputStream); - } - } - - public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException { - Path schemaPath = new Path(schemaURL); - FileSystem fs = schemaPath.getFileSystem(conf); - FSDataInputStream inputStream = fs.open(schemaPath); - - try { - return new Schema.Parser().parse(inputStream); - } finally { - IOUtils.closeStream(inputStream); - } - } -}
