TAJO-1322: Invalid stored caching on StorageManager. (jinho) Closes #367
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e656ee28 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e656ee28 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e656ee28 Branch: refs/heads/index_support Commit: e656ee2872e660d95c25e7b080064f44a8d9d01e Parents: 4595375 Author: jhkim <[email protected]> Authored: Mon Feb 2 14:42:24 2015 +0900 Committer: jhkim <[email protected]> Committed: Mon Feb 2 14:42:24 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/storage/StorageManager.java | 40 ++++++------- .../tajo/storage/TestFileStorageManager.java | 60 +++++++++++++++----- 3 files changed, 69 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 3b59347..4a3715b 100644 --- a/CHANGES +++ b/CHANGES @@ -174,6 +174,8 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1322: Invalid stored caching on StorageManager. (jinho) + TAJO-1319: Tajo can't find HBase configuration file. (jaehwa) TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index 34caa80..d929591 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -42,7 +42,6 @@ import org.apache.tajo.util.TUtil; import java.io.IOException; import java.lang.reflect.Constructor; -import java.net.URI; import java.text.NumberFormat; import java.util.List; import java.util.Map; @@ -284,14 +283,11 @@ public abstract class StorageManager { * @throws java.io.IOException */ public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { - URI uri; TajoConf copiedConf = new TajoConf(tajoConf); if (warehousePath != null) { copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString()); } - uri = TajoConf.getWarehouseDir(copiedConf).toUri(); - String key = "file".equals(uri.getScheme()) ? "file" : uri.toString(); - return getStorageManager(copiedConf, StoreType.CSV, key); + return getStorageManager(copiedConf, StoreType.CSV); } /** @@ -303,7 +299,7 @@ public abstract class StorageManager { * @throws java.io.IOException */ public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { - if ("HBASE".equals(storeType)) { + if ("HBASE".equalsIgnoreCase(storeType)) { return getStorageManager(tajoConf, StoreType.HBASE); } else { return getStorageManager(tajoConf, StoreType.CSV); @@ -319,7 +315,12 @@ public abstract class StorageManager { * @throws java.io.IOException */ public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { - return getStorageManager(tajoConf, storeType, null); + FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf); + if (fileSystem != null) { + return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString()); + } else { + return getStorageManager(tajoConf, storeType, null); + } } /** @@ -331,22 +332,23 @@ public abstract class StorageManager { * @return * @throws java.io.IOException */ - public static synchronized StorageManager getStorageManager ( + private static synchronized StorageManager getStorageManager ( TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { + + String typeName; + switch (storeType) { + case HBASE: + typeName = "hbase"; + break; + default: + typeName = "hdfs"; + } + synchronized (storageManagers) { - String storeKey = CatalogUtil.getStoreTypeString(storeType) + managerKey; + String storeKey = typeName + "_" + managerKey; StorageManager manager = storageManagers.get(storeKey); - if (manager == null) { - String typeName = "hdfs"; - - switch (storeType) { - case HBASE: - typeName = "hbase"; - break; - default: - typeName = "hdfs"; - } + if (manager == null) { Class<? extends StorageManager> storageManagerClass = tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java index 19a39a2..c4df8d7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java @@ -38,7 +38,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.File; import java.io.IOException; import java.util.List; import java.util.UUID; @@ -48,7 +47,6 @@ import static org.junit.Assert.*; public class TestFileStorageManager { private TajoConf conf; private static String TEST_PATH = "target/test-data/TestFileStorageManager"; - StorageManager sm = null; private Path testDir; private FileSystem fs; @@ -57,7 +55,6 @@ public class TestFileStorageManager { conf = new TajoConf(); testDir = CommonTestingUtil.getTestDir(TEST_PATH); fs = testDir.getFileSystem(conf); - sm = StorageManager.getFileStorageManager(conf, testDir); } @After @@ -84,14 +81,17 @@ public class TestFileStorageManager { Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); fs.mkdirs(path.getParent()); - Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path); + FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf); + assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri()); + + Appender appender = fileStorageManager.getAppender(meta, schema, path); appender.init(); for(Tuple t : tuples) { appender.addTuple(t); } appender.close(); - Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path); + Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path); scanner.init(); int i=0; while(scanner.next() != null) { @@ -110,6 +110,9 @@ public class TestFileStorageManager { final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); + cluster.waitClusterUp(); + TajoConf tajoConf = new TajoConf(conf); + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); int testCount = 10; Path tablePath = new Path("/testGetSplit"); @@ -125,7 +128,8 @@ public class TestFileStorageManager { } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath); + assertEquals(fs.getUri(), sm.getFileSystem().getUri()); Schema schema = new Schema(); schema.addColumn("id", Type.INT4); @@ -148,10 +152,7 @@ public class TestFileStorageManager { assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); fs.close(); } finally { - cluster.shutdown(); - - File dir = new File(testDataPath); - dir.delete(); + cluster.shutdown(true); } } @@ -165,6 +166,10 @@ public class TestFileStorageManager { final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2).build(); + cluster.waitClusterUp(); + + TajoConf tajoConf = new TajoConf(conf); + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); int testCount = 10; Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching"); @@ -177,7 +182,8 @@ public class TestFileStorageManager { DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl); } assertTrue(fs.exists(tablePath)); - FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath); + assertEquals(fs.getUri(), sm.getFileSystem().getUri()); Schema schema = new Schema(); schema.addColumn("id", Type.INT4); @@ -194,10 +200,36 @@ public class TestFileStorageManager { assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); fs.close(); } finally { - cluster.shutdown(); + cluster.shutdown(true); + } + } - File dir = new File(testDataPath); - dir.delete(); + @Test + public void testStoreType() throws Exception { + final Configuration hdfsConf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf) + .numDataNodes(2).build(); + cluster.waitClusterUp(); + + TajoConf tajoConf = new TajoConf(hdfsConf); + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); + + try { + /* Local FileSystem */ + FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, StoreType.CSV); + assertEquals(fs.getUri(), sm.getFileSystem().getUri()); + + /* Distributed FileSystem */ + sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, StoreType.CSV); + assertNotEquals(fs.getUri(), sm.getFileSystem().getUri()); + assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri()); + } finally { + cluster.shutdown(true); } } }
