Repository: tajo
Updated Branches:
  refs/heads/master 4595375f7 -> e656ee287


TAJO-1322: Invalid stored caching on StorageManager. (jinho)

Closes #367


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e656ee28
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e656ee28
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e656ee28

Branch: refs/heads/master
Commit: e656ee2872e660d95c25e7b080064f44a8d9d01e
Parents: 4595375
Author: jhkim <[email protected]>
Authored: Mon Feb 2 14:42:24 2015 +0900
Committer: jhkim <[email protected]>
Committed: Mon Feb 2 14:42:24 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/storage/StorageManager.java | 40 ++++++-------
 .../tajo/storage/TestFileStorageManager.java    | 60 +++++++++++++++-----
 3 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3b59347..4a3715b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1322: Invalid stored caching on StorageManager. (jinho)
+
     TAJO-1319: Tajo can't find HBase configuration file. (jaehwa)
 
     TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 34caa80..d929591 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -42,7 +42,6 @@ import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.net.URI;
 import java.text.NumberFormat;
 import java.util.List;
 import java.util.Map;
@@ -284,14 +283,11 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getFileStorageManager(TajoConf tajoConf, Path 
warehousePath) throws IOException {
-    URI uri;
     TajoConf copiedConf = new TajoConf(tajoConf);
     if (warehousePath != null) {
       copiedConf.setVar(ConfVars.WAREHOUSE_DIR, 
warehousePath.toUri().toString());
     }
-    uri = TajoConf.getWarehouseDir(copiedConf).toUri();
-    String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-    return getStorageManager(copiedConf, StoreType.CSV, key);
+    return getStorageManager(copiedConf, StoreType.CSV);
   }
 
   /**
@@ -303,7 +299,7 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getStorageManager(TajoConf tajoConf, String 
storeType) throws IOException {
-    if ("HBASE".equals(storeType)) {
+    if ("HBASE".equalsIgnoreCase(storeType)) {
       return getStorageManager(tajoConf, StoreType.HBASE);
     } else {
       return getStorageManager(tajoConf, StoreType.CSV);
@@ -319,7 +315,12 @@ public abstract class StorageManager {
    * @throws java.io.IOException
    */
   public static StorageManager getStorageManager(TajoConf tajoConf, StoreType 
storeType) throws IOException {
-    return getStorageManager(tajoConf, storeType, null);
+    FileSystem fileSystem = 
TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+    if (fileSystem != null) {
+      return getStorageManager(tajoConf, storeType, 
fileSystem.getUri().toString());
+    } else {
+      return getStorageManager(tajoConf, storeType, null);
+    }
   }
 
   /**
@@ -331,22 +332,23 @@ public abstract class StorageManager {
    * @return
    * @throws java.io.IOException
    */
-  public static synchronized StorageManager getStorageManager (
+  private static synchronized StorageManager getStorageManager (
       TajoConf tajoConf, StoreType storeType, String managerKey) throws 
IOException {
+
+    String typeName;
+    switch (storeType) {
+      case HBASE:
+        typeName = "hbase";
+        break;
+      default:
+        typeName = "hdfs";
+    }
+
     synchronized (storageManagers) {
-      String storeKey = CatalogUtil.getStoreTypeString(storeType) + managerKey;
+      String storeKey = typeName + "_" + managerKey;
       StorageManager manager = storageManagers.get(storeKey);
-      if (manager == null) {
-        String typeName = "hdfs";
-
-        switch (storeType) {
-          case HBASE:
-            typeName = "hbase";
-            break;
-          default:
-            typeName = "hdfs";
-        }
 
+      if (manager == null) {
         Class<? extends StorageManager> storageManagerClass =
             tajoConf.getClass(String.format("tajo.storage.manager.%s.class", 
typeName), null, StorageManager.class);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e656ee28/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 19a39a2..c4df8d7 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -38,7 +38,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
@@ -48,7 +47,6 @@ import static org.junit.Assert.*;
 public class TestFileStorageManager {
        private TajoConf conf;
        private static String TEST_PATH = 
"target/test-data/TestFileStorageManager";
-  StorageManager sm = null;
   private Path testDir;
   private FileSystem fs;
 
@@ -57,7 +55,6 @@ public class TestFileStorageManager {
                conf = new TajoConf();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getFileStorageManager(conf, testDir);
        }
 
        @After
@@ -84,14 +81,17 @@ public class TestFileStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", 
"table.csv");
     fs.mkdirs(path.getParent());
-               Appender appender = 
((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta,
 schema, path);
+    FileStorageManager fileStorageManager = 
(FileStorageManager)StorageManager.getFileStorageManager(conf);
+    assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+               Appender appender = fileStorageManager.getAppender(meta, 
schema, path);
     appender.init();
                for(Tuple t : tuples) {
                  appender.addTuple(t);
                }
                appender.close();
 
-               Scanner scanner = 
((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta,
 schema, path);
+               Scanner scanner = fileStorageManager.getFileScanner(meta, 
schema, path);
     scanner.init();
                int i=0;
                while(scanner.next() != null) {
@@ -110,6 +110,9 @@ public class TestFileStorageManager {
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
+    cluster.waitClusterUp();
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, 
cluster.getFileSystem().getUri() + "/tajo");
 
     int testCount = 10;
     Path tablePath = new Path("/testGetSplit");
@@ -125,7 +128,8 @@ public class TestFileStorageManager {
       }
 
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = 
(FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), 
tablePath);
+      FileStorageManager sm = 
(FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);
@@ -148,10 +152,7 @@ public class TestFileStorageManager {
       assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
       fs.close();
     } finally {
-      cluster.shutdown();
-
-      File dir = new File(testDataPath);
-      dir.delete();
+      cluster.shutdown(true);
     }
   }
 
@@ -165,6 +166,10 @@ public class TestFileStorageManager {
 
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, 
cluster.getFileSystem().getUri() + "/tajo");
 
     int testCount = 10;
     Path tablePath = new 
Path("/testGetSplitWithBlockStorageLocationsBatching");
@@ -177,7 +182,8 @@ public class TestFileStorageManager {
         DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
       }
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = 
(FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), 
tablePath);
+      FileStorageManager sm = 
(FileStorageManager)StorageManager.getFileStorageManager(tajoConf, tablePath);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);
@@ -194,10 +200,36 @@ public class TestFileStorageManager {
       assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
       fs.close();
     } finally {
-      cluster.shutdown();
+      cluster.shutdown(true);
+    }
+  }
 
-      File dir = new File(testDataPath);
-      dir.delete();
+  @Test
+  public void testStoreType() throws Exception {
+    final Configuration hdfsConf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(hdfsConf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, 
cluster.getFileSystem().getUri() + "/tajo");
+
+    try {
+      /* Local FileSystem */
+      FileStorageManager sm = 
(FileStorageManager)StorageManager.getStorageManager(conf, StoreType.CSV);
+      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
+
+      /* Distributed FileSystem */
+      sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, 
StoreType.CSV);
+      assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
+      assertEquals(cluster.getFileSystem().getUri(), 
sm.getFileSystem().getUri());
+    } finally {
+      cluster.shutdown(true);
     }
   }
 }

Reply via email to