http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/Target.java index f49a93d,f49a93d..0524240 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java @@@ -21,7 -21,7 +21,7 @@@ package org.apache.tajo.plan import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.Column; import org.apache.tajo.common.TajoDataTypes.DataType; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.FieldEval; import org.apache.tajo.plan.serder.PlanGsonHelper;
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CaseWhenEval.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/expr/CaseWhenEval.java index 4321d02,4321d02..24101e5 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CaseWhenEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/CaseWhenEval.java @@@ -27,7 -27,7 +27,7 @@@ import org.apache.tajo.common.TajoDataT import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; import org.apache.tajo.plan.serder.PlanGsonHelper; import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java index 638383a,638383a..e028481 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java @@@ -22,7 -22,7 +22,7 @@@ import com.google.gson.annotations.Expo import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.Datum; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; import org.apache.tajo.plan.serder.PlanGsonHelper; import org.apache.tajo.storage.Tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java index 006449f,006449f..5a83f48 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java @@@ -23,7 -23,7 +23,7 @@@ import org.apache.tajo.catalog.json.Cat import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.datum.Datum; import org.apache.tajo.function.Function; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; import org.apache.tajo.plan.expr.FunctionEval; import org.apache.tajo.storage.Tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java index 709ef34,709ef34..55b6871 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java @@@ -19,7 -19,7 +19,7 @@@ package org.apache.tajo.plan.logical; import com.google.gson.annotations.Expose; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonObject { @Expose LogicalNode leftChild = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java index c42a05e,c42a05e..beb2a8f --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java @@@ -23,7 -23,7 +23,7 @@@ package org.apache.tajo.plan.logical import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.Schema; --import org.apache.tajo.json.GsonObject; ++import org.apache.tajo.storage.json.GsonObject; import org.apache.tajo.plan.PlanString; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.serder.PlanGsonHelper; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeAdapter.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeAdapter.java index 5a75e58,5a75e58..8003af5 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeAdapter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/EvalNodeAdapter.java @@@ -22,8 -22,8 +22,8 @@@ package org.apache.tajo.plan.serder; import com.google.gson.*; --import org.apache.tajo.json.CommonGsonHelper; --import org.apache.tajo.json.GsonSerDerAdapter; ++import org.apache.tajo.storage.json.CommonGsonHelper; ++import org.apache.tajo.storage.json.GsonSerDerAdapter; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.EvalType; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeAdapter.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeAdapter.java index f86ba5f,f86ba5f..20c16bb --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeAdapter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeAdapter.java @@@ -22,10 -22,10 +22,10 @@@ package org.apache.tajo.plan.serder; import com.google.gson.*; --import org.apache.tajo.json.CommonGsonHelper; ++import org.apache.tajo.storage.json.CommonGsonHelper; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; --import org.apache.tajo.json.GsonSerDerAdapter; ++import org.apache.tajo.storage.json.GsonSerDerAdapter; import java.lang.reflect.Type; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-plan/src/main/java/org/apache/tajo/plan/serder/PlanGsonHelper.java ---------------------------------------------------------------------- diff --cc tajo-plan/src/main/java/org/apache/tajo/plan/serder/PlanGsonHelper.java index 8cafbd0,8cafbd0..e0a7ad5 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/PlanGsonHelper.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/PlanGsonHelper.java @@@ -27,7 -27,7 +27,7 @@@ import org.apache.tajo.catalog.json.Tab import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.Datum; import org.apache.tajo.function.Function; --import org.apache.tajo.json.*; ++import org.apache.tajo.storage.json.*; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.function.AggFunction; import org.apache.tajo.plan.function.GeneralFunction; http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index d2a692d,0000000..58d26d8 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@@ -1,933 -1,0 +1,980 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.fs.PathFilter; +import org.apache.tajo.*; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; +import java.text.NumberFormat; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * StorageManager manages the functions of storing and reading data. + * StorageManager is a abstract class. + * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class. + * + */ +public abstract class StorageManager { + private final Log LOG = LogFactory.getLog(StorageManager.class); + + private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { + Configuration.class, + Schema.class, + TableMeta.class, + Fragment.class + }; + + private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { + Configuration.class, + QueryUnitAttemptId.class, + Schema.class, + TableMeta.class, + Path.class + }; + ++ public static final PathFilter hiddenFileFilter = new PathFilter() { ++ public boolean accept(Path p) { ++ String name = p.getName(); ++ return !name.startsWith("_") && !name.startsWith("."); ++ } ++ }; ++ + protected TajoConf conf; + protected StoreType storeType; + + /** + * Cache of StorageManager. + * Key is manager key(warehouse path) + store type + */ + private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); + + /** + * Cache of scanner handlers for each storage type. + */ + protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Scanner>>(); + + /** + * Cache of appender handlers for each storage type. + */ + protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Appender>>(); + + /** + * Cache of constructors for each class. Pins the classes so they + * can't be garbage collected until ReflectionUtils can be collected. + */ + private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = + new ConcurrentHashMap<Class<?>, Constructor<?>>(); + + public StorageManager(StoreType storeType) { + this.storeType = storeType; + } + + /** + * Initialize storage manager. + * @throws java.io.IOException + */ + protected abstract void storageInit() throws IOException; + + /** + * This method is called after executing "CREATE TABLE" statement. + * If a storage is a file based storage, a storage manager may create directory. + * + * @param tableDesc Table description which is created. + * @param ifNotExists Creates the table only when the table does not exist. + * @throws java.io.IOException + */ + public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; + + /** + * This method is called after executing "DROP TABLE" statement with the 'PURGE' option + * which is the option to delete all the data. + * + * @param tableDesc + * @throws java.io.IOException + */ + public abstract void purgeTable(TableDesc tableDesc) throws IOException; + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @param scanNode The logical node for scanning. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, + ScanNode scanNode) throws IOException; + + /** + * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. + * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. + * @param tableDesc The table description for the target data. + * @param currentPage The current page number within the entire list. + * @param numFragments The number of fragments in the result. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException; + + /** + * It returns the storage property. + * @return The storage property + */ + public abstract StorageProperty getStorageProperty(); + + /** + * Release storage manager resource + */ + public abstract void closeStorageManager(); + + /** + * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER. + * In general Repartitioner determines the partition range using previous output statistics data. + * In the special cases, such as HBase Repartitioner uses the result of this method. + * + * @param queryContext The current query context which contains query properties. + * @param tableDesc The table description for the target data. + * @param inputSchema The input schema + * @param sortSpecs The sort specification that contains the sort column and sort order. + * @return The list of sort ranges. + * @throws java.io.IOException + */ + public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, + TupleRange dataRange) throws IOException; + + /** + * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'. + * In general Tajo creates the target table after finishing the final sub-query of CATS. + * But In the special cases, such as HBase INSERT or CAST query uses the target table information. + * That kind of the storage should implements the logic related to creating table in this method. + * + * @param node The child node of the root node. + * @throws java.io.IOException + */ + public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException; + + /** + * It is called when the query failed. + * Each storage manager should implement to be processed when the query fails in this method. + * + * @param node The child node of the root node. + * @throws java.io.IOException + */ + public abstract void rollbackOutputCommit(LogicalNode node) throws IOException; + + /** + * Returns the current storage type. + * @return + */ + public StoreType getStoreType() { + return storeType; + } + + /** + * Initialize StorageManager instance. It should be called before using. + * + * @param tajoConf + * @throws java.io.IOException + */ + public void init(TajoConf tajoConf) throws IOException { + this.conf = tajoConf; + storageInit(); + } + + /** + * Close StorageManager + * @throws java.io.IOException + */ + public void close() throws IOException { + synchronized(storageManagers) { + for (StorageManager eachStorageManager: storageManagers.values()) { + eachStorageManager.closeStorageManager(); + } + } + } + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException { + return getSplits(fragmentId, tableDesc, null); + } + + /** + * Returns FileStorageManager instance. + * + * @param tajoConf Tajo system property. + * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { + return getFileStorageManager(tajoConf, null); + } + + /** + * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in tajoConf with warehousePath parameter. + * + * @param tajoConf Tajo system property. + * @param warehousePath The warehouse directory to be set in the tajoConf. + * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { + URI uri; + TajoConf copiedConf = new TajoConf(tajoConf); + if (warehousePath != null) { + copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString()); + } + uri = TajoConf.getWarehouseDir(copiedConf).toUri(); + String key = "file".equals(uri.getScheme()) ? "file" : uri.toString(); + return getStorageManager(copiedConf, StoreType.CSV, key); + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { + if ("HBASE".equals(storeType)) { + return getStorageManager(tajoConf, StoreType.HBASE); + } else { + return getStorageManager(tajoConf, StoreType.CSV); + } + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { + return getStorageManager(tajoConf, storeType, null); + } + + /** + * Returns the proper StorageManager instance according to the storeType + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @param managerKey Key that can identify each storage manager(may be a path) + * @return + * @throws java.io.IOException + */ + public static synchronized StorageManager getStorageManager ( + TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { + synchronized (storageManagers) { + String storeKey = storeType + managerKey; + StorageManager manager = storageManagers.get(storeKey); + if (manager == null) { + String typeName = "hdfs"; + + switch (storeType) { + case HBASE: + typeName = "hbase"; + break; + default: + typeName = "hdfs"; + } + + Class<? extends StorageManager> storageManagerClass = + tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class); + + if (storageManagerClass == null) { + throw new IOException("Unknown Storage Type: " + typeName); + } + + try { + Constructor<? extends StorageManager> constructor = + (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass); + if (constructor == null) { + constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{StoreType.class}); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); + } + manager = constructor.newInstance(new Object[]{storeType}); + } catch (Exception e) { + throw new RuntimeException(e); + } + manager.init(tajoConf); + storageManagers.put(storeKey, manager); + } + + return manager; + } + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target Columns which are selected. + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { + return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { + return getScanner(meta, schema, fragment, schema); + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + if (fragment.isEmpty()) { + Scanner scanner = new NullScanner(conf, schema, meta, fragment); + scanner.setTarget(target.toArray()); + + return scanner; + } + + Scanner scanner; + + Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); + scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); + if (scanner.isProjectable()) { + scanner.setTarget(target.toArray()); + } + + return scanner; + } + + /** + * Returns Scanner instance. + * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + } + + /** + * Returns Appender instance. + * @param queryContext Query property. + * @param taskAttemptId Task id. + * @param meta Table meta data. + * @param schema Output schema. + * @param workDir Working directory + * @return Appender instance + * @throws java.io.IOException + */ + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + Appender appender; + + Class<? extends Appender> appenderClass; + + String handlerName = meta.getStoreType().name().toLowerCase(); + appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); + if (appenderClass == null) { + appenderClass = conf.getClass( + String.format("tajo.storage.appender-handler.%s.class", + meta.getStoreType().name().toLowerCase()), null, Appender.class); + APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); + } + + if (appenderClass == null) { + throw new IOException("Unknown Storage Type: " + meta.getStoreType()); + } + + appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); + + return appender; + } + + /** + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param schema Input schema + * @param meta Table meta data + * @param fragment The fragment for scanning + * @param <T> + * @return The scanner instance + */ + public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, + Fragment fragment) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param taskAttemptId Task id + * @param meta Table meta data + * @param schema Input schema + * @param workDir Working directory + * @param <T> + * @return The scanner instance + */ + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, + TableMeta meta, Schema schema, Path workDir) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Return the Scanner class for the StoreType that is defined in storage-default.xml. + * + * @param storeType store type + * @return The Scanner class + * @throws java.io.IOException + */ + public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { + String handlerName = storeType.name().toLowerCase(); + Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); + if (scannerClass == null) { + scannerClass = conf.getClass( + String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class); + SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); + } + + if (scannerClass == null) { + throw new IOException("Unknown Storage Type: " + storeType.name()); + } + + return scannerClass; + } + + /** + * Return length of the fragment. + * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. + * + * @param conf Tajo system property + * @param fragment Fragment + * @return + */ + public static long getFragmentLength(TajoConf conf, Fragment fragment) { + if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { + return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); + } else { + return fragment.getLength(); + } + } + + /** + * It is called after making logical plan. Storage manager should verify the schema for inserting. + * + * @param tableDesc The table description of insert target. + * @param outSchema The output schema of select query for inserting. + * @throws java.io.IOException + */ + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + // nothing to do + } + + /** + * Returns the list of storage specified rewrite rules. + * This values are used by LogicalOptimizer. + * + * @param queryContext The query property + * @param tableDesc The description of the target table. + * @return The list of storage specified rewrite rules + * @throws java.io.IOException + */ + public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + return null; + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @return Saved path + * @throws java.io.IOException + */ + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException { + return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true); + } + + /** + * Finalizes result data. Tajo stores result data in the staging directory. + * If the query fails, clean up the staging directory. + * Otherwise the query is successful, move to the final directory from the staging directory. + * + * @param queryContext The query property + * @param finalEbId The final execution block id + * @param plan The query plan + * @param schema The final output schema + * @param tableDesc The description of the target table + * @param changeFileSeq If true change result file name with max sequence. + * @return Saved path + * @throws java.io.IOException + */ + protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc, boolean changeFileSeq) throws IOException { + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + Path finalOutputDir; + if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { + finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); - FileSystem fs = stagingResultDir.getFileSystem(conf); - - if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = TUtil.newHashMap(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map<Path, Path> recoveryDirs = TUtil.newHashMap(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } ++ try { ++ FileSystem fs = stagingResultDir.getFileSystem(conf); ++ ++ if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); ++ // It moves the original table into the temporary location. ++ // Then it moves the new result table into the original table location. ++ // Upon failed, it recovers the original table if possible. ++ boolean movedToOldTable = false; ++ boolean committed = false; ++ Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); ++ ++ if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { ++ // This is a map for existing non-leaf directory to rename. A key is current directory and a value is ++ // renaming directory. ++ Map<Path, Path> renameDirs = TUtil.newHashMap(); ++ // This is a map for recovering existing partition directory. A key is current directory and a value is ++ // temporary directory to back up. ++ Map<Path, Path> recoveryDirs = TUtil.newHashMap(); ++ ++ try { ++ if (!fs.exists(finalOutputDir)) { ++ fs.mkdirs(finalOutputDir); + } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } + - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } ++ visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), ++ renameDirs, oldTableDir); ++ ++ // Rename target partition directories ++ for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { ++ // Backup existing data files for recovering ++ if (fs.exists(entry.getValue())) { ++ String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), ++ oldTableDir.toString()); ++ Path recoveryPath = new Path(recoveryPathString); ++ fs.rename(entry.getValue(), recoveryPath); ++ fs.exists(recoveryPath); ++ recoveryDirs.put(entry.getValue(), recoveryPath); ++ } ++ // Delete existing directory ++ fs.delete(entry.getValue(), true); ++ // Rename staging directory to final output directory ++ fs.rename(entry.getKey(), entry.getValue()); ++ } + - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - throw new IOException(ioe.getMessage()); - } - } else { - try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir.getParent()); ++ } catch (IOException ioe) { ++ // Remove created dirs ++ for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { ++ fs.delete(entry.getValue(), true); ++ } ++ ++ // Recovery renamed dirs ++ for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { ++ fs.delete(entry.getValue(), true); ++ fs.rename(entry.getValue(), entry.getKey()); ++ } ++ throw new IOException(ioe.getMessage()); + } ++ } else { // no partition ++ try { ++ ++ // if the final output dir exists, move all contents to the temporary table dir. ++ // Otherwise, just make the final output dir. As a result, the final output dir will be empty. ++ if (fs.exists(finalOutputDir)) { ++ fs.mkdirs(oldTableDir); + - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); ++ for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { ++ fs.rename(status.getPath(), oldTableDir); ++ } ++ ++ movedToOldTable = fs.exists(oldTableDir); ++ } else { // if the parent does not exist, make its parent directory. ++ fs.mkdirs(finalOutputDir); ++ } ++ ++ // Move the results to the final output dir. ++ for (FileStatus status : fs.listStatus(stagingResultDir)) { ++ fs.rename(status.getPath(), finalOutputDir); ++ } ++ ++ // Check the final output dir ++ committed = fs.exists(finalOutputDir); ++ ++ } catch (IOException ioe) { ++ // recover the old table ++ if (movedToOldTable && !committed) { ++ ++ // if commit is failed, recover the old data ++ for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) { ++ fs.delete(status.getPath(), true); ++ } ++ ++ for (FileStatus status : fs.listStatus(oldTableDir)) { ++ fs.rename(status.getPath(), finalOutputDir); ++ } ++ } ++ ++ throw new IOException(ioe.getMessage()); + } + } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); ++ } else { ++ String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table ++ if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); ++ NumberFormat fmt = NumberFormat.getInstance(); ++ fmt.setGroupingUsed(false); ++ fmt.setMinimumIntegerDigits(3); + - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; ++ if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { ++ for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { ++ if (eachFile.isFile()) { ++ LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); ++ continue; ++ } ++ moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); ++ } ++ } else { ++ int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; ++ for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { ++ if (eachFile.getPath().getName().startsWith("_")) { ++ continue; ++ } ++ moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); + } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; ++ // checking all file moved and remove empty dir ++ verifyAllFileMoved(fs, stagingResultDir); ++ FileStatus[] files = fs.listStatus(stagingResultDir); ++ if (files != null && files.length != 0) { ++ for (FileStatus eachFile: files) { ++ LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); ++ } else { // CREATE TABLE AS SELECT (CTAS) ++ if (fs.exists(finalOutputDir)) { ++ for (FileStatus status : fs.listStatus(stagingResultDir)) { ++ fs.rename(status.getPath(), finalOutputDir); ++ } ++ } else { ++ fs.rename(stagingResultDir, finalOutputDir); + } ++ LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } - } else { // CREATE TABLE AS SELECT (CTAS) - fs.rename(stagingResultDir, finalOutputDir); - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } ++ ++ // remove the staging directory if the final output dir is given. ++ Path stagingDirRoot = stagingDir.getParent(); ++ fs.delete(stagingDirRoot, true); ++ } catch (Throwable t) { ++ LOG.error(t); ++ throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + + return finalOutputDir; + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. + * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws java.io.IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } + } + } + + /** + * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. + * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 47d11c7,0000000..67033ed mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@@ -1,184 -1,0 +1,198 @@@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<configuration> + <!-- Storage Manager Configuration --> + <property> + <name>tajo.storage.manager.hdfs.class</name> + <value>org.apache.tajo.storage.FileStorageManager</value> + </property> + <property> + <name>tajo.storage.manager.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value> + </property> + + <property> + <name>tajo.storage.manager.concurrency.perDisk</name> + <value>1</value> + <description></description> + </property> + + <!--- Registered Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> ++ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <!--- Fragment Class Configurations --> + <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.csv.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> ++ <name>tajo.storage.fragment.json.class</name> ++ <value>org.apache.tajo.storage.fragment.FileFragment</value> ++ </property> ++ <property> + <name>tajo.storage.fragment.raw.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.rcfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.row.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.parquet.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.sequencefile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.avro.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> + <name>tajo.storage.fragment.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseFragment</value> + </property> + + <!--- Scanner Handler --> + <property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + </property> + + <property> ++ <name>tajo.storage.scanner-handler.json.class</name> ++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> ++ </property> ++ ++ <property> + <name>tajo.storage.scanner-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroScanner</value> + </property> + + <property> + <name>tajo.storage.scanner-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HBaseScanner</value> + </property> + + <!--- Appender Handler --> + <property> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> + </property> + + <property> ++ <name>tajo.storage.appender-handler.json.class</name> ++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> ++ </property> ++ ++ <property> + <name>tajo.storage.appender-handler.raw.class</name> + <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rcfile.class</name> + <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.rowfile.class</name> + <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.parquet.class</name> + <value>org.apache.tajo.storage.parquet.ParquetAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.sequencefile.class</name> + <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.avro.class</name> + <value>org.apache.tajo.storage.avro.AvroAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hbase.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> + + <property> + <name>tajo.storage.appender-handler.hfile.class</name> + <value>org.apache.tajo.storage.hbase.HFileAppender</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/pom.xml ---------------------------------------------------------------------- diff --cc tajo-storage/tajo-storage-hdfs/pom.xml index 5105ac5,0000000..ef8e9c2 mode 100644,000000..100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@@ -1,380 -1,0 +1,385 @@@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright 2012 Database Lab., Korea Univ. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.9.1-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-hdfs</artifactId> + <packaging>jar</packaging> + <name>Tajo HDFS Storage</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <parquet.version>1.5.0</parquet.version> + <parquet.format.version>2.1.0</parquet.format.version> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <excludes> - <exclude>src/test/resources/testVariousTypes.avsc</exclude> ++ <exclude>src/test/resources/dataset/**</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--proto_path=../../tajo-common/src/main/proto</argument> + <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/StorageFragmentProtos.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>trevni-core</artifactId> + <version>1.7.3</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>trevni-avro</artifactId> + <version>1.7.3</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-json</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-server-tests</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-api</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-hs</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-column</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-format</artifactId> + <version>${parquet.format.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> ++ <dependency> ++ <groupId>net.minidev</groupId> ++ <artifactId>json-smart</artifactId> ++ <version>2.0</version> ++ </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project>
