http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml index f7c9676..f1d3438 100644 --- a/tajo-storage/tajo-storage-common/pom.xml +++ b/tajo-storage/tajo-storage-common/pom.xml @@ -58,6 +58,12 @@ limitations under the License. <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/resources/*.json</exclude> + <exclude>src/test/resources/*.json</exclude> + </excludes> + </configuration> <executions> <execution> <phase>verify</phase> @@ -293,6 +299,10 @@ limitations under the License. <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> </dependency> + <dependency> + <groupId>net.minidev</groupId> + <artifactId>json-smart</artifactId> + </dependency> </dependencies> <profiles> @@ -334,4 +344,4 @@ limitations under the License. </plugin> </plugins> </reporting> -</project> \ No newline at end of file +</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java new file mode 100644 index 0000000..0f0cd10 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +public class FormatProperty { + private boolean sortedInsertRequired; + + public FormatProperty(boolean sortedInsertRequired) { + this.sortedInsertRequired = sortedInsertRequired; + } + + public boolean sortedInsertRequired() { + return sortedInsertRequired; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java index a8926a0..ce573be 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -64,7 +64,7 @@ public class MergeScanner implements Scanner { long numBytes = 0; for (Fragment eachFileFragment: rawFragmentList) { - long fragmentLength = Tablespace.getFragmentLength((TajoConf) conf, eachFileFragment); + long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment); if (fragmentLength > 0) { numBytes += fragmentLength; fragments.add(eachFileFragment); @@ -131,8 +131,7 @@ public class MergeScanner implements Scanner { private Scanner getNextScanner() throws IOException { if (iterator.hasNext()) { currentFragment = iterator.next(); - currentScanner = TableSpaceManager.getStorageManager((TajoConf) conf, meta.getStoreType()).getScanner(meta, schema, - currentFragment, target); + currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target); currentScanner.init(); return currentScanner; } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java new file mode 100644 index 0000000..12b236f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.FileUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * It handles available table spaces and cache TableSpace instances. + */ +public class OldStorageManager { + private static final Log LOG = LogFactory.getLog(OldStorageManager.class); + + /** + * Cache of scanner handlers for each storage type. + */ + protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Scanner>>(); + /** + * Cache of appender handlers for each storage type. + */ + protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE + = new ConcurrentHashMap<String, Class<? extends Appender>>(); + private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { + Configuration.class, + Schema.class, + TableMeta.class, + Fragment.class + }; + private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { + Configuration.class, + TaskAttemptId.class, + Schema.class, + TableMeta.class, + Path.class + }; + /** + * Cache of Tablespace. + * Key is manager key(warehouse path) + store type + */ + private static final Map<String, Tablespace> storageManagers = Maps.newHashMap(); + /** + * Cache of constructors for each class. 
Pins the classes so they + * can't be garbage collected until ReflectionUtils can be collected. + */ + protected static Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); + + /** + * Clear all class cache + */ + @VisibleForTesting + protected synchronized static void clearCache() { + CONSTRUCTOR_CACHE.clear(); + SCANNER_HANDLER_CACHE.clear(); + APPENDER_HANDLER_CACHE.clear(); + storageManagers.clear(); + } + + /** + * Close Tablespace + * @throws java.io.IOException + */ + public static void shutdown() throws IOException { + synchronized(storageManagers) { + for (Tablespace eachTablespace : storageManagers.values()) { + eachTablespace.close(); + } + } + clearCache(); + } + + /** + * Returns the proper Tablespace instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws IOException + */ + public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException { + FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf); + if (fileSystem != null) { + return getStorageManager(tajoConf, fileSystem.getUri(), storeType); + } else { + return getStorageManager(tajoConf, null, storeType); + } + } + + /** + * Returns the proper Tablespace instance according to the storeType + * + * @param tajoConf Tajo system property. 
+ * @param uri Key that can identify each storage manager(may be a path) + * @param storeType Storage type + * @return + * @throws IOException + */ + public static synchronized Tablespace getStorageManager( + TajoConf tajoConf, URI uri, String storeType) throws IOException { + Preconditions.checkNotNull(tajoConf); + Preconditions.checkNotNull(uri); + Preconditions.checkNotNull(storeType); + + String typeName; + if (storeType.equalsIgnoreCase("HBASE")) { + typeName = "hbase"; + } else { + typeName = "hdfs"; + } + + synchronized (storageManagers) { + String storeKey = typeName + "_" + uri.toString(); + Tablespace manager = storageManagers.get(storeKey); + + if (manager == null) { + Class<? extends Tablespace> storageManagerClass = + tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class); + + if (storageManagerClass == null) { + throw new IOException("Unknown Storage Type: " + typeName); + } + + try { + Constructor<? extends Tablespace> constructor = + (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass); + if (constructor == null) { + constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); + } + manager = constructor.newInstance(new Object[]{"noname", uri}); + } catch (Exception e) { + throw new RuntimeException(e); + } + manager.init(tajoConf); + storageManagers.put(storeKey, manager); + } + + return manager; + } + } + + /** + * Returns Scanner instance. 
+ * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + } + + /** + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param schema Input schema + * @param meta Table meta data + * @param fragment The fragment for scanning + * @param <T> + * @return The scanner instance + */ + public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, + Fragment fragment) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Creates a scanner instance. 
+ * + * @param theClass Concrete class of scanner + * @param conf System property + * @param taskAttemptId Task id + * @param meta Table meta data + * @param schema Input schema + * @param workDir Working directory + * @param <T> + * @return The scanner instance + */ + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId, + TableMeta meta, Schema schema, Path workDir) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java index 6816d08..38d0734 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java @@ -19,22 +19,51 @@ package org.apache.tajo.storage; public class StorageProperty { - private boolean supportsInsertInto; - private boolean sortedInsert; + private boolean movable; + private boolean writable; + private boolean insertable; + private boolean absolutePathAllowed; - public boolean isSupportsInsertInto() { - return supportsInsertInto; + public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) { + this.movable = movable; + 
this.writable = writable; + this.insertable = isInsertable; + this.absolutePathAllowed = absolutePathAllowed; } - public void setSupportsInsertInto(boolean supportsInsertInto) { - this.supportsInsertInto = supportsInsertInto; + /** + * Move-like operation is allowed + * + * @return true if move operation is available + */ + public boolean isMovable() { + return movable; } - public boolean isSortedInsert() { - return sortedInsert; + /** + * Is it Writable storage? + * + * @return true if this storage is writable. + */ + public boolean isWritable() { + return writable; } - public void setSortedInsert(boolean sortedInsert) { - this.sortedInsert = sortedInsert; + /** + * Does this storage support insert operation? + * + * @return true if insert operation is allowed. + */ + public boolean isInsertable() { + return insertable; + } + + /** + * Does this storage allow the use of arbitrary absolute paths outside tablespace? + * + * @return + */ + public boolean isArbitraryPathAllowed() { + return this.absolutePathAllowed; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java index 926b5d3..20a5d5c 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -68,7 +68,7 @@ public class StorageUtil extends StorageConstants { return 0; } } - + public static Path concatPath(String parent, String...childs) { return concatPath(new Path(parent), childs); } @@ -82,7 +82,7 @@ public class StorageUtil extends StorageConstants { sb.append("/"); } - return new Path(parent, sb.toString()); + return new 
Path(parent + "/" + sb.toString()); } static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java index a787cdb..ef04509 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java @@ -19,236 +19,372 @@ package org.apache.tajo.storage; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import net.minidev.json.JSONObject; +import net.minidev.json.parser.JSONParser; +import net.minidev.json.parser.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.Pair; +import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.URI; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.TreeMap; +import java.util.UUID; + +import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI; /** * It handles available 
table spaces and cache TableSpace instances. + * + * Default tablespace must be a filesystem-based one. + * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode. + * Local file system can be a default tablespace if a Tajo cluster runs on a single machine. */ -public class TableSpaceManager { +public class TableSpaceManager implements StorageService { + private static final Log LOG = LogFactory.getLog(TableSpaceManager.class); - /** - * Cache of scanner handlers for each storage type. - */ - protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Scanner>>(); - /** - * Cache of appender handlers for each storage type. - */ - protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Appender>>(); - private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Fragment.class - }; - private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { - Configuration.class, - TaskAttemptId.class, - Schema.class, - TableMeta.class, - Path.class - }; - /** - * Cache of Tablespace. - * Key is manager key(warehouse path) + store type - */ - private static final Map<String, Tablespace> storageManagers = Maps.newHashMap(); - /** - * Cache of constructors for each class. Pins the classes so they - * can't be garbage collected until ReflectionUtils can be collected. 
- */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = - new ConcurrentHashMap<Class<?>, Constructor<?>>(); + public static final String DEFAULT_CONFIG_FILE = "storage-default.json"; + public static final String SITE_CONFIG_FILE = "storage-site.json"; + + /** default tablespace name */ + public static final String DEFAULT_TABLESPACE_NAME = "default"; + + private final static TajoConf systemConf = new TajoConf(); + private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR); + + // The relationship among name, URI, Tablespaces must be kept 1:1:1. + protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap(); + protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap(); + protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap(); + protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap(); + + public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class}; + + static { + instance = new TableSpaceManager(); + } /** - * Clear all class cache + * Singleton instance */ - @VisibleForTesting - protected synchronized static void clearCache() { - CONSTRUCTOR_CACHE.clear(); - SCANNER_HANDLER_CACHE.clear(); - APPENDER_HANDLER_CACHE.clear(); - storageManagers.clear(); + private static final TableSpaceManager instance; + + private TableSpaceManager() { + initForDefaultConfig(); // loading storage-default.json + initSiteConfig(); // storage-site.json will override the configs of storage-default.json + addWarehouseAsSpace(); // adding a warehouse directory for a default tablespace + addLocalFsTablespace(); // adding a tablespace using local file system by default } - /** - * Close Tablespace - * @throws java.io.IOException - */ - public static void shutdown() throws IOException { - synchronized(storageManagers) { - for (Tablespace eachTablespace : 
storageManagers.values()) { - eachTablespace.close(); - } + private void addWarehouseAsSpace() { + Path warehouseDir = TajoConf.getWarehouseDir(systemConf); + registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false); + } + + private void addLocalFsTablespace() { + if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) { + String tmpName = UUID.randomUUID().toString(); + registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false); } - clearCache(); } - /** - * Returns FileStorageManager instance. - * - * @param tajoConf Tajo system property. - * @return - * @throws IOException - */ - public static Tablespace getFileStorageManager(TajoConf tajoConf) throws IOException { - return getStorageManager(tajoConf, "CSV"); + public static TableSpaceManager getInstance() { + return instance; } - /** - * Returns the proper Tablespace instance according to the storeType. - * - * @param tajoConf Tajo system property. - * @param storeType Storage type - * @return - * @throws IOException - */ - public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException { - FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf); - if (fileSystem != null) { - return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString()); - } else { - return getStorageManager(tajoConf, storeType, null); + private void initForDefaultConfig() { + JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE); + if (json == null) { + throw new IllegalStateException("There is no " + SITE_CONFIG_FILE); } + applyConfig(json, false); } - /** - * Returns the proper Tablespace instance according to the storeType - * - * @param tajoConf Tajo system property. 
- * @param storeType Storage type - * @param managerKey Key that can identify each storage manager(may be a path) - * @return - * @throws IOException - */ - private static synchronized Tablespace getStorageManager ( - TajoConf tajoConf, String storeType, String managerKey) throws IOException { + private void initSiteConfig() { + JSONObject json = loadFromConfig(SITE_CONFIG_FILE); + + // if there is no storage-site.json file, nothing happen. + if (json != null) { + applyConfig(json, true); + } + } + + private JSONObject loadFromConfig(String fileName) { + String json; + try { + json = FileUtil.readTextFileFromResource(fileName); + } catch (IOException e) { + throw new RuntimeException(e); + } - String typeName; - if (storeType.equalsIgnoreCase("HBASE")) { - typeName = "hbase"; + if (json != null) { + return parseJson(json); } else { - typeName = "hdfs"; + return null; } + } + + private static JSONObject parseJson(String json) { + try { + return (JSONObject) parser.parse(json); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } - synchronized (storageManagers) { - String storeKey = typeName + "_" + managerKey; - Tablespace manager = storageManagers.get(storeKey); + private void applyConfig(JSONObject json, boolean override) { + loadStorages(json); + loadTableSpaces(json, override); + } - if (manager == null) { - Class<? extends Tablespace> storageManagerClass = - tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class); + private void loadStorages(JSONObject json) { + JSONObject spaces = (JSONObject) json.get(KEY_STORAGES); - if (storageManagerClass == null) { - throw new IOException("Unknown Storage Type: " + typeName); - } + if (spaces != null) { + Pair<String, Class<? extends Tablespace>> pair = null; + for (Map.Entry<String, Object> entry : spaces.entrySet()) { try { - Constructor<? extends Tablespace> constructor = - (Constructor<? 
extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass); - if (constructor == null) { - constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class}); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); - } - manager = constructor.newInstance(new Object[]{storeType}); - } catch (Exception e) { - throw new RuntimeException(e); + pair = extractStorage(entry); + } catch (ClassNotFoundException e) { + LOG.warn(e); + continue; } - manager.init(tajoConf); - storageManagers.put(storeKey, manager); + + TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond()); + } + } + } + + private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry) + throws ClassNotFoundException { + + String storageType = entry.getKey(); + JSONObject storageDesc = (JSONObject) entry.getValue(); + String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER); + + return new Pair<String, Class<? extends Tablespace>>( + storageType,(Class<? extends Tablespace>) Class.forName(handlerClass)); + } + + private void loadTableSpaces(JSONObject json, boolean override) { + JSONObject spaces = (JSONObject) json.get(KEY_SPACES); + + if (spaces != null) { + for (Map.Entry<String, Object> entry : spaces.entrySet()) { + AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override); } + } + } - return manager; + public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) { + boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default")); + URI spaceUri = URI.create(spaceDesc.getAsString("uri")); + + if (defaultSpace) { + registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override); } + registerTableSpace(spaceName, spaceUri, spaceDesc, true, override); } - /** - * Returns Scanner instance. 
- * - * @param conf The system property - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target The output schema - * @return Scanner instance - * @throws IOException - */ - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, + boolean visible, boolean override) { + Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible); + tableSpace.setVisible(visible); + + try { + tableSpace.init(systemConf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + putTablespace(tableSpace, override); + + // If the arbitrary path is allowed, root uri is also added as a tablespace + if (tableSpace.getProperty().isArbitraryPathAllowed()) { + URI rootUri = tableSpace.getRootUri(); + // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace. + if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) { + String tmpName = UUID.randomUUID().toString(); + registerTableSpace(tmpName, rootUri, spaceDesc, false, override); + } + } + } + + private static void putTablespace(Tablespace space, boolean override) { + // It is a device to keep the relationship among name, URI, and tablespace 1:1:1. 
+ + boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName()); + boolean uriExist = TABLE_SPACES.containsKey(space.uri); + + boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri()); + mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space); + + if (!override && mismatch) { + throw new RuntimeException("Name or URI of Tablespace must be unique."); + } + + SPACES_URIS_MAP.put(space.getName(), space.getUri()); + // We must guarantee that the same uri results in the same tablespace instance. + TABLE_SPACES.put(space.getUri(), space); } /** - * Creates a scanner instance. + * Return length of the fragment. + * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. * - * @param theClass Concrete class of scanner - * @param conf System property - * @param schema Input schema - * @param meta Table meta data - * @param fragment The fragment for scanning - * @param <T> - * @return The scanner instance + * @param conf Tajo system property + * @param fragment Fragment + * @return */ - public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, - Fragment fragment) { - T result; + public static long guessFragmentVolume(TajoConf conf, Fragment fragment) { + if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { + return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); + } else { + return fragment.getLength(); + } + } + + public static final String KEY_STORAGES = "storages"; // storages + public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler + public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format + + public static final String KEY_SPACES = "spaces"; + + private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) { + Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + 
uri); + Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme()); + + if (clazz == null) { + throw new RuntimeException("There is no tablespace for " + uri.toString()); + } + try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); + Constructor<? extends Tablespace> constructor = + (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz); + + if (constructor == null) { + constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM); + constructor.setAccessible(true); + CONSTRUCTORS.put(clazz, constructor); } - result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); + + return constructor.newInstance(new Object[]{spaceName, uri}); } catch (Exception e) { throw new RuntimeException(e); } + } + + @VisibleForTesting + public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) { + Tablespace existing; + synchronized (SPACES_URIS_MAP) { + // Remove existing one + SPACES_URIS_MAP.remove(space.getName()); + existing = TABLE_SPACES.remove(space.getUri()); + + // Add another one for test + registerTableSpace(space.name, space.uri, null, true, true); + } + // if there is an existing one, return it. + return Optional.fromNullable(existing); + } - return result; + public Iterable<String> getSupportSchemes() { + return TABLE_SPACE_HANDLERS.keySet(); } /** - * Creates a scanner instance. + * Get tablespace for the given URI. If uri is null, the default tablespace will be returned * - * @param theClass Concrete class of scanner - * @param conf System property - * @param taskAttemptId Task id - * @param meta Table meta data - * @param schema Input schema - * @param workDir Working directory - * @param <T> - * @return The scanner instance + * @param uri Table or Table Fragment URI. + * @param <T> Tablespace class type + * @return Tablespace. 
If uri is null, the default tablespace will be returned. */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId, - TableMeta meta, Schema schema, Path workDir) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); + public static <T extends Tablespace> Optional<T> get(@Nullable String uri) { + + if (uri == null || uri.isEmpty()) { + return (Optional<T>) Optional.of(getDefault()); + } + + Tablespace lastOne = null; + + // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and + // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific. + for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) { + if (uri.startsWith(entry.getKey().toString())) { + lastOne = entry.getValue(); } - result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); - } catch (Exception e) { - throw new RuntimeException(e); } + return (Optional<T>) Optional.fromNullable(lastOne); + } + + /** + * Get tablespace for the given URI. If uri is null, the default tablespace will be returned + * + * @param uri Table or Table Fragment URI. + * @param <T> Tablespace class type + * @return Tablespace. If uri is null, the default tablespace will be returned. + */ + public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) { + if (uri == null) { + return (Optional<T>) Optional.of(getDefault()); + } else { + return (Optional<T>) get(uri.toString()); + } + } + + /** + * It returns the default tablespace. This method ensures that it always returns a tablespace. 
+ * + * @return + */ + public static <T extends Tablespace> T getDefault() { + return (T) getByName(DEFAULT_TABLESPACE_NAME).get(); + } + + public static <T extends Tablespace> T getLocalFs() { + return (T) get(LOCAL_FS_URI).get(); + } + + public static Optional<? extends Tablespace> getByName(String name) { + URI uri = SPACES_URIS_MAP.get(name); + if (uri != null) { + return Optional.of(TABLE_SPACES.get(uri)); + } else { + return Optional.absent(); + } + } + + public static Optional<? extends Tablespace> getAnyByScheme(String scheme) { + for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) { + String uriScheme = entry.getKey().getScheme(); + if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) { + return Optional.of(entry.getValue()); + } + } + + return Optional.absent(); + } + + @Override + public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) { + Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get(); + return space.getTableUri(databaseName, tableName); + } - return result; + public static Iterable<Tablespace> getAllTablespaces() { + return TABLE_SPACES.values(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java index 0626da8..77c5d05 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java @@ -19,10 +19,8 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; 
-import org.apache.tajo.TajoConstants; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -30,16 +28,20 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import java.io.IOException; +import java.net.URI; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Tablespace manages the functions of storing and reading data. @@ -49,18 +51,24 @@ import java.util.List; */ public abstract class Tablespace { - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; + protected final String name; + protected final URI uri; + /** this space is visible or not. 
*/ + protected boolean visible = true; protected TajoConf conf; - protected String storeType; - public Tablespace(String storeType) { - this.storeType = storeType; + public Tablespace(String name, URI uri) { + this.name = name; + this.uri = uri; + } + + public void setVisible(boolean visible) { + this.visible = visible; + } + + public Set<String> getDependencies() { + return Collections.emptySet(); } /** @@ -69,24 +77,47 @@ public abstract class Tablespace { */ protected abstract void storageInit() throws IOException; + public String getName() { + return name; + } + + public URI getUri() { + return uri; + } + + public boolean isVisible() { + return visible; + } + + public abstract void setConfig(String name, String value); + + public abstract void setConfigs(Map<String, String> configs); + + public String toString() { + return name + "=" + uri.toString(); + } + + public abstract long getTableVolume(URI uri) throws IOException; + /** - * This method is called after executing "CREATE TABLE" statement. - * If a storage is a file based storage, a storage manager may create directory. + * if {@link StorageProperty#isArbitraryPathAllowed} is true, + * the storage allows arbitrary path accesses. In this case, the storage must provide the root URI. * - * @param tableDesc Table description which is created. - * @param ifNotExists Creates the table only when the table does not exist. - * @throws java.io.IOException + * @see {@link StorageProperty#isArbitraryPathAllowed} + * @return Root URI */ - public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; + public URI getRootUri() { + throw new UnsupportedException( + String.format("Tablespace '%s' does not allow the use of artibrary paths", uri.toString())); + } /** - * This method is called after executing "DROP TABLE" statement with the 'PURGE' option - * which is the option to delete all the data. 
+ * Get Table URI * - * @param tableDesc - * @throws java.io.IOException + * @param tableName + * @return */ - public abstract void purgeTable(TableDesc tableDesc) throws IOException; + public abstract URI getTableUri(String databaseName, String tableName); /** * Returns the splits that will serve as input for the scan tasks. The @@ -116,7 +147,9 @@ public abstract class Tablespace { * It returns the storage property. * @return The storage property */ - public abstract StorageProperty getStorageProperty(); + public abstract StorageProperty getProperty(); + + public abstract FormatProperty getFormatProperty(String dataFormat); /** * Release storage manager resource @@ -137,21 +170,10 @@ public abstract class Tablespace { * @throws java.io.IOException */ public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, - Schema inputSchema, SortSpec[] sortSpecs, + Schema inputSchema, SortSpec [] sortSpecs, TupleRange dataRange) throws IOException; /** - * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'. - * In general Tajo creates the target table after finishing the final sub-query of CATS. - * But In the special cases, such as HBase INSERT or CAST query uses the target table information. - * That kind of the storage should implements the logic related to creating table in this method. - * - * @param node The child node of the root node. - * @throws java.io.IOException - */ - public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException; - - /** * It is called when the query failed. * Each storage manager should implement to be processed when the query fails in this method. * @@ -160,21 +182,13 @@ public abstract class Tablespace { */ /** - * Returns the current storage type. - * @return - */ - public String getStoreType() { - return storeType; - } - - /** * Initialize Tablespace instance. It should be called before using. 
* * @param tajoConf * @throws java.io.IOException */ public void init(TajoConf tajoConf) throws IOException { - this.conf = tajoConf; + this.conf = new TajoConf(tajoConf); storageInit(); } @@ -239,7 +253,7 @@ public abstract class Tablespace { Scanner scanner; Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); - scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment); + scanner = OldStorageManager.newScannerInstance(scannerClass, conf, schema, meta, fragment); scanner.setTarget(target.toArray()); return scanner; @@ -263,18 +277,18 @@ public abstract class Tablespace { Class<? extends Appender> appenderClass; String handlerName = meta.getStoreType().toLowerCase(); - appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName); + appenderClass = OldStorageManager.APPENDER_HANDLER_CACHE.get(handlerName); if (appenderClass == null) { appenderClass = conf.getClass( String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class); - TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); + OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); } if (appenderClass == null) { throw new IOException("Unknown Storage Type: " + meta.getStoreType()); } - appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); + appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); return appender; } @@ -288,11 +302,11 @@ public abstract class Tablespace { */ public Class<? extends Scanner> getScannerClass(String storeType) throws IOException { String handlerName = storeType.toLowerCase(); - Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName); + Class<? 
extends Scanner> scannerClass = OldStorageManager.SCANNER_HANDLER_CACHE.get(handlerName); if (scannerClass == null) { scannerClass = conf.getClass( String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class); - TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); + OldStorageManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); } if (scannerClass == null) { @@ -303,43 +317,54 @@ public abstract class Tablespace { } /** - * Return length of the fragment. - * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. + * It is called after making logical plan. Storage manager should verify the schema for inserting. * - * @param conf Tajo system property - * @param fragment Fragment - * @return + * @param tableDesc The table description of insert target. + * @param outSchema The output schema of select query for inserting. + * @throws java.io.IOException */ - public static long getFragmentLength(TajoConf conf, Fragment fragment) { - if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { - return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); - } else { - return fragment.getLength(); - } + public abstract void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException; + + /** + * Rewrite the logical plan. It is assumed that the final plan will be given in this method. + */ + public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException { + // nothing to do by default } - public abstract void rollbackOutputCommit(LogicalNode node) throws IOException; + //////////////////////////////////////////////////////////////////////////// + // Table Lifecycle Section + //////////////////////////////////////////////////////////////////////////// /** - * It is called after making logical plan. Storage manager should verify the schema for inserting. + * This method is called after executing "CREATE TABLE" statement. 
+ * If a storage is a file based storage, a storage manager may create a directory. * - * @param tableDesc The table description of insert target. - * @param outSchema The output schema of select query for inserting. + * @param tableDesc Table description which is created. + * @param ifNotExists Creates the table only when the table does not exist. * @throws java.io.IOException */ - public abstract void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException; + public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; /** - * Returns the list of storage specified rewrite rules. - * This values are used by LogicalOptimizer. + * This method is called after executing "DROP TABLE" statement with the 'PURGE' option + * which is the option to delete all the data. * - * @param queryContext The query property - * @param tableDesc The description of the target table. - * @return The list of storage specified rewrite rules + * @param tableDesc * @throws java.io.IOException */ - public abstract List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) - throws IOException; + public abstract void purgeTable(TableDesc tableDesc) throws IOException; + + /** + * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT' (CTAS). + * In general, Tajo creates the target table after finishing the final sub-query of CTAS. + * But in special cases, such as an HBase INSERT or CTAS query, the target table information is used beforehand. + * That kind of storage should implement the logic related to creating the table in this method. + * + * @param node The child node of the root node. + * @throws java.io.IOException + */ + public abstract void prepareTable(LogicalNode node) throws IOException; /** * Finalizes result data. Tajo stores result data in the staging directory. 
@@ -354,7 +379,20 @@ public abstract class Tablespace { * @return Saved path * @throws java.io.IOException */ - public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, - LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException; + public abstract Path commitTable(OverridableConf queryContext, + ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException; + + public abstract void rollbackTable(LogicalNode node) throws IOException; + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tablespace) { + Tablespace other = (Tablespace) obj; + return name.equals(other.name) && uri.equals(other.uri); + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json new file mode 100644 index 0000000..40e17f4 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json @@ -0,0 +1,20 @@ +{ + "storages": { + "hdfs": { + "handler": "org.apache.tajo.storage.FileTablespace", + "default-format": "text" + }, + "file": { + "handler": "org.apache.tajo.storage.FileTablespace", + "default-format": "text" + }, + "s3": { + "handler": "org.apache.tajo.storage.FileTablespace", + "default-format": "text" + }, + "hbase": { + "handler": "org.apache.tajo.storage.hbase.HBaseTablespace", + "default-format": "hbase" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml 
index 3456b76..5a1dc9a 100644 --- a/tajo-storage/tajo-storage-hbase/pom.xml +++ b/tajo-storage/tajo-storage-hbase/pom.xml @@ -61,6 +61,12 @@ <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/resources/*.json</exclude> + <exclude>src/test/resources/*.json</exclude> + </excludes> + </configuration> <executions> <execution> <phase>verify</phase> @@ -182,6 +188,11 @@ <artifactId>tajo-storage-common</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java index 425f392..0fc2922 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -89,7 +89,7 @@ public abstract class AbstractHBaseAppender implements Appender { if (enabledStats) { stats = new TableStatistics(this.schema); } - columnMapping = new ColumnMapping(schema, meta); + columnMapping = new ColumnMapping(schema, meta.getOptions()); mappingColumnFamilies = columnMapping.getMappingColumns(); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java deleted file mode 100644 index 32f1e43..0000000 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.hbase; - -import org.apache.tajo.OverridableConf; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.plan.logical.SortNode; -import org.apache.tajo.plan.logical.SortNode.SortPurpose; -import org.apache.tajo.plan.logical.UnaryNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; - -public class AddSortForInsertRewriter implements LogicalPlanRewriteRule { - private int[] sortColumnIndexes; - private Column[] sortColumns; - - public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) { - this.sortColumns = sortColumns; - this.sortColumnIndexes = new int[sortColumns.length]; - - Schema tableSchema = tableDesc.getSchema(); - for (int i = 0; i < sortColumns.length; i++) { - sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName()); - } - } - - @Override - public String getName() { - return "AddSortForInsertRewriter"; - } - - @Override - public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) { - String storeType = PlannerUtil.getStoreType(plan); - return storeType != null; - } - - @Override - public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException { - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - UnaryNode insertNode = rootNode.getChild(); - LogicalNode childNode = insertNode.getChild(); - - Schema sortSchema = childNode.getOutSchema(); - SortNode sortNode = plan.createNode(SortNode.class); - sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED); - sortNode.setInSchema(sortSchema); - sortNode.setOutSchema(sortSchema); - - SortSpec[] 
sortSpecs = new SortSpec[sortColumns.length]; - int index = 0; - - for (int i = 0; i < sortColumnIndexes.length; i++) { - Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]); - if (sortColumn == null) { - throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]); - } - sortSpecs[index++] = new SortSpec(sortColumn, true, true); - } - sortNode.setSortSpecs(sortSpecs); - - sortNode.setChild(insertNode.getChild()); - insertNode.setChild(sortNode); - plan.getRootBlock().registerNode(sortNode); - - return plan; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java index e66a707..0314e8e 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java @@ -20,18 +20,18 @@ package org.apache.tajo.storage.hbase; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.KeyValueSet; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class ColumnMapping { - private TableMeta tableMeta; private Schema schema; - private char rowKeyDelimiter; + private KeyValueSet tableProperty; + private char rowKeyDelimiter; private String hbaseTableName; private int[] rowKeyFieldIndexes; @@ -45,16 +45,15 @@ public class ColumnMapping { private int numRowKeys; - public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException { + public ColumnMapping(Schema schema, 
KeyValueSet tableProperty) throws IOException{ this.schema = schema; - this.tableMeta = tableMeta; - + this.tableProperty = tableProperty; init(); } public void init() throws IOException { - hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY); - String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim(); + hbaseTableName = tableProperty.get(HBaseStorageConstants.META_TABLE_KEY); + String delim = tableProperty.get(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim(); if (delim.length() > 0) { rowKeyDelimiter = delim.charAt(0); } @@ -70,7 +69,7 @@ public class ColumnMapping { rowKeyFieldIndexes[i] = -1; } - String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); + String columnMapping = tableProperty.get(HBaseStorageConstants.META_COLUMNS_KEY, ""); if (columnMapping == null || columnMapping.isEmpty()) { throw new IOException("'columns' property is required."); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java index 9ea0bf6..5961751 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java @@ -29,8 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.hbase.StorageFragmentProtos.*; +import java.net.URI; + public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable { @Expose + private URI uri; + @Expose private String tableName; @Expose 
private String hbaseTableName; @@ -45,7 +49,9 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone @Expose private long length; - public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) { + public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, + String regionLocation) { + this.uri = uri; this.tableName = tableName; this.hbaseTableName = hbaseTableName; this.startRow = startRow; @@ -62,6 +68,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone } private void init(HBaseFragmentProto proto) { + this.uri = URI.create(proto.getUri()); this.tableName = proto.getTableName(); this.hbaseTableName = proto.getHbaseTableName(); this.startRow = proto.getStartRow().toByteArray(); @@ -76,6 +83,10 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone return Bytes.compareTo(startRow, t.startRow); } + public URI getUri() { + return uri; + } + @Override public String getTableName() { return tableName; @@ -107,6 +118,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone public Object clone() throws CloneNotSupportedException { HBaseFragment frag = (HBaseFragment) super.clone(); + frag.uri = uri; frag.tableName = tableName; frag.hbaseTableName = hbaseTableName; frag.startRow = startRow; @@ -137,16 +149,20 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone @Override public String toString() { - return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" + - ", \"startRow\": \"" + new String(startRow) + "\"" + - ", \"stopRow\": \"" + new String(stopRow) + "\"" + - ", \"length\": \"" + length + "\"}" ; + return + "\"fragment\": {\"uri\": \"" + uri.toString() +"\", \"tableName\": \""+ tableName + + "\", \"hbaseTableName\": \"" + hbaseTableName + "\"" + + ", \"startRow\": \"" + new 
String(startRow) + "\"" + + ", \"stopRow\": \"" + new String(stopRow) + "\"" + + ", \"length\": \"" + length + "\"}" ; } @Override public FragmentProto getProto() { HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); - builder.setTableName(tableName) + builder + .setUri(uri.toString()) + .setTableName(tableName) .setHbaseTableName(hbaseTableName) .setStartRow(ByteString.copyFrom(startRow)) .setStopRow(ByteString.copyFrom(stopRow)) http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java index 19fdf80..916aae7 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -26,28 +26,29 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.TableSpaceManager; import org.apache.tajo.storage.Tuple; import java.io.IOException; +import java.net.URI; public class HBasePutAppender extends AbstractHBaseAppender { + private URI uri; private HTableInterface htable; private long totalNumBytes; - public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId, + public HBasePutAppender(Configuration conf, URI uri, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path stagingDir) { super(conf, taskAttemptId, schema, meta, stagingDir); + this.uri = uri; } @Override public void init() throws IOException { super.init(); - Configuration 
hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta); - HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE")) - .getConnection(hbaseConf); + HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get(); + HConnection hconn = space.getConnection(); htable = hconn.getTable(columnMapping.getHbaseTableName()); htable.setAutoFlushTo(false); htable.setWriteBufferSize(5 * 1024 * 1024); http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index 992c13c..16f4c14 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -117,7 +117,7 @@ public class HBaseScanner implements Scanner { targets = schema.toArray(); } - columnMapping = new ColumnMapping(schema, meta); + columnMapping = new ColumnMapping(schema, meta.getOptions()); targetIndexes = new int[targets.length]; int index = 0; for (Column eachTargetColumn: targets) { @@ -133,8 +133,8 @@ public class HBaseScanner implements Scanner { rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); - hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta); - + HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get(); + hbaseConf = space.getHbaseConf(); initScanner(); } @@ -181,8 +181,7 @@ public class HBaseScanner implements Scanner { } if (htable == null) { - HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE")) - 
.getConnection(hbaseConf); + HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection(); htable = hconn.getTable(fragment.getHbaseTableName()); } scanner = htable.getScanner(scan);
