http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index 24d2dfa..0751035 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -18,33 +18,28 @@ package org.apache.tajo.storage; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.tajo.*; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; -import 
org.apache.tajo.util.TUtil; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.text.NumberFormat; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * StorageManager manages the functions of storing and reading data. @@ -52,23 +47,7 @@ import java.util.concurrent.ConcurrentHashMap; * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class. * */ -public abstract class StorageManager { - private final Log LOG = LogFactory.getLog(StorageManager.class); - - private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Fragment.class - }; - - private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { - Configuration.class, - TaskAttemptId.class, - Schema.class, - TableMeta.class, - Path.class - }; +public abstract class StorageManager implements TableSpace { public static final PathFilter hiddenFileFilter = new PathFilter() { public boolean accept(Path p) { @@ -80,31 +59,6 @@ public abstract class StorageManager { protected TajoConf conf; protected String storeType; - /** - * Cache of StorageManager. - * Key is manager key(warehouse path) + store type - */ - private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); - - /** - * Cache of scanner handlers for each storage type. - */ - protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Scanner>>(); - - /** - * Cache of appender handlers for each storage type. - */ - protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Appender>>(); - - /** - * Cache of constructors for each class. Pins the classes so they - * can't be garbage collected until ReflectionUtils can be collected. 
- */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = - new ConcurrentHashMap<Class<?>, Constructor<?>>(); - public StorageManager(String storeType) { this.storeType = storeType; } @@ -123,6 +77,7 @@ public abstract class StorageManager { * @param ifNotExists Creates the table only when the table does not exist. * @throws java.io.IOException */ + @Override public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; /** @@ -132,6 +87,7 @@ public abstract class StorageManager { * @param tableDesc * @throws java.io.IOException */ + @Override public abstract void purgeTable(TableDesc tableDesc) throws IOException; /** @@ -143,6 +99,7 @@ public abstract class StorageManager { * @return The list of input fragments. * @throws java.io.IOException */ + @Override public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException; @@ -167,21 +124,11 @@ public abstract class StorageManager { /** * Release storage manager resource */ - public abstract void closeStorageManager(); + @Override + public abstract void close(); /** - * Clear all class cache - */ - @VisibleForTesting - protected synchronized static void clearCache() { - CONSTRUCTOR_CACHE.clear(); - SCANNER_HANDLER_CACHE.clear(); - APPENDER_HANDLER_CACHE.clear(); - storageManagers.clear(); - } - - /** * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER. * In general Repartitioner determines the partition range using previous output statistics data. * In the special cases, such as HBase Repartitioner uses the result of this method. 
@@ -237,19 +184,6 @@ public abstract class StorageManager { } /** - * Close StorageManager - * @throws java.io.IOException - */ - public static void close() throws IOException { - synchronized(storageManagers) { - for (StorageManager eachStorageManager: storageManagers.values()) { - eachStorageManager.closeStorageManager(); - } - } - clearCache(); - } - - /** * Returns the splits that will serve as input for the scan tasks. The * number of splits matches the number of regions in a table. * @@ -263,85 +197,6 @@ public abstract class StorageManager { } /** - * Returns FileStorageManager instance. - * - * @param tajoConf Tajo system property. - * @return - * @throws java.io.IOException - */ - public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { - return getStorageManager(tajoConf, "CSV"); - } - - /** - * Returns the proper StorageManager instance according to the storeType. - * - * @param tajoConf Tajo system property. - * @param storeType Storage type - * @return - * @throws java.io.IOException - */ - public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { - FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf); - if (fileSystem != null) { - return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString()); - } else { - return getStorageManager(tajoConf, storeType, null); - } - } - - /** - * Returns the proper StorageManager instance according to the storeType - * - * @param tajoConf Tajo system property. 
- * @param storeType Storage type - * @param managerKey Key that can identify each storage manager(may be a path) - * @return - * @throws java.io.IOException - */ - private static synchronized StorageManager getStorageManager ( - TajoConf tajoConf, String storeType, String managerKey) throws IOException { - - String typeName; - if (storeType.equalsIgnoreCase("HBASE")) { - typeName = "hbase"; - } else { - typeName = "hdfs"; - } - - synchronized (storageManagers) { - String storeKey = typeName + "_" + managerKey; - StorageManager manager = storageManagers.get(storeKey); - - if (manager == null) { - Class<? extends StorageManager> storageManagerClass = - tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class); - - if (storageManagerClass == null) { - throw new IOException("Unknown Storage Type: " + typeName); - } - - try { - Constructor<? extends StorageManager> constructor = - (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass); - if (constructor == null) { - constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class}); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); - } - manager = constructor.newInstance(new Object[]{storeType}); - } catch (Exception e) { - throw new RuntimeException(e); - } - manager.init(tajoConf); - storageManagers.put(storeKey, manager); - } - - return manager; - } - } - - /** * Returns Scanner instance. 
* * @param meta The table meta @@ -351,6 +206,7 @@ public abstract class StorageManager { * @return Scanner instance * @throws java.io.IOException */ + @Override public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); } @@ -364,6 +220,7 @@ public abstract class StorageManager { * @return Scanner instance * @throws java.io.IOException */ + @Override public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { return getScanner(meta, schema, fragment, schema); } @@ -378,6 +235,7 @@ public abstract class StorageManager { * @return Scanner instance * @throws java.io.IOException */ + @Override public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { if (fragment.isEmpty()) { Scanner scanner = new NullScanner(conf, schema, meta, fragment); @@ -389,29 +247,13 @@ public abstract class StorageManager { Scanner scanner; Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); - scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); + scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment); scanner.setTarget(target.toArray()); return scanner; } /** - * Returns Scanner instance. - * - * @param conf The system property - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target The output schema - * @return Scanner instance - * @throws java.io.IOException - */ - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); - } - - /** * Returns Appender instance. 
* @param queryContext Query property. * @param taskAttemptId Task id. @@ -429,82 +271,23 @@ public abstract class StorageManager { Class<? extends Appender> appenderClass; String handlerName = meta.getStoreType().toLowerCase(); - appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); + appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName); if (appenderClass == null) { appenderClass = conf.getClass( String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class); - APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); + TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); } if (appenderClass == null) { throw new IOException("Unknown Storage Type: " + meta.getStoreType()); } - appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); + appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); return appender; } /** - * Creates a scanner instance. - * - * @param theClass Concrete class of scanner - * @param conf System property - * @param schema Input schema - * @param meta Table meta data - * @param fragment The fragment for scanning - * @param <T> - * @return The scanner instance - */ - public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, - Fragment fragment) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - /** - * Creates a scanner instance. 
- * - * @param theClass Concrete class of scanner - * @param conf System property - * @param taskAttemptId Task id - * @param meta Table meta data - * @param schema Input schema - * @param workDir Working directory - * @param <T> - * @return The scanner instance - */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId, - TableMeta meta, Schema schema, Path workDir) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - /** * Return the Scanner class for the StoreType that is defined in storage-default.xml. * * @param storeType store type @@ -513,11 +296,11 @@ public abstract class StorageManager { */ public Class<? extends Scanner> getScannerClass(String storeType) throws IOException { String handlerName = storeType.toLowerCase(); - Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); + Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName); if (scannerClass == null) { scannerClass = conf.getClass( String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class); - SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); + TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); } if (scannerClass == null) { @@ -550,6 +333,7 @@ public abstract class StorageManager { * @param outSchema The output schema of select query for inserting. 
* @throws java.io.IOException */ + @Override public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { // nothing to do } @@ -563,7 +347,9 @@ public abstract class StorageManager { * @return The list of storage specified rewrite rules * @throws java.io.IOException */ - public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + @Override + public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) + throws IOException { return null; } @@ -580,375 +366,8 @@ public abstract class StorageManager { * @return Saved path * @throws java.io.IOException */ - public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + @Override + public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { - return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true); - } - - /** - * Finalizes result data. Tajo stores result data in the staging directory. - * If the query fails, clean up the staging directory. - * Otherwise the query is successful, move to the final directory from the staging directory. - * - * @param queryContext The query property - * @param finalEbId The final execution block id - * @param plan The query plan - * @param schema The final output schema - * @param tableDesc The description of the target table - * @param changeFileSeq If true change result file name with max sequence. 
- * @return Saved path - * @throws java.io.IOException - */ - protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, - LogicalPlan plan, Schema schema, - TableDesc tableDesc, boolean changeFileSeq) throws IOException { - Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; - if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { - finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); - try { - FileSystem fs = stagingResultDir.getFileSystem(conf); - - if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = TUtil.newHashMap(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. 
- Map<Path, Path> recoveryDirs = TUtil.newHashMap(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } - - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } - - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { - - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. - if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); - - for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } - - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } - - // Move the results to the final output dir. 
- for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - - // Check the final output dir - committed = fs.exists(finalOutputDir); - - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } - - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } - - throw new IOException(ioe.getMessage()); - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); - - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT 
(CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } - } - - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = stagingDir.getParent(); - fs.delete(stagingDirRoot, true); - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); - } - } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - } - - return finalOutputDir; - } - - /** - * Attach the sequence number to the output file name and than move the file into the final result path. - * - * @param fs FileSystem - * @param stagingResultDir The staging result dir - * @param fileStatus The file status - * @param finalOutputPath Final output path - * @param nf Number format - * @param fileSeq The sequence number - * @throws java.io.IOException - */ - private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq, boolean changeFileSeq) throws IOException { - if (fileStatus.isDirectory()) { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (!fs.exists(finalSubPath)) { - fs.mkdirs(finalSubPath); - } - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); - for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); - } - } else { - throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); - } - } else { - String subPath = extractSubPath(stagingResultDir, 
fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (changeFileSeq) { - finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); - } - if (!fs.exists(finalSubPath.getParent())) { - fs.mkdirs(finalSubPath.getParent()); - } - if (fs.exists(finalSubPath)) { - throw new IOException("Already exists data file:" + finalSubPath); - } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); - if (success) { - LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } else { - LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } - } - } - } - - /** - * Removes the path of the parent. - * @param parentPath - * @param childPath - * @return - */ - private String extractSubPath(Path parentPath, Path childPath) { - String parentPathStr = parentPath.toUri().getPath(); - String childPathStr = childPath.toUri().getPath(); - - if (parentPathStr.length() > childPathStr.length()) { - return null; - } - - int index = childPathStr.indexOf(parentPathStr); - if (index != 0) { - return null; - } - - return childPathStr.substring(parentPathStr.length() + 1); - } - - /** - * Attach the sequence number to a path. - * - * @param path Path - * @param seq sequence number - * @param nf Number format - * @return New path attached with sequence number - * @throws java.io.IOException - */ - private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { - String[] tokens = path.getName().split("-"); - if (tokens.length != 4) { - throw new IOException("Wrong result file name:" + path); - } - return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); - } - - /** - * Make sure all files are moved. 
- * @param fs FileSystem - * @param stagingPath The stagind directory - * @return - * @throws java.io.IOException - */ - private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - if (eachFile.isFile()) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - return false; - } else { - if (verifyAllFileMoved(fs, eachFile.getPath())) { - fs.delete(eachFile.getPath(), false); - } else { - return false; - } - } - } - } - - return true; - } - - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. - * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws java.io.IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. 
- String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } - } - } - } - - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; - - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; - } - } - - return retValue; - } + TableDesc tableDesc) throws IOException; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java new file mode 100644 index 0000000..ef4aa9a --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * It manages each tablespace; e.g., HDFS, Local file system, and Amazon S3. + */ +public interface TableSpace extends Closeable { + //public void format() throws IOException; + + void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; + + void purgeTable(TableDesc tableDesc) throws IOException; + + List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException; + + List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException; + +// public void renameTable() throws IOException; +// +// public void truncateTable() throws IOException; +// +// public long availableCapacity() throws IOException; +// +// public long totalCapacity() throws IOException; + + Scanner getScanner(TableMeta meta, Schema schema, CatalogProtos.FragmentProto fragment, Schema target) throws IOException; + + Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException; + + Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException; + + Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException; + + void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException; + + List<LogicalPlanRewriteRule> 
getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException; + + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java new file mode 100644 index 0000000..42a5e07 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.tajo.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.Fragment;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * It handles available table spaces and caches TableSpace (StorageManager) instances,
 * along with reflection caches used to instantiate scanners and appenders.
 */
public class TableSpaceManager {

  /**
   * Cache of scanner handlers for each storage type.
   */
  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
  /**
   * Cache of appender handlers for each storage type.
   */
  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends Appender>>();
  // Constructor signature expected of every Scanner implementation.
  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
      Configuration.class,
      Schema.class,
      TableMeta.class,
      Fragment.class
  };
  // Constructor signature expected of every Appender implementation.
  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
      Configuration.class,
      TaskAttemptId.class,
      Schema.class,
      TableMeta.class,
      Path.class
  };
  /**
   * Cache of StorageManager.
   * Key is manager key(warehouse path) + store type
   */
  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
  /**
   * Cache of constructors for each class. Pins the classes so they
   * can't be garbage collected until ReflectionUtils can be collected.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
      new ConcurrentHashMap<Class<?>, Constructor<?>>();

  /**
   * Clear all class caches and cached StorageManager instances.
   */
  @VisibleForTesting
  protected synchronized static void clearCache() {
    CONSTRUCTOR_CACHE.clear();
    SCANNER_HANDLER_CACHE.clear();
    APPENDER_HANDLER_CACHE.clear();
    storageManagers.clear();
  }

  /**
   * Closes every cached StorageManager and then clears all caches.
   *
   * @throws java.io.IOException if closing any StorageManager fails
   */
  public static void shutdown() throws IOException {
    synchronized (storageManagers) {
      for (StorageManager eachStorageManager : storageManagers.values()) {
        eachStorageManager.close();
      }
    }
    clearCache();
  }

  /**
   * Returns the file-based StorageManager instance (the default "CSV" store type).
   *
   * @param tajoConf Tajo system property.
   * @return the cached or newly created file StorageManager
   * @throws IOException if the manager cannot be created
   */
  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
    return getStorageManager(tajoConf, "CSV");
  }

  /**
   * Returns the proper StorageManager instance according to the storeType.
   * The warehouse file system URI is used as the cache key when available.
   *
   * @param tajoConf Tajo system property.
   * @param storeType Storage type
   * @return the cached or newly created StorageManager
   * @throws IOException if the manager cannot be created
   */
  public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
    if (fileSystem != null) {
      return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
    } else {
      return getStorageManager(tajoConf, storeType, null);
    }
  }

  /**
   * Returns the proper StorageManager instance according to the storeType.
   * Instances are created reflectively from the class configured under
   * {@code tajo.storage.manager.<type>.class} and cached per (type, managerKey).
   *
   * @param tajoConf Tajo system property.
   * @param storeType Storage type
   * @param managerKey Key that can identify each storage manager(may be a path)
   * @return the cached or newly created StorageManager
   * @throws IOException if no manager class is configured for the type
   */
  private static synchronized StorageManager getStorageManager(
      TajoConf tajoConf, String storeType, String managerKey) throws IOException {

    // Only two backend families exist today: HBase and HDFS (everything else
    // is file-based and routed to the "hdfs" manager class).
    String typeName;
    if (storeType.equalsIgnoreCase("HBASE")) {
      typeName = "hbase";
    } else {
      typeName = "hdfs";
    }

    synchronized (storageManagers) {
      String storeKey = typeName + "_" + managerKey;
      StorageManager manager = storageManagers.get(storeKey);

      if (manager == null) {
        Class<? extends StorageManager> storageManagerClass =
            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);

        if (storageManagerClass == null) {
          throw new IOException("Unknown Storage Type: " + typeName);
        }

        try {
          // Constructors are cached; the expected signature is (String storeType).
          Constructor<? extends StorageManager> constructor =
              (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
          if (constructor == null) {
            constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
            constructor.setAccessible(true);
            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
          }
          manager = constructor.newInstance(new Object[]{storeType});
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
        manager.init(tajoConf);
        storageManagers.put(storeKey, manager);
      }

      return manager;
    }
  }

  /**
   * Returns a SeekableScanner for the given fragment. The scanner returned by
   * the store-type's manager is assumed to be seekable for this call.
   *
   * @param conf The system property
   * @param meta The table meta
   * @param schema The input schema
   * @param fragment The fragment for scanning
   * @param target The output schema
   * @return SeekableScanner instance
   * @throws IOException if the scanner cannot be created
   */
  public static synchronized SeekableScanner getSeekableScanner(
      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
    return (SeekableScanner) getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
  }

  /**
   * Creates a scanner instance reflectively via the cached
   * (Configuration, Schema, TableMeta, Fragment) constructor.
   *
   * @param theClass Concrete class of scanner
   * @param conf System property
   * @param schema Input schema
   * @param meta Table meta data
   * @param fragment The fragment for scanning
   * @param <T> scanner type
   * @return The scanner instance
   */
  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
                                         Fragment fragment) {
    T result;
    try {
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
        meth.setAccessible(true);
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return result;
  }

  /**
   * Creates an appender instance reflectively via the cached
   * (Configuration, TaskAttemptId, Schema, TableMeta, Path) constructor.
   *
   * @param theClass Concrete class of appender
   * @param conf System property
   * @param taskAttemptId Task id
   * @param meta Table meta data
   * @param schema Input schema
   * @param workDir Working directory
   * @param <T> appender type
   * @return The appender instance
   */
  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
                                          TableMeta meta, Schema schema, Path workDir) {
    T result;
    try {
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
        meth.setAccessible(true);
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return result;
  }
}
HBaseStorageManager.getHBaseConfiguration(conf, meta); - HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, "HBASE")) + HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE")) .getConnection(hbaseConf); htable = hconn.getTable(columnMapping.getHbaseTableName()); htable.setAutoFlushTo(false); http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index df60bb3..24bfd4d 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -36,10 +36,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.storage.Scanner; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.BytesUtils; @@ -184,7 +181,7 @@ public class HBaseScanner implements Scanner { } if (htable == null) { - HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, "HBASE")) + HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE")) .getConnection(hbaseConf); htable = hconn.getTable(fragment.getHbaseTableName()); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java index 5f0695c..3653574 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.hbase; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +43,7 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.CreateTableNode; @@ -78,7 +80,7 @@ public class HBaseStorageManager extends StorageManager { } @Override - public void closeStorageManager() { + public void close() { synchronized (connMap) { for (HConnection eachConn: connMap.values()) { try { @@ -942,6 +944,8 @@ public class HBaseStorageManager extends StorageManager { if (tableDesc == null) { throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); } + Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null); + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); @@ -960,29 +964,23 @@ public class 
HBaseStorageManager extends StorageManager { } committer.commitJob(jobContext); - if (tableDesc.getName() == null && tableDesc.getPath() != null) { - - // insert into location - return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false); - } else { - // insert into table - String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY); + // insert into table + String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY); - HTable htable = new HTable(hbaseConf, tableName); + HTable htable = new HTable(hbaseConf, tableName); + try { + LoadIncrementalHFiles loadIncrementalHFiles = null; try { - LoadIncrementalHFiles loadIncrementalHFiles = null; - try { - loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e.getMessage(), e); - } - loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable); - - return stagingResultDir; - } finally { - htable.close(); + loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e.getMessage(), e); } + loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable); + + return stagingResultDir; + } finally { + htable.close(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java index aa7aa28..39ccf44 100644 --- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java +++ 
b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java @@ -19,14 +19,13 @@ package org.apache.tajo.storage.hbase; import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.util.Pair; import org.junit.Test; @@ -48,7 +47,7 @@ public class TestHBaseStorageManager { scanNode.setQual(evalNodeA); HBaseStorageManager storageManager = - (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), "HBASE"); + (HBaseStorageManager) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE"); List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); assertNotNull(indexEvals); assertEquals(1, indexEvals.size()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index 3daed96..c041771 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -57,7 +57,7 @@ public abstract class FileAppender implements Appender { throw new IllegalArgumentException("Configuration must be an instance of TajoConf"); } - this.path = 
((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf)) + this.path = ((FileStorageManager) TableSpaceManager.getFileStorageManager((TajoConf) conf)) .getAppenderFilePath(taskAttemptId, workDir); } else { this.path = workDir; http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index 635dade..4efc3b7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -27,17 +27,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.tajo.OverridableConf; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.text.NumberFormat; @@ -864,7 +865,7 @@ public class FileStorageManager extends StorageManager { } @Override - public void closeStorageManager() { + public void close() { } @Override @@ -876,6 +877,12 @@ public class FileStorageManager 
extends StorageManager { } @Override + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, + Schema schema, TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, true); + } + + @Override public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) throws IOException { @@ -899,6 +906,366 @@ public class FileStorageManager extends StorageManager { FileStatus status = fs.getFileStatus(path); FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getSeekableScanner(conf, meta, schema, fragment, schema); + return TableSpaceManager.getSeekableScanner(conf, meta, schema, fragment, schema); + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param changeFileSeq If true change result file name with max sequence. + * @return Saved path + * @throws java.io.IOException + */ + protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); + try { + FileSystem fs = stagingResultDir.getFileSystem(conf); + + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. 
+ // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + ContentSummary summary = fs.getContentSummary(stagingResultDir); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map<Path, Path> renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. + Map<Path, Path> recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + + throw new 
IOException(ioe.getMessage()); + } + } else { // no partition + try { + + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); + + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } + + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } + + // Move the results to the final output dir. + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + + // Check the final output dir + committed = fs.exists(finalOutputDir); + + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } + + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } + + throw new IOException(ioe.getMessage()); + } + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + + if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } + } else { + int maxSeq = 
StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } + } + + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + + return finalOutputDir; + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. 
+ * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws java.io.IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } + } + } + + /** 
+ * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. + * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. 
+ * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 
4635b76..1846ed6 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -30,11 +30,9 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.util.Pair; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -84,7 +82,7 @@ public class HashShuffleAppenderManager { if (!fs.exists(dataFile.getParent())) { fs.mkdirs(dataFile.getParent()); } - FileAppender appender = (FileAppender)((FileStorageManager)StorageManager.getFileStorageManager(tajoConf)) + FileAppender appender = (FileAppender)((FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf)) .getAppender(meta, outSchema, dataFile); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 68a2cf2..779f908 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -30,7 +30,6 @@ import org.apache.hadoop.util.NativeCodeLoader; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import 
org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -123,7 +122,7 @@ public class TestCompressionStorages { String fileName = "Compression_" + codec.getSimpleName(); Path tablePath = new Path(testDir, fileName); - Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -155,7 +154,7 @@ public class TestCompressionStorages { FileFragment[] tablets = new FileFragment[1]; tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema); if (storeType.equalsIgnoreCase("CSV")) { if (SplittableCompressionCodec.class.isAssignableFrom(codec)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 6e15c51..2260d2a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; 
-import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -104,7 +103,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); Tuple tuple; @@ -126,7 +125,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); assertNotNull(scanner.next()); @@ -148,7 +147,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); FileFragment fragment = getFileFragment("testErrorTolerance2.json"); - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); try { @@ -167,7 +166,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance3.json"); - Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + Scanner scanner = 
TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java index 7d5eee1..41c6c67 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -81,7 +80,7 @@ public class TestFileStorageManager { Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); fs.mkdirs(path.getParent()); - FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf); + FileStorageManager fileStorageManager = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri()); Appender appender = fileStorageManager.getAppender(meta, schema, path); @@ -128,7 +127,7 @@ public class TestFileStorageManager { } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf); + FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf); assertEquals(fs.getUri(), 
sm.getFileSystem().getUri()); Schema schema = new Schema(); @@ -182,7 +181,7 @@ public class TestFileStorageManager { DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl); } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf); + FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf); assertEquals(fs.getUri(), sm.getFileSystem().getUri()); Schema schema = new Schema(); @@ -221,11 +220,11 @@ public class TestFileStorageManager { try { /* Local FileSystem */ - FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, "CSV"); + FileStorageManager sm = (FileStorageManager) TableSpaceManager.getStorageManager(conf, "CSV"); assertEquals(fs.getUri(), sm.getFileSystem().getUri()); /* Distributed FileSystem */ - sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, "CSV"); + sm = (FileStorageManager) TableSpaceManager.getStorageManager(tajoConf, "CSV"); assertNotEquals(fs.getUri(), sm.getFileSystem().getUri()); assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri()); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index b4a60fc..1222fae 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import 
org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -58,7 +57,7 @@ public class TestFileSystems { public TestFileSystems(FileSystem fs) throws IOException { this.fs = fs; this.conf = new TajoConf(fs.getConf()); - sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf); testDir = getTestDir(this.fs, TEST_PATH); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 1078b84..266f906 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -28,7 +28,6 @@ import org.apache.hadoop.io.compress.DeflateCodec; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; @@ -66,7 +65,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "line.data"); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -119,7 +118,7 @@ 
public class TestLineReader { meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName()); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -177,7 +176,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "testLineDelimitedReader"); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -280,7 +279,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data"); - FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender( + FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 2c856e1..82acaf3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -95,7 +94,7 @@ public class TestMergeScanner { conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro"); testDir = CommonTestingUtil.getTestDir(TEST_PATH); fs = testDir.getFileSystem(conf); - sm = StorageManager.getFileStorageManager(conf); + sm = TableSpaceManager.getFileStorageManager(conf); } @Test @@ -115,7 +114,7 @@ public class TestMergeScanner { } Path table1Path = new Path(testDir, storeType + "_1.data"); - Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path); + Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path); appender1.enableStats(); appender1.init(); int tupleNum = 10000; @@ -137,7 +136,7 @@ public class TestMergeScanner { } Path table2Path = new Path(testDir, storeType + "_2.data"); - Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path); + Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path); appender2.enableStats(); appender2.init();
