http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java new file mode 100644 index 0000000..26af769 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
+
+/**
+ * It manages the available tablespaces and caches {@link Tablespace} instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
+ */
+public class TablespaceManager implements StorageService {
+  private static final Log LOG = LogFactory.getLog(TablespaceManager.class);
+
+  public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+  public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+  /** default tablespace name */
+  public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+  private final static TajoConf systemConf = new TajoConf();
+  private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+  // The relationship among name, URI, and Tablespace must be kept 1:1:1.
+  protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+  protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
+
+  protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+  protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+  /**
+   * Singleton instance. It is created after the static registries above are initialized
+   * because the constructor fills them.
+   */
+  private static final TablespaceManager instance;
+
+  static {
+    instance = new TablespaceManager();
+  }
+
+  private TablespaceManager() {
+    initForDefaultConfig(); // loading storage-default.json
+    initSiteConfig();       // storage-site.json will override the configs of storage-default.json
+    addWarehouseAsSpace();  // adding a warehouse directory for a default tablespace
+    addLocalFsTablespace(); // adding a tablespace using local file system by default
+  }
+
+  /**
+   * Registers the warehouse directory of the system configuration as the default tablespace.
+   */
+  private void addWarehouseAsSpace() {
+    Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+    registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+  }
+
+  /**
+   * Registers an invisible tablespace for {@link StorageConstants#LOCAL_FS_URI} under a random name
+   * if no registered tablespace covers that URI yet, so that {@link #getLocalFs()} can resolve it.
+   */
+  private void addLocalFsTablespace() {
+    // A bare headMap() check is not enough: it contains every URI sorting at or before LOCAL_FS_URI,
+    // even unrelated ones. get() applies the same prefix match that getLocalFs() relies on.
+    if (!get(LOCAL_FS_URI).isPresent()) {
+      String tmpName = UUID.randomUUID().toString();
+      registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
+    }
+  }
+
+  public static TablespaceManager getInstance() {
+    return instance;
+  }
+
+  /**
+   * Loads {@link #DEFAULT_CONFIG_FILE} and applies it without overriding.
+   *
+   * @throws IllegalStateException if the default config file cannot be found
+   */
+  private void initForDefaultConfig() {
+    JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+    if (json == null) {
+      // The file being loaded here is the default config, so report that one.
+      throw new IllegalStateException("There is no " + DEFAULT_CONFIG_FILE);
+    }
+    applyConfig(json, false);
+  }
+
+  /**
+   * Loads {@link #SITE_CONFIG_FILE} if it exists and applies it with overriding enabled.
+   */
+  private void initSiteConfig() {
+    JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+    // If there is no storage-site.json file, nothing happens.
+    if (json != null) {
+      applyConfig(json, true);
+    }
+  }
+
+  /**
+   * Reads a config resource and parses it as JSON.
+   *
+   * @param fileName Resource file name
+   * @return Parsed JSON object, or null if the resource reader returned null
+   */
+  private JSONObject loadFromConfig(String fileName) {
+    String json;
+    try {
+      json = FileUtil.readTextFileFromResource(fileName);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (json != null) {
+      return parseJson(json);
+    } else {
+      return null;
+    }
+  }
+
+  private static JSONObject parseJson(String json) {
+    try {
+      return (JSONObject) parser.parse(json);
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Applies a config: storage handlers first, and then tablespaces which depend on the handlers.
+   */
+  private void applyConfig(JSONObject json, boolean override) {
+    loadStorages(json);
+    loadTableSpaces(json, override);
+  }
+
+  /**
+   * Registers the handler classes described under {@link #KEY_STORAGES}.
+   * A storage whose handler class cannot be found is logged and skipped.
+   */
+  private void loadStorages(JSONObject json) {
+    JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
+
+    if (spaces != null) {
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+        Pair<String, Class<? extends Tablespace>> pair;
+
+        try {
+          pair = extractStorage(entry);
+        } catch (ClassNotFoundException e) {
+          LOG.warn(e);
+          continue;
+        }
+
+        TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+      }
+    }
+  }
+
+  /**
+   * Converts a storage entry into a pair of storage type and its handler class.
+   *
+   * @param entry Entry consisting of a storage type and its description
+   * @return Pair of the storage type and the class named by {@link #KEY_STORAGE_HANDLER}
+   * @throws ClassNotFoundException if the handler class cannot be loaded
+   */
+  private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+      throws ClassNotFoundException {
+
+    String storageType = entry.getKey();
+    JSONObject storageDesc = (JSONObject) entry.getValue();
+    String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+    return new Pair<String, Class<? extends Tablespace>>(
+        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+  }
+
+  /**
+   * Registers every tablespace described under {@link #KEY_SPACES}.
+   */
+  private void loadTableSpaces(JSONObject json, boolean override) {
+    JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+    if (spaces != null) {
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+        AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
+      }
+    }
+  }
+
+  /**
+   * Registers a tablespace from its JSON description. If the description marks it as "default",
+   * the same URI is also registered under {@link #DEFAULT_TABLESPACE_NAME}.
+   *
+   * @param spaceName Tablespace name
+   * @param spaceDesc Tablespace description which must contain "uri" and may contain "default"
+   * @param override True if an existing registration is allowed to be replaced
+   * @throws NullPointerException if the description has no "uri"
+   */
+  public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+    boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+    String uriStr = spaceDesc.getAsString("uri");
+    // URI.create(null) would also fail with a NullPointerException, but without any context.
+    Preconditions.checkNotNull(uriStr, "Tablespace '" + spaceName + "' must have 'uri'");
+    URI spaceUri = URI.create(uriStr);
+
+    if (defaultSpace) {
+      registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
+    }
+    registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
+  }
+
+  /**
+   * Instantiates, initializes, and registers a tablespace. If the tablespace allows arbitrary paths,
+   * its root URI is also registered as an invisible tablespace under a random name.
+   */
+  private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+                                         boolean visible, boolean override) {
+    Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+    tableSpace.setVisible(visible);
+
+    try {
+      tableSpace.init(systemConf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    putTablespace(tableSpace, override);
+
+    // If the arbitrary path is allowed, root uri is also added as a tablespace
+    if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+      URI rootUri = tableSpace.getRootUri();
+      // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+      if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+        String tmpName = UUID.randomUUID().toString();
+        registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+      }
+    }
+  }
+
+  /**
+   * Puts a tablespace into both registries.
+   *
+   * @param space Tablespace to be registered
+   * @param override If false, a detected mismatch causes a RuntimeException
+   */
+  private static void putTablespace(Tablespace space, boolean override) {
+    // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+    boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+    boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+    boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+    // NOTE(review): this flags a mismatch when the tablespace already registered for the URI EQUALS
+    // the new one, which looks inverted with respect to the 1:1:1 rule above. It depends on
+    // Tablespace.equals(), and AddTableSpace() registers one URI under two names, so the condition
+    // is intentionally left as it is -- confirm before changing it.
+    mismatch = mismatch || (uriExist && TABLE_SPACES.get(space.uri).equals(space));
+
+    if (!override && mismatch) {
+      throw new RuntimeException("Name or URI of Tablespace must be unique.");
+    }
+
+    SPACES_URIS_MAP.put(space.getName(), space.getUri());
+    // We must guarantee that the same uri results in the same tablespace instance.
+    TABLE_SPACES.put(space.getUri(), space);
+  }
+
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return The fragment length, or the configured alternative length if the length is unknown
+   */
+  public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  public static final String KEY_STORAGES = "storages"; // storages
+  public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+  public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+  public static final String KEY_SPACES = "spaces";
+
+  /**
+   * Instantiates the tablespace handler registered for the scheme of the given URI.
+   * The (String, URI) constructor of each handler class is looked up once and cached.
+   *
+   * @param spaceName Tablespace name
+   * @param uri Tablespace URI which must include a scheme
+   * @param visible Not used here; the caller sets the visibility after instantiation
+   * @return A new tablespace instance which is not initialized yet
+   */
+  private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+    Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+    Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+    if (clazz == null) {
+      throw new RuntimeException("There is no tablespace for " + uri.toString());
+    }
+
+    try {
+      Constructor<? extends Tablespace> constructor =
+          (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+        constructor.setAccessible(true);
+        CONSTRUCTORS.put(clazz, constructor);
+      }
+
+      return constructor.newInstance(new Object[]{spaceName, uri});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Replaces the registration for the name and the URI of the given tablespace. Note that a new
+   * instance is created from the name and the URI; the given instance itself is not registered.
+   *
+   * @param space Tablespace whose name and URI are registered
+   * @return The tablespace previously registered for the URI, if any
+   */
+  @VisibleForTesting
+  public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+    Tablespace existing;
+    synchronized (SPACES_URIS_MAP) {
+      // Remove existing one
+      SPACES_URIS_MAP.remove(space.getName());
+      existing = TABLE_SPACES.remove(space.getUri());
+
+      // Add another one for test
+      registerTableSpace(space.name, space.uri, null, true, true);
+    }
+    // if there is an existing one, return it.
+    return Optional.fromNullable(existing);
+  }
+
+  public Iterable<String> getSupportSchemes() {
+    return TABLE_SPACE_HANDLERS.keySet();
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null or empty, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null or empty, the default tablespace will be returned.
+   *         If no registered tablespace URI is a prefix of the given one, it is absent.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+    if (uri == null || uri.isEmpty()) {
+      return (Optional<T>) Optional.of(getDefault());
+    }
+
+    Tablespace lastOne = null;
+
+    // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+    // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+      if (uri.startsWith(entry.getKey().toString())) {
+        lastOne = entry.getValue();
+      }
+    }
+    return (Optional<T>) Optional.fromNullable(lastOne);
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+    if (uri == null) {
+      return (Optional<T>) Optional.of(getDefault());
+    } else {
+      return (Optional<T>) get(uri.toString());
+    }
+  }
+
+  /**
+   * It returns the default tablespace, that is, the one registered as {@link #DEFAULT_TABLESPACE_NAME}.
+   *
+   * @return The default tablespace
+   */
+  public static <T extends Tablespace> T getDefault() {
+    return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+  }
+
+  /**
+   * It returns the tablespace which covers {@link StorageConstants#LOCAL_FS_URI}.
+   *
+   * @return The local file system tablespace
+   */
+  public static <T extends Tablespace> T getLocalFs() {
+    return (T) get(LOCAL_FS_URI).get();
+  }
+
+  /**
+   * Get tablespace for the given name.
+   *
+   * @param name Tablespace name
+   * @return Tablespace. If there is no tablespace registered with the name, it is absent.
+   */
+  public static Optional<? extends Tablespace> getByName(String name) {
+    URI uri = SPACES_URIS_MAP.get(name);
+    if (uri != null) {
+      return Optional.of(TABLE_SPACES.get(uri));
+    } else {
+      return Optional.absent();
+    }
+  }
+
+  /**
+   * Get the first tablespace, in URI order, whose URI scheme matches the given one case-insensitively.
+   *
+   * @param scheme URI scheme
+   * @return Tablespace. If there is no tablespace with the scheme, it is absent.
+   */
+  public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+      String uriScheme = entry.getKey().getScheme();
+      if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+        return Optional.of(entry.getValue());
+      }
+    }
+
+    return Optional.absent();
+  }
+
+  /**
+   * Get the table URI within a tablespace.
+   *
+   * @param spaceName Tablespace name. If it is null, the default tablespace is used.
+   * @param databaseName Database name
+   * @param tableName Table name
+   * @return Table URI generated by the tablespace
+   * @throws IllegalStateException if there is no tablespace registered with the given name
+   */
+  @Override
+  public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+    Tablespace space;
+    if (spaceName == null) {
+      space = getDefault();
+    } else {
+      Optional<? extends Tablespace> found = getByName(spaceName);
+      // Same exception type as an unchecked Optional.get(), but it tells which name was missing.
+      Preconditions.checkState(found.isPresent(), "There is no tablespace named '%s'", spaceName);
+      space = found.get();
+    }
+    return space.getTableUri(databaseName, tableName);
+  }
+
+  public static Iterable<Tablespace> getAllTablespaces() {
+    return TABLE_SPACES.values();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java index 916aae7..7943134 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender { public void init() throws IOException { super.init(); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(uri).get(); HConnection hconn = space.getConnection(); htable = hconn.getTable(columnMapping.getHbaseTableName()); htable.setAutoFlushTo(false); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index 16f4c14..7369897 100644 --- 
a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner { rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); - HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get(); + HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(fragment.getUri()).get(); hbaseConf = space.getHbaseConf(); initScanner(); } @@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner { } if (htable == null) { - HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection(); + HConnection hconn = ((HBaseTablespace) TablespaceManager.get(fragment.getUri()).get()).getConnection(); htable = hconn.getTable(fragment.getHbaseTableName()); } scanner = htable.getScanner(scan); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java index 5fac0cf..18bb7ed 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -51,10 +51,7 @@ import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.BytesUtils; -import org.apache.tajo.util.Pair; -import 
org.apache.tajo.util.TUtil; +import org.apache.tajo.util.*; import java.io.BufferedReader; import java.io.IOException; @@ -68,9 +65,9 @@ import java.util.*; public class HBaseTablespace extends Tablespace { private static final Log LOG = LogFactory.getLog(HBaseTablespace.class); - public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false); - - public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true); + public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty("hbase", false, true, false); + public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true); + public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false); private Configuration hbaseConf; @@ -572,6 +569,15 @@ public class HBaseTablespace extends Tablespace { } @Override + public Appender getAppenderForInsertRow(OverridableConf queryContext, + TaskAttemptId taskAttemptId, + TableMeta meta, + Schema schema, + Path workDir) throws IOException { + return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir); + } + + @Override public Appender getAppender(OverridableConf queryContext, TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) throws IOException { @@ -1096,8 +1102,14 @@ public class HBaseTablespace extends Tablespace { } @Override - public FormatProperty getFormatProperty(String format) { - return HFILE_FORMAT_PROPERTIES; + public FormatProperty getFormatProperty(TableMeta meta) { + KeyValueSet tableProperty = meta.getOptions(); + if (tableProperty.isTrue(HBaseStorageConstants.INSERT_PUT_MODE) || + tableProperty.isTrue(StorageConstants.INSERT_DIRECTLY)) { + return PUT_MODE_PROPERTIES; + } else { + return HFILE_FORMAT_PROPERTIES; + } } public void prepareTable(LogicalNode node) throws IOException { @@ -1134,6 +1146,24 @@ public class HBaseTablespace extends Tablespace { } @Override + public URI 
getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { + if (meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) { + throw new IOException("Staging phase is not supported in this storage."); + } else { + return TablespaceManager.getDefault().getStagingUri(context, queryId, meta); + } + } + + public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, + TableMeta meta) throws IOException { + if (!meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) { + return TablespaceManager.getDefault().prepareStagingSpace(conf, queryId, context, meta); + } else { + throw new IOException("Staging phase is not supported in this storage."); + } + } + + @Override public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException { if (tableDesc != null) { Schema tableSchema = tableDesc.getSchema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java index f7cbb5a..f0c8f15 100644 --- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java +++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java @@ -25,7 +25,7 @@ import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.TableSpaceManager; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.Pair; import org.junit.BeforeClass; import org.junit.Test; @@ -45,7 +45,7 @@ public class 
TestHBaseTableSpace { String tableSpaceUri = "hbase:zk://host1:2171"; HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri)); hBaseTablespace.init(new TajoConf()); - TableSpaceManager.addTableSpaceForTest(hBaseTablespace); + TablespaceManager.addTableSpaceForTest(hBaseTablespace); } @Test @@ -58,8 +58,8 @@ public class TestHBaseTableSpace { @Test public void testTablespaceHandler() throws Exception { - assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace); - assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get()) + assertTrue((TablespaceManager.getByName("cluster1").get()) instanceof HBaseTablespace); + assertTrue((TablespaceManager.get(URI.create("hbase:zk://host1:2171")).get()) instanceof HBaseTablespace); } @@ -73,7 +73,7 @@ public class TestHBaseTableSpace { EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); scanNode.setQual(evalNodeA); - HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get(); + HBaseTablespace storageManager = (HBaseTablespace) TablespaceManager.getByName("cluster1").get(); List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); assertNotNull(indexEvals); assertEquals(1, indexEvals.size()); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index 081fa3f..efe2bfd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -58,7 +58,7 @@ public abstract 
class FileAppender implements Appender { throw new IllegalArgumentException("Configuration must be an instance of TajoConf"); } - Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri()); + Optional<FileTablespace> spaceResult = TablespaceManager.get(workDir.toUri()); if (!spaceResult.isPresent()) { throw new IllegalStateException("No TableSpace for " + workDir.toUri()); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2ce1f09..3b63012 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -19,14 +19,17 @@ package org.apache.tajo.storage; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.*; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; @@ -92,8 +95,12 @@ public class FileTablespace extends Tablespace { } }; + private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true); + private static final FormatProperty GeneralFileProperties = new 
FormatProperty(true, false, true); + protected FileSystem fs; - protected Path basePath; + protected Path spacePath; + protected Path stagingRootPath; protected boolean blocksMetadataEnabled; private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); @@ -103,8 +110,9 @@ public class FileTablespace extends Tablespace { @Override protected void storageInit() throws IOException { - this.basePath = new Path(uri); - this.fs = basePath.getFileSystem(conf); + this.spacePath = new Path(uri); + this.fs = spacePath.getFileSystem(conf); + this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR))); this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString()); this.blocksMetadataEnabled = @@ -167,7 +175,7 @@ public class FileTablespace extends Tablespace { @Override public URI getTableUri(String databaseName, String tableName) { - return StorageUtil.concatPath(basePath, databaseName, tableName).toUri(); + return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri(); } private String partitionPath = ""; @@ -192,12 +200,12 @@ public class FileTablespace extends Tablespace { } public FileFragment[] split(String tableName) throws IOException { - Path tablePath = new Path(basePath, tableName); + Path tablePath = new Path(spacePath, tableName); return split(tableName, tablePath, fs.getDefaultBlockSize()); } public FileFragment[] split(String tableName, long fragmentSize) throws IOException { - Path tablePath = new Path(basePath, tableName); + Path tablePath = new Path(spacePath, tableName); return split(tableName, tablePath, fragmentSize); } @@ -491,30 +499,6 @@ public class FileTablespace extends Tablespace { } /** - * Generate the map of host and make them into Volume Ids. 
- * - */ - private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { - Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); - for (FileFragment frag : frags) { - String[] hosts = frag.getHosts(); - int[] diskIds = frag.getDiskIds(); - for (int i = 0; i < hosts.length; i++) { - Set<Integer> volumeList = volumeMap.get(hosts[i]); - if (volumeList == null) { - volumeList = new HashSet<Integer>(); - volumeMap.put(hosts[i], volumeList); - } - - if (diskIds.length > 0 && diskIds[i] > -1) { - volumeList.add(diskIds[i]); - } - } - } - - return volumeMap; - } - /** * Generate the list of files and make them into FileSplits. * * @throws IOException @@ -674,7 +658,7 @@ public class FileTablespace extends Tablespace { String simpleTableName = splitted[1]; // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) - Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName); + Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName); tableDesc.setUri(tablePath.toUri()); } else { Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given."); @@ -851,22 +835,14 @@ public class FileTablespace extends Tablespace { } } - private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true); - private static final FormatProperty GeneralFileProperties = new FormatProperty(false); - private static final FormatProperty HFileProperties = new FormatProperty(true); - @Override public StorageProperty getProperty() { return FileStorageProperties; } @Override - public FormatProperty getFormatProperty(String format) { - if (format.equalsIgnoreCase("hbase")) { - return HFileProperties; - } else { - return GeneralFileProperties; - } + public FormatProperty getFormatProperty(TableMeta meta) { + return GeneralFileProperties; } @Override @@ -882,6 +858,84 @@ public class FileTablespace extends Tablespace { } @Override + public URI 
getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException { + String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, ""); + + Path stagingDir; + // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO + // So, this query results won't be materialized as a part of a table. + // The result will be temporarily written in the staging directory. + if (outputPath.isEmpty()) { + // for temporarily written in the storage directory + stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); + } else { + Optional<Tablespace> spaceResult = TablespaceManager.get(outputPath); + if (!spaceResult.isPresent()) { + throw new IOException("No registered Tablespace for " + outputPath); + } + + Tablespace space = spaceResult.get(); + if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation + // If this space allows move operation, the staging directory will be underneath the final output table uri. + stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId)); + } else { + stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId)); + } + } + + return stagingDir.toUri(); + } + + // query submission directory is private! 
+ final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx------ + public static final String TMP_STAGING_DIR_PREFIX = ".staging"; + + /** Creates the staging directory for the given query and returns its URI. The location comes from getStagingUri(context, queryId, meta). Throws IOException if that directory already exists, or if the created directory reports a non-empty owner that is neither the current user nor the login user. The directory is created with STAGING_DIR_PERMISSION; if the permission reported afterwards differs, it is set again explicitly. A TajoConstants.RESULT_DIR_NAME subdirectory is then created inside it. NOTE(review): the conf parameter is not read in this body. */ public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta) + throws IOException { + + String realUser; + String currentUser; + UserGroupInformation ugi; + ugi = UserGroupInformation.getLoginUser(); + realUser = ugi.getShortUserName(); + currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + + + Path stagingDir = new Path(getStagingUri(context, queryId, meta)); + + //////////////////////////////////////////// + // Create Output Directory + //////////////////////////////////////////// + + if (fs.exists(stagingDir)) { + throw new IOException("The staging directory '" + stagingDir + "' already exists"); + } + fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); /* owner and permission are read back from the file status and checked below */ + FileStatus fsStatus = fs.getFileStatus(stagingDir); + String owner = fsStatus.getOwner(); + + if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { + throw new IOException("The ownership on the user's query " + + "directory " + stagingDir + " is not as expected. " + + "It is owned by " + owner + ". The directory must " + + "be owned by the submitter " + currentUser + " or " + + "by " + realUser); + } + + if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { + LOG.info("Permissions on staging directory " + stagingDir + " are " + + "incorrect: " + fsStatus.getPermission() + ". 
Fixing permissions " + + "to correct value " + STAGING_DIR_PERMISSION); + fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } + + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + + return stagingDir.toUri(); + } + + @Override public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException { } @@ -1257,4 +1311,6 @@ public class FileTablespace extends Tablespace { return retValue; } + + } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index bd5502d..1d32291 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -85,7 +85,7 @@ public class HashShuffleAppenderManager { fs.mkdirs(dataFile.getParent()); } - FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get(); + FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri()).get(); FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index ab63d55..f50a20d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -122,7 +122,7 @@ public class TestCompressionStorages { String fileName = "Compression_" + codec.getSimpleName(); Path tablePath = new Path(testDir, fileName); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -154,7 +154,7 @@ public class TestCompressionStorages { FileFragment[] tablets = new FileFragment[1]; tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema); if (storeType.equalsIgnoreCase("CSV")) { if (SplittableCompressionCodec.class.isAssignableFrom(codec)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 2d919cd..ca5885c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -103,7 +103,7 @@ public class TestDelimitedTextFile { TableMeta meta = 
CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); Tuple tuple; @@ -125,7 +125,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance1.json"); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); assertNotNull(scanner.next()); @@ -147,7 +147,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); FileFragment fragment = getFileFragment("testErrorTolerance2.json"); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); try { @@ -166,7 +166,7 @@ public class TestDelimitedTextFile { TableMeta meta = CatalogUtil.newTableMeta("JSON"); meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); FileFragment fragment = getFileFragment("testErrorTolerance3.json"); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index 9237e07..1119968 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -57,7 +57,7 @@ public class TestFileSystems { public TestFileSystems(FileSystem fs) throws IOException { this.fs = fs; this.conf = new TajoConf(fs.getConf()); - sm = TableSpaceManager.getLocalFs(); + sm = TablespaceManager.getLocalFs(); testDir = getTestDir(this.fs, TEST_PATH); } http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index ec3e143..09b91ea 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -81,7 +81,7 @@ public class TestFileTablespace { Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); localFs.mkdirs(path.getParent()); - FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs(); + FileTablespace fileStorageManager = (FileTablespace) TablespaceManager.getLocalFs(); assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri()); Appender appender = fileStorageManager.getAppender(meta, schema, path); @@ -224,24 +224,24 @@ public class TestFileTablespace { Optional<Tablespace> existingTs = Optional.absent(); try { /* Local FileSystem */ - FileTablespace space = 
TableSpaceManager.getLocalFs(); + FileTablespace space = TablespaceManager.getLocalFs(); assertEquals(localFs.getUri(), space.getFileSystem().getUri()); FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri); distTablespace.init(conf); - existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace); + existingTs = TablespaceManager.addTableSpaceForTest(distTablespace); /* Distributed FileSystem */ - space = (FileTablespace) TableSpaceManager.get(uri).get(); + space = (FileTablespace) TablespaceManager.get(uri).get(); assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri()); - space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get(); + space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace").get(); assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri()); } finally { if (existingTs.isPresent()) { - TableSpaceManager.addTableSpaceForTest(existingTs.get()); + TablespaceManager.addTableSpaceForTest(existingTs.get()); } cluster.shutdown(true); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index c13ce16..7410778 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -65,7 +65,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "line.data"); - FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender( + FileAppender appender = (FileAppender) 
TablespaceManager.getLocalFs().getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -118,7 +118,7 @@ public class TestLineReader { meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName()); - FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender( + FileAppender appender = (FileAppender) (TablespaceManager.getLocalFs()).getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -176,7 +176,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "testLineDelimitedReader"); - FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender( + FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -279,7 +279,7 @@ public class TestLineReader { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data"); - FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender( + FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender( null, null, meta, schema, tablePath); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 79928ff..331d3e8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java 
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -94,7 +94,7 @@ public class TestMergeScanner { conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro"); testDir = CommonTestingUtil.getTestDir(TEST_PATH); fs = testDir.getFileSystem(conf); - sm = TableSpaceManager.getLocalFs(); + sm = TablespaceManager.getLocalFs(); } @Test @@ -114,7 +114,7 @@ public class TestMergeScanner { } Path table1Path = new Path(testDir, storeType + "_1.data"); - Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path); + Appender appender1 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path); appender1.enableStats(); appender1.init(); int tupleNum = 10000; @@ -136,7 +136,7 @@ public class TestMergeScanner { } Path table2Path = new Path(testDir, storeType + "_2.data"); - Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path); + Appender appender2 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path); appender2.enableStats(); appender2.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index ce2a926..dbfdac3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -155,7 +155,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Splitable.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + 
FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -210,7 +210,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Splitable.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -271,7 +271,7 @@ public class TestStorages { } Path tablePath = new Path(testDir, "testProjection.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); int tupleNum = 10000; @@ -347,7 +347,7 @@ public class TestStorages { meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path); } - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Path tablePath = new Path(testDir, "testVariousTypes.data"); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -425,7 +425,7 @@ public class TestStorages { } Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -469,7 +469,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); Tuple retrieved; @@ -513,7 +513,7 @@ public class TestStorages { meta.putOption(StorageConstants.CSVFILE_SERDE, 
TextSerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -544,7 +544,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); Tuple retrieved; @@ -582,7 +582,7 @@ public class TestStorages { meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -613,7 +613,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); Tuple retrieved; @@ -651,7 +651,7 @@ public class TestStorages { meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -682,7 +682,7 @@ 
public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -724,7 +724,7 @@ public class TestStorages { meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName()); Path tablePath = new Path(testDir, "testVariousTypes.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -756,7 +756,7 @@ public class TestStorages { assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); assertTrue(scanner instanceof SequenceFileScanner); @@ -786,7 +786,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType, options); Path tablePath = new Path(testDir, "testTime.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); @@ -801,7 +801,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, 
fragment); scanner.init(); Tuple retrieved; @@ -827,7 +827,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "Seekable.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath); appender.enableStats(); appender.init(); @@ -869,7 +869,7 @@ public class TestStorages { long readBytes = 0; long readRows = 0; for (long offset : offsets) { - scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, + scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); scanner.init(); @@ -917,7 +917,7 @@ public class TestStorages { conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize); } - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Path tablePath = new Path(testDir, "testMaxValue.data"); Appender appender = sm.getAppender(meta, schema, tablePath); @@ -972,7 +972,7 @@ public class TestStorages { meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); Path tablePath = new Path(testDir, "testLessThanSchemaSize.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, dataSchema, tablePath); appender.init(); @@ -998,7 +998,7 @@ public class TestStorages { inSchema.addColumn("col5", Type.INT8); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, inSchema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment); Schema target = new Schema(); @@ -1036,7 +1036,7 @@ public class TestStorages { meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); Path tablePath = new 
Path(testDir, "test_storetype_oversize.data"); - FileTablespace sm = TableSpaceManager.getLocalFs(); + FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, dataSchema, tablePath); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 1a62f52..22fb607 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -89,7 +89,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindValue_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -178,7 +178,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType); - FileAppender appender = (FileAppender) ((FileTablespace)TableSpaceManager.getLocalFs()) + FileAppender appender = (FileAppender) ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); @@ -257,7 +257,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) 
TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i += 2) { @@ -327,7 +327,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -418,7 +418,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType); - Appender appender = (((FileTablespace)TableSpaceManager.getLocalFs())) + Appender appender = (((FileTablespace) TablespaceManager.getLocalFs())) .getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -498,7 +498,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindMinValue" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); @@ -582,7 +582,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testMinMax_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; @@ -687,7 +687,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); @@ 
-768,7 +768,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()) + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) .getAppender(meta, schema, tablePath); appender.init(); @@ -860,7 +860,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(storeType); Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 5ad7a27..72810fd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i++) { @@ -166,7 +166,7 @@ public 
class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", "table1.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i ++ ) { http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java index 2fbf5d6..8095081 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -69,7 +69,7 @@ public class TestJsonSerDe { FileSystem fs = FileSystem.getLocal(conf); FileStatus status = fs.getFileStatus(tablePath); FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); - Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment); scanner.init(); Tuple tuple = scanner.next();
