http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java new file mode 100644 index 0000000..e2d89d6 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -0,0 +1,979 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.tajo.*; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; +import java.text.NumberFormat; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * StorageManager manages the functions of storing and reading data. + * StorageManager is a abstract class. + * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class. 
+ * + */ +public abstract class StorageManager { + private final Log LOG = LogFactory.getLog(StorageManager.class); + + private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { + Configuration.class, + Schema.class, + TableMeta.class, + Fragment.class + }; + + private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { + Configuration.class, + QueryUnitAttemptId.class, + Schema.class, + TableMeta.class, + Path.class + }; + + public static final PathFilter hiddenFileFilter = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + protected TajoConf conf; + protected StoreType storeType; + + /** + * Cache of StorageManager. + * Key is manager key(warehouse path) + store type + */ + private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); + + /** + * Cache of scanner handlers for each storage type. + */ + protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Scanner>>(); + + /** + * Cache of appender handlers for each storage type. + */ + protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Appender>>(); + + /** + * Cache of constructors for each class. Pins the classes so they + * can't be garbage collected until ReflectionUtils can be collected. + */ + private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = + new ConcurrentHashMap<Class<?>, Constructor<?>>(); + + public StorageManager(StoreType storeType) { + this.storeType = storeType; + } + + /** + * Initialize storage manager. + * @throws java.io.IOException + */ + protected abstract void storageInit() throws IOException; + + /** + * This method is called after executing "CREATE TABLE" statement. + * If a storage is a file based storage, a storage manager may create directory. 
+ * + * @param tableDesc Table description which is created. + * @param ifNotExists Creates the table only when the table does not exist. + * @throws java.io.IOException + */ + public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; + + /** + * This method is called after executing "DROP TABLE" statement with the 'PURGE' option + * which is the option to delete all the data. + * + * @param tableDesc + * @throws java.io.IOException + */ + public abstract void purgeTable(TableDesc tableDesc) throws IOException; + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @param scanNode The logical node for scanning. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, + ScanNode scanNode) throws IOException; + + /** + * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. + * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. + * @param tableDesc The table description for the target data. + * @param currentPage The current page number within the entire list. + * @param numFragments The number of fragments in the result. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException; + + /** + * It returns the storage property. 
+ * @return The storage property + */ + public abstract StorageProperty getStorageProperty(); + + /** + * Release storage manager resource + */ + public abstract void closeStorageManager(); + + /** + * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER. + * In general Repartitioner determines the partition range using previous output statistics data. + * In the special cases, such as HBase Repartitioner uses the result of this method. + * + * @param queryContext The current query context which contains query properties. + * @param tableDesc The table description for the target data. + * @param inputSchema The input schema + * @param sortSpecs The sort specification that contains the sort column and sort order. + * @return The list of sort ranges. + * @throws java.io.IOException + */ + public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, + TupleRange dataRange) throws IOException; + + /** + * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'. + * In general Tajo creates the target table after finishing the final sub-query of CATS. + * But In the special cases, such as HBase INSERT or CAST query uses the target table information. + * That kind of the storage should implements the logic related to creating table in this method. + * + * @param node The child node of the root node. + * @throws java.io.IOException + */ + public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException; + + /** + * It is called when the query failed. + * Each storage manager should implement to be processed when the query fails in this method. + * + * @param node The child node of the root node. + * @throws java.io.IOException + */ + public abstract void rollbackOutputCommit(LogicalNode node) throws IOException; + + /** + * Returns the current storage type. 
+ * @return + */ + public StoreType getStoreType() { + return storeType; + } + + /** + * Initialize StorageManager instance. It should be called before using. + * + * @param tajoConf + * @throws java.io.IOException + */ + public void init(TajoConf tajoConf) throws IOException { + this.conf = tajoConf; + storageInit(); + } + + /** + * Close StorageManager + * @throws java.io.IOException + */ + public void close() throws IOException { + synchronized(storageManagers) { + for (StorageManager eachStorageManager: storageManagers.values()) { + eachStorageManager.closeStorageManager(); + } + } + } + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException { + return getSplits(fragmentId, tableDesc, null); + } + + /** + * Returns FileStorageManager instance. + * + * @param tajoConf Tajo system property. + * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { + return getFileStorageManager(tajoConf, null); + } + + /** + * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in tajoConf with warehousePath parameter. + * + * @param tajoConf Tajo system property. + * @param warehousePath The warehouse directory to be set in the tajoConf. 
+ * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { + URI uri; + TajoConf copiedConf = new TajoConf(tajoConf); + if (warehousePath != null) { + copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString()); + } + uri = TajoConf.getWarehouseDir(copiedConf).toUri(); + String key = "file".equals(uri.getScheme()) ? "file" : uri.toString(); + return getStorageManager(copiedConf, StoreType.CSV, key); + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { + if ("HBASE".equals(storeType)) { + return getStorageManager(tajoConf, StoreType.HBASE); + } else { + return getStorageManager(tajoConf, StoreType.CSV); + } + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { + return getStorageManager(tajoConf, storeType, null); + } + + /** + * Returns the proper StorageManager instance according to the storeType + * + * @param tajoConf Tajo system property. 
+ * @param storeType Storage type + * @param managerKey Key that can identify each storage manager(may be a path) + * @return + * @throws java.io.IOException + */ + public static synchronized StorageManager getStorageManager ( + TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { + synchronized (storageManagers) { + String storeKey = storeType + managerKey; + StorageManager manager = storageManagers.get(storeKey); + if (manager == null) { + String typeName = "hdfs"; + + switch (storeType) { + case HBASE: + typeName = "hbase"; + break; + default: + typeName = "hdfs"; + } + + Class<? extends StorageManager> storageManagerClass = + tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class); + + if (storageManagerClass == null) { + throw new IOException("Unknown Storage Type: " + typeName); + } + + try { + Constructor<? extends StorageManager> constructor = + (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass); + if (constructor == null) { + constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{StoreType.class}); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); + } + manager = constructor.newInstance(new Object[]{storeType}); + } catch (Exception e) { + throw new RuntimeException(e); + } + manager.init(tajoConf); + storageManagers.put(storeKey, manager); + } + + return manager; + } + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target Columns which are selected. + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { + return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); + } + + /** + * Returns Scanner instance. 
+ * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { + return getScanner(meta, schema, fragment, schema); + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + if (fragment.isEmpty()) { + Scanner scanner = new NullScanner(conf, schema, meta, fragment); + scanner.setTarget(target.toArray()); + + return scanner; + } + + Scanner scanner; + + Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); + scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); + if (scanner.isProjectable()) { + scanner.setTarget(target.toArray()); + } + + return scanner; + } + + /** + * Returns Scanner instance. + * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + } + + /** + * Returns Appender instance. + * @param queryContext Query property. + * @param taskAttemptId Task id. + * @param meta Table meta data. + * @param schema Output schema. 
+ * @param workDir Working directory + * @return Appender instance + * @throws java.io.IOException + */ + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + Appender appender; + + Class<? extends Appender> appenderClass; + + String handlerName = meta.getStoreType().name().toLowerCase(); + appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); + if (appenderClass == null) { + appenderClass = conf.getClass( + String.format("tajo.storage.appender-handler.%s.class", + meta.getStoreType().name().toLowerCase()), null, Appender.class); + APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); + } + + if (appenderClass == null) { + throw new IOException("Unknown Storage Type: " + meta.getStoreType()); + } + + appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); + + return appender; + } + + /** + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param schema Input schema + * @param meta Table meta data + * @param fragment The fragment for scanning + * @param <T> + * @return The scanner instance + */ + public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, + Fragment fragment) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Creates a scanner instance. 
+ * + * @param theClass Concrete class of scanner + * @param conf System property + * @param taskAttemptId Task id + * @param meta Table meta data + * @param schema Input schema + * @param workDir Working directory + * @param <T> + * @return The scanner instance + */ + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, + TableMeta meta, Schema schema, Path workDir) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Return the Scanner class for the StoreType that is defined in storage-default.xml. + * + * @param storeType store type + * @return The Scanner class + * @throws java.io.IOException + */ + public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { + String handlerName = storeType.name().toLowerCase(); + Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); + if (scannerClass == null) { + scannerClass = conf.getClass( + String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class); + SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); + } + + if (scannerClass == null) { + throw new IOException("Unknown Storage Type: " + storeType.name()); + } + + return scannerClass; + } + + /** + * Return length of the fragment. + * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. 
+ * + * @param conf Tajo system property + * @param fragment Fragment + * @return + */ + public static long getFragmentLength(TajoConf conf, Fragment fragment) { + if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { + return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); + } else { + return fragment.getLength(); + } + } + + /** + * It is called after making logical plan. Storage manager should verify the schema for inserting. + * + * @param tableDesc The table description of insert target. + * @param outSchema The output schema of select query for inserting. + * @throws java.io.IOException + */ + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + // nothing to do + } + + /** + * Returns the list of storage specified rewrite rules. + * This values are used by LogicalOptimizer. + * + * @param queryContext The query property + * @param tableDesc The description of the target table. + * @return The list of storage specified rewrite rules + * @throws java.io.IOException + */ + public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + return null; + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. 
+ * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @return Saved path + * @throws java.io.IOException + */ + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true); + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @param changeFileSeq If true change result file name with max sequence. + * @return Saved path + * @throws java.io.IOException + */ + protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); + try { + FileSystem fs = stagingResultDir.getFileSystem(conf); + + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. 
+ // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + ContentSummary summary = fs.getContentSummary(stagingResultDir); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map<Path, Path> renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. + Map<Path, Path> recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + + throw new 
IOException(ioe.getMessage()); + } + } else { // no partition + try { + + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } + + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + + if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } + } else { + int maxSeq = 
StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } + } + + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + + return finalOutputDir; + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. 
   *
   * @param fs FileSystem
   * @param stagingResultDir The staging result dir
   * @param fileStatus The file status
   * @param finalOutputPath Final output path
   * @param nf Number format
   * @param fileSeq The sequence number
   * @param changeFileSeq If true, the moved file is renamed with the given sequence number
   * @throws java.io.IOException
   */
  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
                                          FileStatus fileStatus, Path finalOutputPath,
                                          NumberFormat nf,
                                          int fileSeq, boolean changeFileSeq) throws IOException {
    if (fileStatus.isDirectory()) {
      // Directory: recreate the same sub-path under the final output dir and recurse.
      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
      if (subPath != null) {
        Path finalSubPath = new Path(finalOutputPath, subPath);
        if (!fs.exists(finalSubPath)) {
          fs.mkdirs(finalSubPath);
        }
        // Continue numbering after files already present in the target dir.
        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
          // Skip hidden entries such as "_SUCCESS".
          if (eachFile.getPath().getName().startsWith("_")) {
            continue;
          }
          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
        }
      } else {
        // The entry does not live under the staging dir; refuse to move it.
        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
      }
    } else {
      // Regular file: move it to the mirrored location, optionally renumbered.
      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
      if (subPath != null) {
        Path finalSubPath = new Path(finalOutputPath, subPath);
        if (changeFileSeq) {
          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
        }
        if (!fs.exists(finalSubPath.getParent())) {
          fs.mkdirs(finalSubPath.getParent());
        }
        // Never overwrite data already committed to the final location.
        if (fs.exists(finalSubPath)) {
          throw new IOException("Already exists data file:" + finalSubPath);
        }
        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
        if (success) {
          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
              "to final output[" + finalSubPath + "]");
        } else {
          // NOTE(review): a failed rename is only logged, not thrown — the
          // caller's verifyAllFileMoved() pass presumably catches leftovers; confirm.
          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
              "to final output[" + finalSubPath + "]");
        }
      }
    }
  }

  /**
+ * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. + * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. 
+ * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java new file mode 100644 index 0000000..6816d08 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Describes capabilities of a storage handler: whether the storage can be the
 * target of INSERT INTO, and whether inserted rows must arrive pre-sorted.
 * Plain mutable bean with default (false) values.
 */
public class StorageProperty {
  /** Whether this storage type supports INSERT INTO. */
  private boolean supportsInsertInto;
  /** Whether this storage type requires sorted input on insert. */
  private boolean sortedInsert;

  /** @return true when INSERT INTO is supported by this storage. */
  public boolean isSupportsInsertInto() {
    return supportsInsertInto;
  }

  /** @param supportsInsertInto true when INSERT INTO is supported. */
  public void setSupportsInsertInto(boolean supportsInsertInto) {
    this.supportsInsertInto = supportsInsertInto;
  }

  /** @return true when rows must be sorted before insertion. */
  public boolean isSortedInsert() {
    return sortedInsert;
  }

  /** @param sortedInsert true when rows must be sorted before insertion. */
  public void setSortedInsert(boolean sortedInsert) {
    this.sortedInsert = sortedInsert;
  }
}
package org.apache.tajo.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.util.FileUtil;
import sun.nio.ch.DirectBuffer;

import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Static helpers for storage handlers: size estimation, table-meta writing,
 * path construction, result-file sequence handling, and low-level I/O.
 */
public class StorageUtil extends StorageConstants {
  /**
   * Returns the estimated row size in bytes for the given schema, summing the
   * per-column estimates of {@link #getColByteSize(Column)}.
   */
  public static int getRowByteSize(Schema schema) {
    int sum = 0;
    for (Column col : schema.getColumns()) {
      sum += StorageUtil.getColByteSize(col);
    }
    return sum;
  }

  /**
   * Returns an estimated byte size for a single column value. Fixed-width
   * types return their exact width; variable-length types (TEXT, BLOB) use a
   * fixed 256-byte estimate; unknown types return 0.
   */
  public static int getColByteSize(Column col) {
    switch (col.getDataType().getType()) {
    case BOOLEAN:
      return 1;
    case CHAR:
      return 1;
    case BIT:
      return 1;
    case INT2:
      return 2;
    case INT4:
      return 4;
    case INT8:
      return 8;
    case FLOAT4:
      return 4;
    case FLOAT8:
      return 8;
    case INET4:
      return 4;
    case INET6:
      return 32;
    case TEXT:
      return 256;
    case BLOB:
      return 256;
    case DATE:
      return 4;
    case TIME:
      return 8;
    case TIMESTAMP:
      return 8;
    default:
      return 0;
    }
  }

  /**
   * Writes the table meta protobuf to "&lt;tableroot&gt;/.meta".
   * The output stream is closed even when writing fails (the original leaked
   * the stream on exception).
   */
  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
    FileSystem fs = tableroot.getFileSystem(conf);
    FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
    try {
      FileUtil.writeProto(out, meta.getProto());
      out.flush();
    } finally {
      out.close();
    }
  }

  /** Joins the child components with '/' and resolves them against the parent path. */
  public static Path concatPath(String parent, String...childs) {
    return concatPath(new Path(parent), childs);
  }

  /** Joins the child components with '/' and resolves them against the parent path. */
  public static Path concatPath(Path parent, String...childs) {
    StringBuilder sb = new StringBuilder();

    for (int i = 0; i < childs.length; i++) {
      sb.append(childs[i]);
      if (i < childs.length - 1)
        sb.append("/");
    }

    return new Path(parent, sb.toString());
  }

  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";

  /**
   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*".
   *
   * This method finds the maximum sequence number from existing data files through the above patterns.
   * If it cannot find any matched file or the maximum number, it will return -1.
   *
   * Fixed against the original: the maximum is taken over ALL matched files
   * instead of only the last entry returned by listStatus() (whose ordering is
   * not guaranteed on every file system), and a maximum found in a recursed
   * sub-directory is no longer discarded when the current directory also
   * contains matched files.
   *
   * @param fs
   * @param path
   * @param recursive
   * @return The maximum sequence number
   * @throws java.io.IOException
   */
  public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException {
    if (!fs.isDirectory(path)) {
      return -1;
    }

    FileStatus[] files = fs.listStatus(path);

    if (files == null || files.length == 0) {
      return -1;
    }

    int maxValue = -1;

    for (FileStatus eachFile : files) {
      // In the case of partition table, return the largest value within all partition dirs.
      if (eachFile.isDirectory() && recursive) {
        int value = getMaxFileSequence(fs, eachFile.getPath(), recursive);
        if (value > maxValue) {
          maxValue = value;
        }
      } else {
        String fileName = eachFile.getPath().getName();
        if (fileName.matches(fileNamePatternV08) || fileName.matches(fileNamePatternV09)) {
          // 0.8: part-<ExecutionBlockId.seq>-<QueryUnitId.seq>         (no sequence)
          // 0.9: part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
          String[] pathTokens = fileName.split("-");
          if (pathTokens.length == 4) {
            int value = Integer.parseInt(pathTokens[3]);
            if (value > maxValue) {
              maxValue = value;
            }
          }
        }
      }
    }

    return maxValue;
  }

  /**
   * Releases a ByteBuffer. Direct buffers are freed eagerly through the
   * (JDK-internal) cleaner; heap buffers are merely cleared.
   */
  public static void closeBuffer(ByteBuffer buffer) {
    if (buffer != null) {
      if (buffer.isDirect()) {
        ((DirectBuffer) buffer).cleaner().clean();
      } else {
        buffer.clear();
      }
    }
  }

  /**
   * Reads up to {@code length} bytes into {@code buffer}, looping until the
   * stream is exhausted. Returns the number of bytes read, or a negative value
   * when EOF is hit before anything was read.
   */
  public static int readFully(InputStream is, byte[] buffer, int offset, int length)
      throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = is.read(buffer, offset + nread, length - nread);
      if (nbytes < 0) {
        return nread > 0 ? nread : nbytes;
      }
      nread += nbytes;
    }
    return nread;
  }

  /**
   * Similar to readFully(). Skips bytes in a loop.
   * @param in The DataInput to skip bytes from
   * @param len number of bytes to skip.
   * @throws java.io.IOException if it could not skip requested number of bytes
   * for any reason (including EOF)
   */
  public static void skipFully(DataInput in, int len) throws IOException {
    int amt = len;
    while (amt > 0) {
      long ret = in.skipBytes(amt);
      if (ret == 0) {
        // skipBytes may return 0 even when not at EOF. Probe with readByte(),
        // which throws EOFException at end of stream. BUG FIX: the original
        // compared the returned byte with -1, which falsely detected EOF on a
        // legitimate 0xFF byte and never caught a real EOF (readByte() throws
        // rather than returning -1).
        try {
          in.readByte();
        } catch (EOFException e) {
          throw new EOFException("Premature EOF from inputStream after " +
              "skipping " + (len - amt) + " byte(s).");
        }
        ret = 1;
      }
      amt -= ret;
    }
  }
}
+ */ + +package org.apache.tajo.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; + +/** + * This class is not thread-safe. + */ +public class TableStatistics { + private static final Log LOG = LogFactory.getLog(TableStatistics.class); + private Schema schema; + private Tuple minValues; + private Tuple maxValues; + private long [] numNulls; + private long numRows = 0; + private long numBytes = 0; + + private boolean [] comparable; + + public TableStatistics(Schema schema) { + this.schema = schema; + minValues = new VTuple(schema.size()); + maxValues = new VTuple(schema.size()); + + numNulls = new long[schema.size()]; + comparable = new boolean[schema.size()]; + + DataType type; + for (int i = 0; i < schema.size(); i++) { + type = schema.getColumn(i).getDataType(); + if (type.getType() == Type.PROTOBUF) { + comparable[i] = false; + } else { + comparable[i] = true; + } + } + } + + public Schema getSchema() { + return this.schema; + } + + public void incrementRow() { + numRows++; + } + + public long getNumRows() { + return this.numRows; + } + + public void setNumBytes(long bytes) { + this.numBytes = bytes; + } + + public long getNumBytes() { + return this.numBytes; + } + + public void analyzeField(int idx, Datum datum) { + if (datum instanceof NullDatum) { + numNulls[idx]++; + return; + } + + if (comparable[idx]) { + if (!maxValues.contains(idx) || + maxValues.get(idx).compareTo(datum) < 0) { + maxValues.put(idx, datum); + } + if (!minValues.contains(idx) || + minValues.get(idx).compareTo(datum) > 0) { + minValues.put(idx, datum); + } + } + } + + public TableStats getTableStat() { + 
TableStats stat = new TableStats(); + + ColumnStats columnStats; + for (int i = 0; i < schema.size(); i++) { + columnStats = new ColumnStats(schema.getColumn(i)); + columnStats.setNumNulls(numNulls[i]); + if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { + columnStats.setMinValue(minValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + } + if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { + columnStats.setMaxValue(maxValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + + ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); + } + stat.addColumnStat(columnStats); + } + + stat.setNumRows(this.numRows); + stat.setNumBytes(this.numBytes); + + return stat; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java new file mode 100644 index 0000000..ab8816b --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.tajo.storage;

import com.google.protobuf.Message;
import org.apache.commons.codec.binary.Base64;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.*;
import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.NumberUtil;

import java.io.IOException;
import java.io.OutputStream;
import java.util.TimeZone;

// Compatibility with Apache Hive
@Deprecated
public class TextSerializerDeserializer implements SerializerDeserializer {
  public static final byte[] trueBytes = "true".getBytes();
  public static final byte[] falseBytes = "false".getBytes();
  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();

  /**
   * Writes the text representation of {@code datum} to {@code out}.
   *
   * @return the number of bytes written
   */
  @Override
  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {

    byte[] bytes;
    int length = 0;
    TajoDataTypes.DataType dataType = col.getDataType();

    // Null handling: only CHAR/TEXT emit the null marker; all other types
    // serialize a null as zero bytes.
    if (datum == null || datum instanceof NullDatum) {
      switch (dataType.getType()) {
        case CHAR:
        case TEXT:
          length = nullCharacters.length;
          out.write(nullCharacters);
          break;
        default:
          break;
      }
      return length;
    }

    switch (dataType.getType()) {
      case BOOLEAN:
        // BUG FIX: report the length of the bytes actually written. The
        // original always returned trueBytes.length (4), which was wrong
        // whenever "false" (5 bytes) had been written.
        bytes = datum.asBool() ? trueBytes : falseBytes;
        out.write(bytes);
        length = bytes.length;
        break;
      case CHAR:
        // NOTE(review): assumes datum.size() <= the declared CHAR length;
        // otherwise the pad allocation throws NegativeArraySizeException.
        byte[] pad = new byte[dataType.getLength() - datum.size()];
        bytes = datum.asTextBytes();
        out.write(bytes);
        out.write(pad);
        length = bytes.length + pad.length;
        break;
      case TEXT:
      case BIT:
      case INT2:
      case INT4:
      case INT8:
      case FLOAT4:
      case FLOAT8:
      case INET4:
      case DATE:
      case INTERVAL:
        bytes = datum.asTextBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case TIME:
        bytes = ((TimeDatum) datum).asChars(TimeZone.getDefault(), true).getBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case TIMESTAMP:
        bytes = ((TimestampDatum) datum).asChars(TimeZone.getDefault(), true).getBytes();
        length = bytes.length;
        out.write(bytes);
        break;
      case INET6:
      case BLOB:
        // Binary types are emitted as Base64 text.
        bytes = Base64.encodeBase64(datum.asByteArray(), false);
        length = bytes.length;
        out.write(bytes, 0, length);
        break;
      case PROTOBUF:
        ProtobufDatum protobuf = (ProtobufDatum) datum;
        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
        length = protoBytes.length;
        out.write(protoBytes, 0, protoBytes.length);
        break;
      case NULL_TYPE:
      default:
        break;
    }
    return length;
  }

  /**
   * Parses the text in {@code bytes[offset..offset+length)} into a Datum of
   * the column's declared type; returns NullDatum when the bytes equal the
   * null marker (or are empty, for non-text types).
   */
  @Override
  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {

    Datum datum;
    switch (col.getDataType().getType()) {
      case BOOLEAN:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
        break;
      case BIT:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
        break;
      case CHAR:
        // Trailing pad spaces written by serialize() are trimmed off.
        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createChar(new String(bytes, offset, length).trim());
        break;
      case INT1:
      case INT2:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, offset, length));
        break;
      case INT4:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, length));
        break;
      case INT8:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createInt8(new String(bytes, offset, length));
        break;
      case FLOAT4:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createFloat4(new String(bytes, offset, length));
        break;
      case FLOAT8:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, length));
        break;
      case TEXT: {
        byte[] chars = new byte[length];
        System.arraycopy(bytes, offset, chars, 0, length);
        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createText(chars);
        break;
      }
      case DATE:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createDate(new String(bytes, offset, length));
        break;
      case TIME:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createTime(new String(bytes, offset, length));
        break;
      case TIMESTAMP:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createTimestamp(new String(bytes, offset, length));
        break;
      case INTERVAL:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createInterval(new String(bytes, offset, length));
        break;
      case PROTOBUF: {
        if (isNull(bytes, offset, length, nullCharacters)) {
          datum = NullDatum.get();
        } else {
          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
          Message.Builder builder = factory.newBuilder();
          try {
            byte[] protoBytes = new byte[length];
            System.arraycopy(bytes, offset, protoBytes, 0, length);
            protobufJsonFormat.merge(protoBytes, builder);
            datum = factory.createDatum(builder.build());
          } catch (IOException e) {
            // Rethrow with the cause preserved; the original also dumped the
            // stack trace to stderr, which only duplicated the information.
            throw new RuntimeException(e);
          }
        }
        break;
      }
      case INET4:
        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
            : DatumFactory.createInet4(new String(bytes, offset, length));
        break;
      case BLOB: {
        if (isNull(bytes, offset, length, nullCharacters)) {
          datum = NullDatum.get();
        } else {
          byte[] blob = new byte[length];
          System.arraycopy(bytes, offset, blob, 0, length);
          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
        }
        break;
      }
      default:
        datum = NullDatum.get();
        break;
    }
    return datum;
  }

  /** Empty fields and fields equal to the null marker are null (non-text types). */
  private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
    return length == 0 || ((length == nullBytes.length)
        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
  }

  /** For text types an empty field is a valid empty string, not null. */
  private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
    return length > 0 && length == nullBytes.length
        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
  }
}
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java new file mode 100644 index 0000000..8dffd8d --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleComparator.java @@ -0,0 +1,32 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.tajo.storage;

import org.apache.tajo.common.ProtoObject;

import java.util.Comparator;

import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;

/**
 * Base class for comparators that impose a total order on {@link Tuple}s.
 * Implementations are serializable to a {@link TupleComparatorProto} through
 * the {@link ProtoObject} contract.
 */
public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {

  /** Compares two tuples per the {@link Comparator} contract. */
  public abstract int compare(Tuple o1, Tuple o2);

  /** @return true when the first sort key is ordered ascending. */
  public abstract boolean isAscendingFirstKey();
}
package org.apache.tajo.storage;

import com.google.common.base.Objects;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;

import java.util.Comparator;

/**
 * A pair of start and end tuples describing a tuple interval, ordered by a
 * comparator built from the given sort specs.
 */
public class TupleRange implements Comparable<TupleRange>, Cloneable {
  private Tuple start;
  private Tuple end;
  private final TupleComparator comp;

  public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
    // A single-valued range is represented by start == end.
    this.start = start;
    this.end = end;
  }

  /** Builds a schema consisting of the sort keys, in sort-spec order. */
  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
    Schema keySchema = new Schema();
    for (SortSpec sortSpec : sortSpecs) {
      keySchema.addColumn(sortSpec.getSortKey());
    }
    return keySchema;
  }

  public void setStart(Tuple tuple) {
    this.start = tuple;
  }

  public final Tuple getStart() {
    return this.start;
  }

  public void setEnd(Tuple tuple) {
    this.end = tuple;
  }

  public final Tuple getEnd() {
    return this.end;
  }

  public String toString() {
    return "[" + this.start + ", " + this.end + ")";
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(start, end);
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof TupleRange)) {
      return false;
    }
    TupleRange other = (TupleRange) obj;
    return this.start.equals(other.start) && this.end.equals(other.end);
  }

  @Override
  public int compareTo(TupleRange o) {
    // TODO - should handle overlap
    int startCmp = comp.compare(this.start, o.start);
    return startCmp != 0 ? startCmp : comp.compare(this.end, o.end);
  }

  /** Orders ranges in the reverse of their natural order. */
  public static class DescendingTupleRangeComparator
      implements Comparator<TupleRange> {

    @Override
    public int compare(TupleRange left, TupleRange right) {
      return right.compareTo(left);
    }
  }

  /** Deep-copies the boundary tuples; the comparator is shared. */
  public TupleRange clone() throws CloneNotSupportedException {
    TupleRange copy = (TupleRange) super.clone();
    copy.setStart(start.clone());
    copy.setEnd(end.clone());
    return copy;
  }
}
package org.apache.tajo.storage.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marker annotation for types backed by a splitable store.
 * Retained at runtime so it can be discovered reflectively.
 * NOTE(review): the concrete call sites that inspect this annotation are not
 * visible in this file — confirm usage before relying on this description.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ForSplitableStore {
}
+ */ + +package org.apache.tajo.storage.compress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DoNotPool; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A global compressor/decompressor pool used to save and reuse (possibly + * native) compression/decompression codecs. + */ +public final class CodecPool { + private static final Log LOG = LogFactory.getLog(CodecPool.class); + + /** + * A global compressor pool used to save the expensive + * construction/destruction of (possibly native) decompression codecs. + */ + private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL = + new HashMap<Class<Compressor>, List<Compressor>>(); + + /** + * A global decompressor pool used to save the expensive + * construction/destruction of (possibly native) decompression codecs. + */ + private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL = + new HashMap<Class<Decompressor>, List<Decompressor>>(); + + private static <T> T borrow(Map<Class<T>, List<T>> pool, + Class<? 
extends T> codecClass) { + T codec = null; + + // Check if an appropriate codec is available + synchronized (pool) { + if (pool.containsKey(codecClass)) { + List<T> codecList = pool.get(codecClass); + + if (codecList != null) { + synchronized (codecList) { + if (!codecList.isEmpty()) { + codec = codecList.remove(codecList.size() - 1); + } + } + } + } + } + + return codec; + } + + private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { + if (codec != null) { + Class<T> codecClass = (Class<T>) codec.getClass(); + synchronized (pool) { + if (!pool.containsKey(codecClass)) { + pool.put(codecClass, new ArrayList<T>()); + } + + List<T> codecList = pool.get(codecClass); + synchronized (codecList) { + codecList.add(codec); + } + } + } + } + + /** + * Get a {@link Compressor} for the given {@link CompressionCodec} from the + * pool or a new one. + * + * @param codec + * the <code>CompressionCodec</code> for which to get the + * <code>Compressor</code> + * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor + * @return <code>Compressor</code> for the given <code>CompressionCodec</code> + * from the pool or a new one + */ + public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { + Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); + if (compressor == null) { + compressor = codec.createCompressor(); + LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); + } else { + compressor.reinit(conf); + if(LOG.isDebugEnabled()) { + LOG.debug("Got recycled compressor"); + } + } + return compressor; + } + + public static Compressor getCompressor(CompressionCodec codec) { + return getCompressor(codec, null); + } + + /** + * Get a {@link Decompressor} for the given {@link CompressionCodec} from the + * pool or a new one. 
+ * + * @param codec + * the <code>CompressionCodec</code> for which to get the + * <code>Decompressor</code> + * @return <code>Decompressor</code> for the given + * <code>CompressionCodec</code> the pool or a new one + */ + public static Decompressor getDecompressor(CompressionCodec codec) { + Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec + .getDecompressorType()); + if (decompressor == null) { + decompressor = codec.createDecompressor(); + LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("Got recycled decompressor"); + } + } + return decompressor; + } + + /** + * Return the {@link Compressor} to the pool. + * + * @param compressor + * the <code>Compressor</code> to be returned to the pool + */ + public static void returnCompressor(Compressor compressor) { + if (compressor == null) { + return; + } + // if the compressor can't be reused, don't pool it. + if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { + return; + } + compressor.reset(); + payback(COMPRESSOR_POOL, compressor); + } + + /** + * Return the {@link Decompressor} to the pool. + * + * @param decompressor + * the <code>Decompressor</code> to be returned to the pool + */ + public static void returnDecompressor(Decompressor decompressor) { + if (decompressor == null) { + return; + } + // if the decompressor can't be reused, don't pool it. 
+ if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { + return; + } + decompressor.reset(); + payback(DECOMPRESSOR_POOL, decompressor); + } + + private CodecPool() { + // prevent instantiation + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java new file mode 100644 index 0000000..bb035a8 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package org.apache.tajo.storage.exception; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public class AlreadyExistsStorageException extends IOException { + private static final long serialVersionUID = 965518916144019032L; + + + public AlreadyExistsStorageException(String path) { + super("Error: "+path+" alreay exists"); + } + + public AlreadyExistsStorageException(Path path) { + this(path.toString()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java new file mode 100644 index 0000000..a67d1f7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Thrown when a requested compression codec cannot be resolved.
 */
public class UnknownCodecException extends Exception {

  private static final long serialVersionUID = 4287230843540404529L;

  /** Creates an exception with no detail message. */
  public UnknownCodecException() {
    super();
  }

  /**
   * @param message description of the unknown codec
   */
  public UnknownCodecException(String message) {
    super(message);
  }

  /**
   * Preserves the original failure as the cause so stack traces stay intact.
   *
   * @param message description of the unknown codec
   * @param cause underlying failure that triggered this exception
   */
  public UnknownCodecException(String message, Throwable cause) {
    super(message, cause);
  }
}
/**
 * Thrown when a data type encountered during storage I/O is not recognized.
 */
public class UnknownDataTypeException extends Exception {

  private static final long serialVersionUID = -2630390595968966164L;

  /** Creates an exception with no detail message. */
  public UnknownDataTypeException() {
    super();
  }

  /**
   * @param message description of the unknown data type
   */
  public UnknownDataTypeException(String message) {
    super(message);
  }

  /**
   * Preserves the original failure as the cause so stack traces stay intact.
   *
   * @param message description of the unknown data type
   * @param cause underlying failure that triggered this exception
   */
  public UnknownDataTypeException(String message, Throwable cause) {
    super(message, cause);
  }
}
/**
 * Thrown when a file type is not supported by the storage layer.
 * Unchecked because an unsupported type indicates a configuration or
 * programming error rather than a recoverable condition.
 */
public class UnsupportedFileTypeException extends RuntimeException {
  private static final long serialVersionUID = -8160289695849000342L;

  /** Creates an exception with no detail message. */
  public UnsupportedFileTypeException() {
  }

  /**
   * @param message description of the unsupported file type
   */
  public UnsupportedFileTypeException(String message) {
    super(message);
  }

  /**
   * Preserves the original failure as the cause so stack traces stay intact.
   *
   * @param message description of the unsupported file type
   * @param cause underlying failure that triggered this exception
   */
  public UnsupportedFileTypeException(String message, Throwable cause) {
    super(message, cause);
  }
}
+ */ + +package org.apache.tajo.storage.fragment; + +import org.apache.tajo.common.ProtoObject; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public interface Fragment extends ProtoObject<FragmentProto> { + + public abstract String getTableName(); + + @Override + public abstract FragmentProto getProto(); + + public abstract long getLength(); + + public abstract String getKey(); + + public String[] getHosts(); + + public abstract boolean isEmpty(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java new file mode 100644 index 0000000..07720c7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.fragment; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.annotation.ThreadSafe; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.List; +import java.util.Map; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +@ThreadSafe +public class FragmentConvertor { + /** + * Cache of fragment classes + */ + protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap(); + /** + * Cache of constructors for each class. + */ + private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); + /** + * default parameter for all constructors + */ + private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; + + public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType) + throws IOException { + Class<? 
extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase()); + if (fragmentClass == null) { + fragmentClass = conf.getClass( + String.format("tajo.storage.fragment.%s.class", storeType.toLowerCase()), null, Fragment.class); + CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass); + } + + if (fragmentClass == null) { + throw new IOException("No such a fragment for " + storeType.toLowerCase()); + } + + return fragmentClass; + } + + public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) { + T result; + try { + Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); + if (constructor == null) { + constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(clazz, constructor); + } + result = constructor.newInstance(new Object[]{fragment.getContents()}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) + throws IOException { + Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType().toLowerCase()); + if (fragmentClass == null) { + throw new IOException("No such a fragment class for " + fragment.getStoreType()); + } + return convert(fragmentClass, fragment); + } + + public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments) + throws IOException { + List<T> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (FragmentProto proto : fragments) { + list.add(convert(clazz, proto)); + } + return list; + } + + public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException { + List<T> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (FragmentProto proto : fragments) { + list.add((T) convert(conf, proto)); + } + return list; 
+ } + + public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) { + List<FragmentProto> list = Lists.newArrayList(); + if (fragments == null) { + return list; + } + for (Fragment fragment : fragments) { + list.add(fragment.getProto()); + } + return list; + } + + public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) { + List<FragmentProto> list = toFragmentProtoList(fragments); + return list.toArray(new FragmentProto[list.size()]); + } +}
