http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java index 23c2406..e37be58 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -18,43 +18,65 @@ package org.apache.tajo.storage; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.RewriteRule; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.FileUtil; +import org.apache.tajo.storage.hbase.HBaseStorageManager; +import org.apache.tajo.util.TUtil; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; import java.net.URI; +import 
java.text.NumberFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** - * StorageManager + * StorageManager manages the functions of storing and reading data. + * StorageManager is a abstract class. + * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class. + * */ public abstract class StorageManager { private final Log LOG = LogFactory.getLog(StorageManager.class); + private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { + Configuration.class, + Schema.class, + TableMeta.class, + Fragment.class + }; + + private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { + Configuration.class, + QueryUnitAttemptId.class, + Schema.class, + TableMeta.class, + Path.class + }; + protected TajoConf conf; + protected StoreType storeType; + /** + * Cache of StorageManager. + * Key is manager key(warehouse path) + store type + */ private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); /** @@ -66,8 +88,8 @@ public abstract class StorageManager { /** * Cache of appender handlers for each storage type. */ - protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends FileAppender>>(); + protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Appender>>(); /** * Cache of constructors for each class. Pins the classes so they @@ -76,20 +98,169 @@ public abstract class StorageManager { private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>(); - protected abstract void storageInit() throws IOException ; - public abstract void createTable(TableDesc tableDesc) throws IOException; + public StorageManager(StoreType storeType) { + this.storeType = storeType; + } + + /** + * Initialize storage manager. 
+ * @throws IOException + */ + protected abstract void storageInit() throws IOException; + + /** + * This method is called after executing "CREATE TABLE" statement. + * If a storage is a file based storage, a storage manager may create directory. + * + * @param tableDesc Table description which is created. + * @param ifNotExists Creates the table only when the table does not exist. + * @throws IOException + */ + public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; + + /** + * This method is called after executing "DROP TABLE" statement with the 'PURGE' option + * which is the option to delete all the data. + * + * @param tableDesc + * @throws IOException + */ public abstract void purgeTable(TableDesc tableDesc) throws IOException; - public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException; + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @param scanNode The logical node for scanning. + * @return The list of input fragments. + * @throws IOException + */ + public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, + ScanNode scanNode) throws IOException; + + /** + * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. + * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. + * @param tableDesc The table description for the target data. + * @param currentPage The current page number within the entire list. + * @param numFragments The number of fragments in the result. + * @return The list of input fragments. 
+ * @throws IOException + */ + public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException; + + /** + * It returns the storage property. + * @return The storage property + */ + public abstract StorageProperty getStorageProperty(); + + /** + * Release storage manager resource + */ + public abstract void closeStorageManager(); + + /** + * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is STORAGE_SPECIFIED. + * In general Repartitioner determines the partition range using previous output statistics data. + * In the special cases, such as HBase Repartitioner uses the result of this method. + * + * @param queryContext The current query context which contains query properties. + * @param tableDesc The table description for the target data. + * @param inputSchema The input schema + * @param sortSpecs The sort specification that contains the sort column and sort order. + * @return The list of sort ranges. + * @throws IOException + */ + public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, + TupleRange dataRange) throws IOException; + + /** + * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'. + * In general Tajo creates the target table after finishing the final sub-query of CATS. + * But In the special cases, such as HBase INSERT or CAST query uses the target table information. + * That kind of the storage should implements the logic related to creating table in this method. + * + * @param node The child node of the root node. + * @throws IOException + */ + public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException; + + /** + * It is called when the query failed. + * Each storage manager should implement to be processed when the query fails in this method. + * + * @param node The child node of the root node. 
+ * @throws IOException + */ + public abstract void rollbackOutputCommit(LogicalNode node) throws IOException; + + /** + * Returns the current storage type. + * @return + */ + public StoreType getStoreType() { + return storeType; + } + + /** + * Initialize StorageManager instance. It should be called before using. + * + * @param tajoConf + * @throws IOException + */ public void init(TajoConf tajoConf) throws IOException { this.conf = tajoConf; storageInit(); } + /** + * Close StorageManager + * @throws IOException + */ + public void close() throws IOException { + synchronized(storageManagers) { + for (StorageManager eachStorageManager: storageManagers.values()) { + eachStorageManager.closeStorageManager(); + } + } + } + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @return The list of input fragments. + * @throws IOException + */ + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException { + return getSplits(fragmentId, tableDesc, null); + } + + /** + * Returns FileStorageManager instance. + * + * @param tajoConf Tajo system property. + * @return + * @throws IOException + */ public static FileStorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { return getFileStorageManager(tajoConf, null); } + /** + * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in tajoConf with warehousePath parameter. + * + * @param tajoConf Tajo system property. + * @param warehousePath The warehouse directory to be set in the tajoConf. 
+ * @return + * @throws IOException + */ public static FileStorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { URI uri; TajoConf copiedConf = new TajoConf(tajoConf); @@ -101,22 +272,58 @@ public abstract class StorageManager { return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, key); } + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { + if ("HBASE".equals(storeType)) { + return getStorageManager(tajoConf, StoreType.HBASE); + } else { + return getStorageManager(tajoConf, StoreType.CSV); + } + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws IOException + */ public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { return getStorageManager(tajoConf, storeType, null); } + /** + * Returns the proper StorageManager instance according to the storeType + * + * @param tajoConf Tajo system property. 
+ * @param storeType Storage type + * @param managerKey Key that can identify each storage manager(may be a path) + * @return + * @throws IOException + */ public static synchronized StorageManager getStorageManager ( - TajoConf conf, StoreType storeType, String managerKey) throws IOException { + TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { synchronized (storageManagers) { String storeKey = storeType + managerKey; StorageManager manager = storageManagers.get(storeKey); if (manager == null) { switch (storeType) { + case HBASE: + manager = new HBaseStorageManager(storeType); + break; default: - manager = new FileStorageManager(); + manager = new FileStorageManager(storeType); } - manager.init(conf); + manager.init(tajoConf); storageManagers.put(storeKey, manager); } @@ -124,27 +331,121 @@ public abstract class StorageManager { } } + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target Columns which are selected. + * @return Scanner instance + * @throws IOException + */ public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target); + return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); } + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @return Scanner instance + * @throws IOException + */ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { return getScanner(meta, schema, fragment, schema); } - public Appender getAppender(TableMeta meta, Schema schema, Path path) + /** + * Returns Scanner instance. 
+ * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + if (fragment.isEmpty()) { + Scanner scanner = new NullScanner(conf, schema, meta, fragment); + scanner.setTarget(target.toArray()); + + return scanner; + } + + Scanner scanner; + + Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); + scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); + if (scanner.isProjectable()) { + scanner.setTarget(target.toArray()); + } + + return scanner; + } + + /** + * Returns Scanner instance. + * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException { + return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + } + + /** + * Returns Scanner instance. 
+ * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param path The data file path + * @return Scanner instance + * @throws IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { + + FileSystem fs = path.getFileSystem(conf); + FileStatus status = fs.getFileStatus(path); + FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + + return getSeekableScanner(conf, meta, schema, fragment, schema); + } + + /** + * Returns Appender instance. + * @param queryContext Query property. + * @param taskAttemptId Task id. + * @param meta Table meta data. + * @param schema Output schema. + * @param workDir Working directory + * @return Appender instance + * @throws IOException + */ + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) throws IOException { Appender appender; - Class<? extends FileAppender> appenderClass; + Class<? 
extends Appender> appenderClass; String handlerName = meta.getStoreType().name().toLowerCase(); appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); if (appenderClass == null) { appenderClass = conf.getClass( String.format("tajo.storage.appender-handler.%s.class", - meta.getStoreType().name().toLowerCase()), null, - FileAppender.class); + meta.getStoreType().name().toLowerCase()), null, Appender.class); APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); } @@ -152,27 +453,21 @@ public abstract class StorageManager { throw new IOException("Unknown Storage Type: " + meta.getStoreType()); } - appender = newAppenderInstance(appenderClass, conf, meta, schema, path); + appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); return appender; } - private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Fragment.class - }; - - private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Path.class - }; - /** - * create a scanner instance. + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param schema Input schema + * @param meta Table meta data + * @param fragment The fragment for scanning + * @param <T> + * @return The scanner instance */ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, Fragment fragment) { @@ -193,10 +488,19 @@ public abstract class StorageManager { } /** - * create a scanner instance. + * Creates a scanner instance. 
+ * + * @param theClass Concrete class of scanner + * @param conf System property + * @param taskAttemptId Task id + * @param meta Table meta data + * @param schema Input schema + * @param workDir Working directory + * @param <T> + * @return The scanner instance */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema, - Path path) { + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, + TableMeta meta, Schema schema, Path workDir) { T result; try { Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); @@ -205,7 +509,7 @@ public abstract class StorageManager { meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } - result = meth.newInstance(new Object[]{conf, schema, meta, path}); + result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); } catch (Exception e) { throw new RuntimeException(e); } @@ -213,6 +517,13 @@ public abstract class StorageManager { return result; } + /** + * Return the Scanner class for the StoreType that is defined in storage-default.xml. + * + * @param storeType store type + * @return The Scanner class + * @throws IOException + */ public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { String handlerName = storeType.name().toLowerCase(); Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); @@ -229,37 +540,387 @@ public abstract class StorageManager { return scannerClass; } - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - if (fragment.isEmpty()) { - Scanner scanner = new NullScanner(conf, schema, meta, fragment); - scanner.setTarget(target.toArray()); + /** + * Return length of the fragment. + * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. 
+ * + * @param conf Tajo system property + * @param fragment Fragment + * @return + */ + public static long getFragmentLength(TajoConf conf, Fragment fragment) { + if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { + return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); + } else { + return fragment.getLength(); + } + } - return scanner; + /** + * It is called after making logical plan. Storage manager should verify the schema for inserting. + * + * @param tableDesc The table description of insert target. + * @param outSchema The output schema of select query for inserting. + * @throws IOException + */ + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + // nothing to do + } + + /** + * Returns the list of storage specified rewrite rules. + * This values are used by LogicalOptimizer. + * + * @param queryContext The query property + * @param tableDesc The description of the target table. + * @return The list of storage specified rewrite rules + * @throws IOException + */ + public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + return null; + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @return Saved path + * @throws IOException + */ + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true); + } + + /** + * Finalizes result data. 
Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @param changeFileSeq If true change result file name with max sequence. + * @return Saved path + * @throws IOException + */ + protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); + FileSystem fs = stagingResultDir.getFileSystem(conf); + + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map<Path, Path> renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. 
+ Map<Path, Path> recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + throw new IOException(ioe.getMessage()); + } + } else { + try { + if (fs.exists(finalOutputDir)) { + fs.rename(finalOutputDir, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. 
+ fs.mkdirs(finalOutputDir.getParent()); + } + + fs.rename(stagingResultDir, finalOutputDir); + committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputDir); + } + } + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + + if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } + } else { + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + fs.rename(stagingResultDir, finalOutputDir); + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); } - Scanner scanner; + return finalOutputDir; + } - Class<? 
extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); - scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); - if (scanner.isProjectable()) { - scanner.setTarget(target.toArray()); + /** + * Attach the sequence number to the output file name and than move the file into the final result path. + * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = 
fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } } + } - return scanner; + /** + * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); } - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); } - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { + /** + * Make sure all files are moved. 
+ * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } - FileSystem fs = path.getFileSystem(conf); - FileStatus status = fs.getFileStatus(path); - FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); + return true; + } - return getSeekableScanner(conf, meta, schema, fragment, schema); + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. 
+ String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java new file mode 100644 index 0000000..6816d08 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +public class StorageProperty { + private boolean supportsInsertInto; + private boolean sortedInsert; + + public boolean isSupportsInsertInto() { + return supportsInsertInto; + } + + public void setSupportsInsertInto(boolean supportsInsertInto) { + this.supportsInsertInto = supportsInsertInto; + } + + public boolean isSortedInsert() { + return sortedInsert; + } + + public void setSortedInsert(boolean sortedInsert) { + this.sortedInsert = sortedInsert; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index 6af8da0..9e1e7ea 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.catalog.TableMeta; @@ -58,12 +59,13 @@ public class AvroAppender extends FileAppender { * @param conf Configuration properties. * @param schema The table schema. * @param meta The table metadata. - * @param path The path of the Parquet file to write to. + * @param workDir The path of the Parquet file to write to. 
*/ public AvroAppender(Configuration conf, + QueryUnitAttemptId taskAttemptId, org.apache.tajo.catalog.Schema schema, - TableMeta meta, Path path) throws IOException { - super(conf, schema, meta, path); + TableMeta meta, Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index dcd9f0a..4a83dbf 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -24,6 +24,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.util.TUtil; import java.io.IOException; @@ -37,8 +38,8 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable { @Expose private String tableName; // required @Expose private Path uri; // required - @Expose private Long startOffset; // required - @Expose private Long length; // required + @Expose public Long startOffset; // required + @Expose public Long length; // required private String[] hosts; // Datanode hostnames @Expose private int[] diskIds; @@ -229,6 +230,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); fragmentBuilder.setId(this.tableName); + fragmentBuilder.setStoreType(StoreType.CSV.name()); 
fragmentBuilder.setContents(builder.buildPartial().toByteString()); return fragmentBuilder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java index 0315a8d..07720c7 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @ThreadSafe public class FragmentConvertor { @@ -47,18 +46,17 @@ public class FragmentConvertor { */ private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; - public static Class<? extends Fragment> getFragmentClass(Configuration conf, StoreType storeType) - throws IOException { - String handlerName = storeType.name().toLowerCase(); - Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(handlerName); + public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType) + throws IOException { + Class<? 
extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase()); if (fragmentClass == null) { fragmentClass = conf.getClass( - String.format("tajo.storage.fragment.%s.class", storeType.name().toLowerCase()), null, Fragment.class); - CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass); + String.format("tajo.storage.fragment.%s.class", storeType.toLowerCase()), null, Fragment.class); + CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass); } if (fragmentClass == null) { - throw new IOException("No such a fragment for " + storeType.name()); + throw new IOException("No such a fragment for " + storeType.toLowerCase()); } return fragmentClass; @@ -81,11 +79,11 @@ public class FragmentConvertor { return result; } - public static <T extends Fragment> T convert(Configuration conf, StoreType storeType, FragmentProto fragment) + public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) throws IOException { - Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType); + Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType().toLowerCase()); if (fragmentClass == null) { - throw new IOException("No such a fragment class for " + storeType.name()); + throw new IOException("No such a fragment class for " + fragment.getStoreType()); } return convert(fragmentClass, fragment); } @@ -102,14 +100,13 @@ public class FragmentConvertor { return list; } - public static <T extends Fragment> List<T> convert(Configuration conf, StoreType storeType, - FragmentProto...fragments) throws IOException { + public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException { List<T> list = Lists.newArrayList(); if (fragments == null) { return list; } for (FragmentProto proto : fragments) { - list.add((T) convert(conf, storeType, proto)); + list.add((T) convert(conf, proto)); } return list; } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java new file mode 100644 index 0000000..8615235 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.TUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An abstract class for HBase appender. + */ +public abstract class AbstractHBaseAppender implements Appender { + protected Configuration conf; + protected Schema schema; + protected TableMeta meta; + protected QueryUnitAttemptId taskAttemptId; + protected Path stagingDir; + protected boolean inited = false; + + protected ColumnMapping columnMapping; + protected TableStatistics stats; + protected boolean enabledStats; + + protected int columnNum; + + protected byte[][][] mappingColumnFamilies; + protected boolean[] isBinaryColumns; + protected boolean[] isRowKeyMappings; + protected boolean[] isColumnKeys; + protected boolean[] isColumnValues; + protected int[] rowKeyFieldIndexes; + protected int[] rowkeyColumnIndexes; + protected char rowKeyDelimiter; + + // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping + protected int[] columnKeyValueDataIndexes; + protected byte[][] columnKeyDatas; + protected byte[][] columnValueDatas; + protected byte[][] columnKeyCfNames; + + protected KeyValue[] keyValues; + + public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + Schema schema, TableMeta meta, Path stagingDir) { + this.conf = conf; + this.schema = schema; + this.meta = meta; + 
this.stagingDir = stagingDir; + this.taskAttemptId = taskAttemptId; + } + + @Override + public void init() throws IOException { + if (inited) { + throw new IllegalStateException("FileAppender is already initialized."); + } + inited = true; + if (enabledStats) { + stats = new TableStatistics(this.schema); + } + columnMapping = new ColumnMapping(schema, meta); + + mappingColumnFamilies = columnMapping.getMappingColumns(); + + isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>(); + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + rowkeyColumnIndexList.add(i); + } + } + rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList); + + isBinaryColumns = columnMapping.getIsBinaryColumns(); + isColumnKeys = columnMapping.getIsColumnKeys(); + isColumnValues = columnMapping.getIsColumnValues(); + rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); + rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); + + this.columnNum = schema.size(); + + // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value + // which are mapped to the same column family. 
+ columnKeyValueDataIndexes = new int[isColumnKeys.length]; + int index = 0; + int numKeyValues = 0; + Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>(); + for (int i = 0; i < isColumnKeys.length; i++) { + if (isRowKeyMappings[i]) { + continue; + } + if (isColumnKeys[i] || isColumnValues[i]) { + String cfName = new String(mappingColumnFamilies[i][0]); + if (!cfNameIndexMap.containsKey(cfName)) { + cfNameIndexMap.put(cfName, index); + columnKeyValueDataIndexes[i] = index; + index++; + numKeyValues++; + } else { + columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName); + } + } else { + numKeyValues++; + } + } + columnKeyCfNames = new byte[cfNameIndexMap.size()][]; + for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) { + columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes(); + } + columnKeyDatas = new byte[cfNameIndexMap.size()][]; + columnValueDatas = new byte[cfNameIndexMap.size()][]; + + keyValues = new KeyValue[numKeyValues]; + } + + private ByteArrayOutputStream bout = new ByteArrayOutputStream(); + + protected byte[] getRowKeyBytes(Tuple tuple) throws IOException { + Datum datum; + byte[] rowkey; + if (rowkeyColumnIndexes.length > 1) { + bout.reset(); + for (int i = 0; i < rowkeyColumnIndexes.length; i++) { + datum = tuple.get(rowkeyColumnIndexes[i]); + if (isBinaryColumns[rowkeyColumnIndexes[i]]) { + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + } else { + rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + } + bout.write(rowkey); + if (i < rowkeyColumnIndexes.length - 1) { + bout.write(rowKeyDelimiter); + } + } + rowkey = bout.toByteArray(); + } else { + int index = rowkeyColumnIndexes[0]; + datum = tuple.get(index); + if (isBinaryColumns[index]) { + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum); + } else { + rowkey = 
HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum); + } + } + + return rowkey; + } + + protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException { + int keyValIndex = 0; + for (int i = 0; i < columnNum; i++) { + if (isRowKeyMappings[i]) { + continue; + } + Datum datum = tuple.get(i); + byte[] value; + if (isBinaryColumns[i]) { + value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum); + } else { + value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum); + } + + if (isColumnKeys[i]) { + columnKeyDatas[columnKeyValueDataIndexes[i]] = value; + } else if (isColumnValues[i]) { + columnValueDatas[columnKeyValueDataIndexes[i]] = value; + } else { + keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value); + keyValIndex++; + } + } + + for (int i = 0; i < columnKeyDatas.length; i++) { + keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]); + } + } + + @Override + public void enableStats() { + enabledStats = true; + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java new file mode 100644 index 0000000..8044494 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.logical.SortNode.SortPurpose; +import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; + +public class AddSortForInsertRewriter implements RewriteRule { + private int[] sortColumnIndexes; + private Column[] sortColumns; + public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) { + this.sortColumns = sortColumns; + this.sortColumnIndexes = new int[sortColumns.length]; + + Schema tableSchema = tableDesc.getSchema(); + for (int i = 0; i < sortColumns.length; i++) { + sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName()); + } + } + + @Override + public String getName() { + return "AddSortForInsertRewriter"; + } + + @Override + public boolean isEligible(LogicalPlan plan) { + StoreType storeType = PlannerUtil.getStoreType(plan); + return storeType != null; + } + + 
@Override + public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + UnaryNode insertNode = rootNode.getChild(); + LogicalNode childNode = insertNode.getChild(); + + Schema sortSchema = childNode.getOutSchema(); + SortNode sortNode = plan.createNode(SortNode.class); + sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED); + sortNode.setInSchema(sortSchema); + sortNode.setOutSchema(sortSchema); + + SortSpec[] sortSpecs = new SortSpec[sortColumns.length]; + int index = 0; + + for (int i = 0; i < sortColumnIndexes.length; i++) { + Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]); + if (sortColumn == null) { + throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]); + } + sortSpecs[index++] = new SortSpec(sortColumn, true, true); + } + sortNode.setSortSpecs(sortSpecs); + + sortNode.setChild(insertNode.getChild()); + insertNode.setChild(sortNode); + plan.getRootBlock().registerNode(sortNode); + + return plan; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java new file mode 100644 index 0000000..f80bd5e --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.util.BytesUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ColumnMapping { + private TableMeta tableMeta; + private Schema schema; + private char rowKeyDelimiter; + + private String hbaseTableName; + + private int[] rowKeyFieldIndexes; + private boolean[] isRowKeyMappings; + private boolean[] isBinaryColumns; + private boolean[] isColumnKeys; + private boolean[] isColumnValues; + + // schema order -> 0: cf name, 1: column name -> name bytes + private byte[][][] mappingColumns; + + private int numRowKeys; + + public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException { + this.schema = schema; + this.tableMeta = tableMeta; + + init(); + } + + private void init() throws IOException { + hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY); + String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim(); + if (delim.length() > 0) { + rowKeyDelimiter = delim.charAt(0); + } + isRowKeyMappings = new boolean[schema.size()]; + rowKeyFieldIndexes = new int[schema.size()]; + isBinaryColumns = new boolean[schema.size()]; + isColumnKeys = new boolean[schema.size()]; + isColumnValues = new boolean[schema.size()]; + + mappingColumns = new byte[schema.size()][][]; + + for (int i = 0; i < schema.size(); i++) { + rowKeyFieldIndexes[i] = -1; + } + + 
String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); + if (columnMapping == null || columnMapping.isEmpty()) { + throw new IOException("'columns' property is required."); + } + + String[] columnMappingTokens = columnMapping.split(","); + + if (columnMappingTokens.length != schema.getColumns().size()) { + throw new IOException("The number of mapped HBase columns is great than the number of Tajo table columns"); + } + + int index = 0; + for (String eachToken: columnMappingTokens) { + mappingColumns[index] = new byte[2][]; + + byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':'); + + if (mappingTokens.length == 3) { + if (mappingTokens[0].length == 0) { + // cfname + throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " + + "or '<cfname>:value:' or '<cfname>:value:#b'"); + } + //<cfname>:key: or <cfname>:value: + if (mappingTokens[2].length != 0) { + String binaryOption = new String(mappingTokens[2]); + if ("#b".equals(binaryOption)) { + isBinaryColumns[index] = true; + } else { + throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " + + "or '<cfname>:value:' or '<cfname>:value:#b'"); + } + } + mappingColumns[index][0] = mappingTokens[0]; + String keyOrValue = new String(mappingTokens[1]); + if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) { + isColumnKeys[index] = true; + } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) { + isColumnValues[index] = true; + } else { + throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'"); + } + } else if (mappingTokens.length == 2) { + //<cfname>: or <cfname>:<qualifier> or :key + String cfName = new String(mappingTokens[0]); + String columnName = new String(mappingTokens[1]); + RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName); + if 
(rowKeyMapping != null) { + isRowKeyMappings[index] = true; + numRowKeys++; + isBinaryColumns[index] = rowKeyMapping.isBinary(); + if (!cfName.isEmpty()) { + if (rowKeyDelimiter == 0) { + throw new IOException("hbase.rowkey.delimiter is required."); + } + rowKeyFieldIndexes[index] = Integer.parseInt(cfName); + } else { + rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column. + } + } else { + if (cfName.isEmpty()) { + throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'"); + } + if (cfName != null) { + mappingColumns[index][0] = Bytes.toBytes(cfName); + } + + if (columnName != null && !columnName.isEmpty()) { + String[] columnNameTokens = columnName.split("#"); + if (columnNameTokens[0].isEmpty()) { + mappingColumns[index][1] = null; + } else { + mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]); + } + if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) { + isBinaryColumns[index] = true; + } + } + } + } else { + throw new IOException(eachToken + " 'column' attribute '[cfname]:[qualfier]:'"); + } + + index++; + } // for loop + } + + public List<String> getColumnFamilyNames() { + List<String> cfNames = new ArrayList<String>(); + + for (byte[][] eachCfName: mappingColumns) { + if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) { + String cfName = new String(eachCfName[0]); + if (!cfNames.contains(cfName)) { + cfNames.add(cfName); + } + } + } + + return cfNames; + } + + private RowKeyMapping getRowKeyMapping(String cfName, String columnName) { + if (columnName == null || columnName.isEmpty()) { + return null; + } + + String[] tokens = columnName.split("#"); + if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) { + return null; + } + + RowKeyMapping rowKeyMapping = new RowKeyMapping(); + + if (tokens.length == 2 && "b".equals(tokens[1])) { + rowKeyMapping.setBinary(true); + } + + if (cfName != null && !cfName.isEmpty()) { + 
rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName)); + } + return rowKeyMapping; + } + + public char getRowKeyDelimiter() { + return rowKeyDelimiter; + } + + public int[] getRowKeyFieldIndexes() { + return rowKeyFieldIndexes; + } + + public boolean[] getIsRowKeyMappings() { + return isRowKeyMappings; + } + + public byte[][][] getMappingColumns() { + return mappingColumns; + } + + public Schema getSchema() { + return schema; + } + + public boolean[] getIsBinaryColumns() { + return isBinaryColumns; + } + + public String getHbaseTableName() { + return hbaseTableName; + } + + public boolean[] getIsColumnKeys() { + return isColumnKeys; + } + + public int getNumRowKeys() { + return numRowKeys; + } + + public boolean[] getIsColumnValues() { + return isColumnValues; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java new file mode 100644 index 0000000..c05c5bb --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; + +public class HBaseBinarySerializerDeserializer { + + public static Datum deserialize(Column col, byte[] bytes) throws IOException { + Datum datum; + switch (col.getDataType().getType()) { + case INT1: + case INT2: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes)); + break; + case INT4: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes)); + break; + case INT8: + if (bytes.length == 4) { + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toInt(bytes)); + } else { + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toLong(bytes)); + } + break; + case FLOAT4: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes)); + break; + case FLOAT8: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes)); + break; + case TEXT: + datum = bytes == null ? 
NullDatum.get() : DatumFactory.createText(bytes); + break; + default: + datum = NullDatum.get(); + break; + } + return datum; + } + + public static byte[] serialize(Column col, Datum datum) throws IOException { + if (datum == null || datum instanceof NullDatum) { + return null; + } + + byte[] bytes; + switch (col.getDataType().getType()) { + case INT1: + case INT2: + bytes = Bytes.toBytes(datum.asInt2()); + break; + case INT4: + bytes = Bytes.toBytes(datum.asInt4()); + break; + case INT8: + bytes = Bytes.toBytes(datum.asInt8()); + break; + case FLOAT4: + bytes = Bytes.toBytes(datum.asFloat4()); + break; + case FLOAT8: + bytes = Bytes.toBytes(datum.asFloat8()); + break; + case TEXT: + bytes = Bytes.toBytes(datum.asChars()); + break; + default: + bytes = null; + break; + } + + return bytes; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java new file mode 100644 index 0000000..43ad7f3 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.StorageFragmentProtos.HBaseFragmentProto; + +public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable { + @Expose + private String tableName; + @Expose + private String hbaseTableName; + @Expose + private byte[] startRow; + @Expose + private byte[] stopRow; + @Expose + private String regionLocation; + @Expose + private boolean last; + @Expose + private long length; + + public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) { + this.tableName = tableName; + this.hbaseTableName = hbaseTableName; + this.startRow = startRow; + this.stopRow = stopRow; + this.regionLocation = regionLocation; + this.last = false; + } + + public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException { + HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); + builder.mergeFrom(raw); + builder.build(); + init(builder.build()); + } + + private void init(HBaseFragmentProto proto) { + this.tableName = proto.getTableName(); + this.hbaseTableName = proto.getHbaseTableName(); + 
this.startRow = proto.getStartRow().toByteArray(); + this.stopRow = proto.getStopRow().toByteArray(); + this.regionLocation = proto.getRegionLocation(); + this.length = proto.getLength(); + this.last = proto.getLast(); + } + + @Override + public int compareTo(HBaseFragment t) { + return Bytes.compareTo(startRow, t.startRow); + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public String getKey() { + return new String(startRow); + } + + @Override + public boolean isEmpty() { + return startRow == null || stopRow == null; + } + + @Override + public long getLength() { + return length; + } + + public void setLength(long length) { + this.length = length; + } + + @Override + public String[] getHosts() { + return new String[] {regionLocation}; + } + + public Object clone() throws CloneNotSupportedException { + HBaseFragment frag = (HBaseFragment) super.clone(); + frag.tableName = tableName; + frag.hbaseTableName = hbaseTableName; + frag.startRow = startRow; + frag.stopRow = stopRow; + frag.regionLocation = regionLocation; + frag.last = last; + frag.length = length; + return frag; + } + + @Override + public boolean equals(Object o) { + if (o instanceof HBaseFragment) { + HBaseFragment t = (HBaseFragment) o; + if (tableName.equals(t.tableName) + && Bytes.equals(startRow, t.startRow) + && Bytes.equals(stopRow, t.stopRow)) { + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow); + } + + @Override + public String toString() { + return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" + + ", \"startRow\": \"" + new String(startRow) + "\"" + + ", \"stopRow\": \"" + new String(stopRow) + "\"" + + ", \"length\": \"" + length + "\"}" ; + } + + @Override + public FragmentProto getProto() { + HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); + builder.setTableName(tableName) + 
.setHbaseTableName(hbaseTableName) + .setStartRow(ByteString.copyFrom(startRow)) + .setStopRow(ByteString.copyFrom(stopRow)) + .setLast(last) + .setLength(length) + .setRegionLocation(regionLocation); + + FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); + fragmentBuilder.setId(this.tableName); + fragmentBuilder.setContents(builder.buildPartial().toByteString()); + fragmentBuilder.setStoreType(StoreType.HBASE.name()); + return fragmentBuilder.build(); + } + + public byte[] getStartRow() { + return startRow; + } + + public byte[] getStopRow() { + return stopRow; + } + + public String getRegionLocation() { + return regionLocation; + } + + public boolean isLast() { + return last; + } + + public void setLast(boolean last) { + this.last = last; + } + + public String getHbaseTableName() { + return hbaseTableName; + } + + public void setHbaseTableName(String hbaseTableName) { + this.hbaseTableName = hbaseTableName; + } + + public void setStartRow(byte[] startRow) { + this.startRow = startRow; + } + + public void setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java new file mode 100644 index 0000000..50f61a8 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * An {@code Appender} that writes tuples directly into an HBase table as
 * {@link Put} operations, one Put per tuple.
 *
 * <p>Column-to-cell mapping state ({@code columnMapping}, {@code
 * isRowKeyMappings}, {@code isBinaryColumns}, {@code columnKeyDatas}, ...) is
 * inherited from {@code AbstractHBaseAppender}, which is defined elsewhere.
 *
 * <p>NOTE(review): not thread-safe — {@code totalNumBytes} and the buffered
 * {@code htable} are mutated without synchronization; presumably one appender
 * instance is confined to a single task thread. TODO confirm.
 */
public class HBasePutAppender extends AbstractHBaseAppender {
  /** Buffered handle to the target HBase table; opened in {@link #init()}. */
  private HTableInterface htable;
  /** Running count of bytes appended, reported through {@code stats}. */
  private long totalNumBytes;

  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                          Schema schema, TableMeta meta, Path stagingDir) {
    super(conf, taskAttemptId, schema, meta, stagingDir);
  }

  /**
   * Opens a connection to the mapped HBase table.
   * Auto-flush is disabled and a 5 MB client write buffer is used so puts are
   * batched until {@link #flush()} or {@link #close()}.
   */
  @Override
  public void init() throws IOException {
    super.init();

    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
    // The shared connection is obtained via the HBase StorageManager cache;
    // its lifecycle is owned by the manager, so it is not closed here.
    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
        .getConnection(hbaseConf);
    htable = hconn.getTable(columnMapping.getHbaseTableName());
    htable.setAutoFlushTo(false);
    htable.setWriteBufferSize(5 * 1024 * 1024);
  }

  /**
   * Converts one tuple into a single {@link Put} and submits it to the
   * client-side write buffer.
   *
   * <p>Columns mapped to the row key are skipped (they are already encoded by
   * {@code getRowKeyBytes}); columns mapped as qualifier "keys"/"values" are
   * staged in {@code columnKeyDatas}/{@code columnValueDatas} and emitted as
   * paired cells in the second loop; all other columns become ordinary cells.
   *
   * <p>NOTE(review): {@code serialize} can return {@code null} for NULL
   * datums, in which case {@code value.length} (and the second loop's
   * {@code columnKeyDatas[i].length}) would throw NullPointerException —
   * presumably NULLs cannot reach mapped columns here; TODO confirm against
   * AbstractHBaseAppender's invariants.
   */
  @Override
  public void addTuple(Tuple tuple) throws IOException {
    byte[] rowkey = getRowKeyBytes(tuple);
    totalNumBytes += rowkey.length;
    Put put = new Put(rowkey);
    // Inherited helper; presumably primes per-tuple key/value staging state.
    readKeyValues(tuple, rowkey);

    for (int i = 0; i < columnNum; i++) {
      if (isRowKeyMappings[i]) {
        continue;
      }
      Datum datum = tuple.get(i);
      byte[] value;
      if (isBinaryColumns[i]) {
        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
      } else {
        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
      }

      if (isColumnKeys[i]) {
        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (isColumnValues[i]) {
        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
      } else {
        // family, qualifier, value
        put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
        totalNumBytes += value.length;
      }
    }

    // Emit the staged key/value column pairs: the serialized "key" column
    // becomes the cell qualifier and the "value" column becomes the cell value.
    for (int i = 0; i < columnKeyDatas.length; i++) {
      put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
      totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
    }

    htable.put(put);

    if (enabledStats) {
      stats.incrementRow();
      stats.setNumBytes(totalNumBytes);
    }
  }

  /** Forces the client-side write buffer to the region servers. */
  @Override
  public void flush() throws IOException {
    htable.flushCommits();
  }

  /** Output size cannot be estimated for buffered HBase puts; always 0. */
  @Override
  public long getEstimatedOutputSize() throws IOException {
    return 0;
  }

  /**
   * Flushes any buffered puts and releases the table handle.
   * The underlying {@code HConnection} is shared and left open.
   */
  @Override
  public void close() throws IOException {
    if (htable != null) {
      htable.flushCommits();
      htable.close();
    }
    if (enabledStats) {
      stats.setNumBytes(totalNumBytes);
    }
  }
}
