Repository: incubator-drill
Updated Branches:
  refs/heads/master 9628f9bb5 -> 81fb18a08


DRILL-1414: Move profile storage to DFS rather than using PStore


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/786fd36e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/786fd36e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/786fd36e

Branch: refs/heads/master
Commit: 786fd36eb90676673dd1ccd64ae42ae3e8166df9
Parents: 9628f9b
Author: Aditya Kishore <adi...@maprtech.com>
Authored: Fri Sep 26 00:42:49 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Sun Sep 28 00:10:38 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/config/HBasePStore.java    | 28 +++++-
 .../store/hbase/config/HBasePStoreProvider.java |  8 +-
 .../src/resources/drill-override-example.conf   |  4 +
 .../org/apache/drill/exec/ExecConstants.java    |  2 +
 .../exec/server/rest/ProfileResources.java      | 31 +++++--
 .../exec/store/dfs/shim/DrillFileSystem.java    |  1 +
 .../dfs/shim/fallback/FallbackFileSystem.java   | 10 +-
 .../org/apache/drill/exec/store/sys/PStore.java |  2 +
 .../drill/exec/store/sys/local/LocalPStore.java | 58 +++++++++---
 .../store/sys/local/NoWriteLocalPStore.java     | 15 ++-
 .../drill/exec/store/sys/zk/ZkPStore.java       | 96 +++++++++++++++-----
 .../exec/store/sys/zk/ZkPStoreProvider.java     | 42 ++++++++-
 .../drill/exec/work/foreman/QueryStatus.java    |  6 +-
 .../src/main/resources/rest/profile/list.ftl    | 20 +++-
 14 files changed, 261 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
index 6324180..b5a697c 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hbase.config;
 
 import static 
org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY;
+import static 
org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY_BLOB;
 import static 
org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER;
 
 import java.io.IOException;
@@ -58,10 +59,19 @@ public class HBasePStore<V> implements PStore<V> {
   }
 
   @Override
-  public synchronized V get(String key) {
+  public V get(String key) {
+    return get(key, FAMILY);
+  }
+
+  @Override
+  public V getBlob(String key) {
+    return get(key, FAMILY_BLOB);
+  }
+
+  protected synchronized V get(String key, byte[] family) {
     try {
       Get get = new Get(row(key));
-      get.addColumn(FAMILY, QUALIFIER);
+      get.addColumn(family, QUALIFIER);
       return value(table.get(get));
     } catch (IOException e) {
       throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
@@ -69,10 +79,19 @@ public class HBasePStore<V> implements PStore<V> {
   }
 
   @Override
-  public synchronized void put(String key, V value) {
+  public void put(String key, V value) {
+    put(key, FAMILY, value);
+  }
+
+  @Override
+  public void putBlob(String key, V value) {
+    put(key, FAMILY_BLOB, value);
+  }
+
+  protected synchronized void put(String key, byte[] family, V value) {
     try {
       Put put = new Put(row(key));
-      put.add(FAMILY, QUALIFIER, bytes(value));
+      put.add(family, QUALIFIER, bytes(value));
       table.put(put);
     } catch (IOException e) {
       throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e);
@@ -123,7 +142,6 @@ public class HBasePStore<V> implements PStore<V> {
   private void delete(byte[] row) {
     try {
       Delete del = new Delete(row);
-      del.deleteColumns(FAMILY, QUALIFIER);
       table.delete(del);
     } catch (IOException e) {
       throw new DrillRuntimeException("Caught error while deleting row '" + 
Bytes.toStringBinary(row)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
index 48e6c42..b310651 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
@@ -44,6 +44,8 @@ public class HBasePStoreProvider implements PStoreProvider {
 
   static final byte[] FAMILY = Bytes.toBytes("s");
 
+  static final byte[] FAMILY_BLOB = Bytes.toBytes("t");
+
   static final byte[] QUALIFIER = Bytes.toBytes("d");
 
   private final String storeTableName;
@@ -86,12 +88,14 @@ public class HBasePStoreProvider implements PStoreProvider {
       if (!admin.tableExists(storeTableName)) {
         HTableDescriptor desc = new HTableDescriptor(storeTableName);
         desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
+        desc.addFamily(new HColumnDescriptor(FAMILY_BLOB).setMaxVersions(1));
         admin.createTable(desc);
       } else {
         HTableDescriptor desc = 
admin.getTableDescriptor(Bytes.toBytes(storeTableName));
-        if (!desc.hasFamily(FAMILY)) {
+        if (!desc.hasFamily(FAMILY) || !desc.hasFamily(FAMILY_BLOB)) {
           throw new DrillRuntimeException("The HBase table " + storeTableName
-              + " specified as persistent store exists but does not contain 
column family: " + Bytes.toString(FAMILY));
+              + " specified as persistent store exists but does not contain 
column family: "
+              + (desc.hasFamily(FAMILY) ? Bytes.toString(FAMILY_BLOB) : 
Bytes.toString(FAMILY)));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf 
b/distribution/src/resources/drill-override-example.conf
index 64a7bc5..4cd342a 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -96,6 +96,10 @@ drill.exec: {
   },
   sys.store.provider: {
     class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
+    # The following section is used by ZkPStoreProvider
+    zk: {
+      blobroot: "file:///var/log/drill"
+    },
     # The following section is only required by LocalPStoreProvider
     local: {
       path: "/tmp/drill",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c210541..1717237 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -172,4 +172,6 @@ public interface ExecConstants {
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new 
BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY, false);
 
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = 
"bootstrap-storage-plugins.json";
+
+  public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
index 9cbc2e7..5f1e49d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
@@ -47,7 +47,8 @@ import com.google.common.collect.Lists;
 public class ProfileResources {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
 
-  @Inject WorkManager work;
+  @Inject
+  WorkManager work;
 
   public static class ProfileInfo implements Comparable<ProfileInfo> {
     public static final SimpleDateFormat format = new 
SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
@@ -55,10 +56,12 @@ public class ProfileResources {
     private String queryId;
     private Date time;
     private String location;
+    private String foreman;
 
-    public ProfileInfo(String queryId, long time) {
+    public ProfileInfo(String queryId, long time, String foreman) {
       this.queryId = queryId;
       this.time = new Date(time);
+      this.foreman = foreman;
       this.location = "http://localhost:8047/profile/" + queryId + ".json";
     }
 
@@ -78,6 +81,11 @@ public class ProfileResources {
     public int compareTo(ProfileInfo other) {
       return time.compareTo(other.time);
     }
+
+    public String getForeman() {
+      return foreman;
+    }
+
   }
 
   @XmlRootElement
@@ -95,7 +103,7 @@ public class ProfileResources {
     }
 
     public List<ProfileInfo> getFinishedQueries() {
-      return  finishedQueries;
+      return finishedQueries;
     }
   }
 
@@ -114,12 +122,12 @@ public class ProfileResources {
     List<ProfileInfo> runningQueries = Lists.newArrayList();
     List<ProfileInfo> finishedQueries = Lists.newArrayList();
 
-    for(Map.Entry<String, QueryProfile> entry : store){
+    for (Map.Entry<String, QueryProfile> entry : store) {
       QueryProfile profile = entry.getValue();
       if (profile.getState() == QueryState.RUNNING || profile.getState() == 
QueryState.PENDING) {
-        runningQueries.add(new ProfileInfo(entry.getKey(), 
profile.getStart()));
+        runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), 
profile.getForeman().getAddress()));
       } else {
-        finishedQueries.add(new ProfileInfo(entry.getKey(), 
profile.getStart()));
+        finishedQueries.add(new ProfileInfo(entry.getKey(), 
profile.getStart(), profile.getForeman().getAddress()));
       }
     }
 
@@ -145,8 +153,12 @@ public class ProfileResources {
       logger.debug("Failed to get profile for: " + queryId);
       return QueryProfile.getDefaultInstance();
     }
-    QueryProfile profile = store.get(queryId);
-    return profile == null ?  QueryProfile.getDefaultInstance() : profile;
+    // the complete profile is now stored as blob in the PStore
+    QueryProfile profile = store.getBlob(queryId);
+    if (profile == null) {
+      profile = store.get(queryId); // this is to load profile data from older 
builds.
+    }
+    return profile == null ? QueryProfile.getDefaultInstance() : profile;
   }
 
   @GET
@@ -186,4 +198,5 @@ public class ProfileResources {
     }
     return "Query " + queryId + " not running";
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
index a78871e..d3f9134 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
@@ -39,5 +39,6 @@ public abstract class DrillFileSystem implements 
AutoCloseable{
   public abstract FileStatus getFileStatus(Path p) throws IOException;
   public abstract DrillOutputStream create(Path p) throws IOException;
   public abstract DrillInputStream open(Path p) throws IOException;
+  public abstract boolean mkdirs(Path f) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
index 3d1b9f2..959529a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
@@ -146,6 +146,14 @@ public class FallbackFileSystem extends DrillFileSystem {
 
   }
 
-
+  @Override
+  public boolean mkdirs(Path folderPath) throws IOException {
+    if (!fs.exists(folderPath)) {
+      return fs.mkdirs(folderPath);
+    } else if (!fs.getFileStatus(folderPath).isDirectory()) {
+      throw new IOException("The specified folder path exists and is not a 
folder.");
+    }
+    return false;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
index e6cca8c..040a99d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
@@ -21,7 +21,9 @@ import java.util.Map;
 
 public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
   public V get(String key);
+  public V getBlob(String key);
   public void put(String key, V value);
+  public void putBlob(String key, V value);
   public boolean putIfAbsent(String key, V value);
   public void delete(String key);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
index 35e4aea..c10f862 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.store.sys.local;
 
+import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
+
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,18 +38,20 @@ import org.apache.drill.exec.store.sys.PStoreConfig;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class LocalPStore<V> implements PStore<V>{
+public class LocalPStore<V> implements PStore<V> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LocalPStore.class);
 
+  private static final String BLOB_QUALIFIER = "blob";
+
   private final File basePath;
+  private final File blobPath;
   private final PStoreConfig<V> config;
-  private static final String SUFFIX = ".sys.drill";
-
   public LocalPStore(File base, PStoreConfig<V> config) {
     super();
     this.basePath = new File(base, config.getName());
-    if (!basePath.exists()) {
-      basePath.mkdirs();
+    this.blobPath = new File(basePath, BLOB_QUALIFIER);
+    if (!blobPath.exists()) {
+      blobPath.mkdirs();
     }
     this.config = config;
   }
@@ -59,31 +64,38 @@ public class LocalPStore<V> implements PStore<V>{
     }
     List<String> files = Lists.newArrayList();
     for (String s : f) {
-      if (s.endsWith(SUFFIX)) {
-        files.add(s.substring(0, s.length() - SUFFIX.length()));
+      if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+        files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
       }
     }
 
     return new Iter(files.iterator());
   }
 
-  private File p(String name) throws IOException {
+  private File p(String name, boolean blob) throws IOException {
     Preconditions.checkArgument(
         !name.contains("/") &&
         !name.contains(":") &&
         !name.contains(".."));
 
-    File f = new File(basePath, name + SUFFIX);
+    File f = new File(blob ? blobPath : basePath, name + 
DRILL_SYS_FILE_SUFFIX);
     // do this to check file name.
     f.getCanonicalPath();
     return f;
   }
 
-
-
   @Override
   public V get(String key) {
-    try (InputStream is = new FileInputStream(p(key))) {
+    return get(key, false);
+  }
+
+  @Override
+  public V getBlob(String key) {
+    return get(key, true);
+  }
+
+  protected V get(String key, boolean blob) {
+    try (InputStream is = new FileInputStream(p(key, blob))) {
       return config.getSerializer().deserialize(IOUtils.toByteArray(is));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -92,7 +104,16 @@ public class LocalPStore<V> implements PStore<V>{
 
   @Override
   public void put(String key, V value) {
-    try (OutputStream os = new FileOutputStream(p(key))) {
+    put(key, false, value);
+  }
+
+  @Override
+  public void putBlob(String key, V value) {
+    put(key, true, value);
+  }
+
+  protected void put(String key, boolean blob, V value) {
+    try (OutputStream os = new FileOutputStream(p(key, blob))) {
       IOUtils.write(config.getSerializer().serialize(value), os);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -102,7 +123,7 @@ public class LocalPStore<V> implements PStore<V>{
   @Override
   public boolean putIfAbsent(String key, V value) {
     try {
-      File f = p(key);
+      File f = p(key, false);
       if (f.exists()) {
         return false;
       } else {
@@ -117,12 +138,19 @@ public class LocalPStore<V> implements PStore<V>{
   @Override
   public void delete(String key) {
     try {
-      p(key).delete();
+      delete(key, false);
+      delete(key, true);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected void delete(String key, boolean blob) throws IOException {
+    try {
+      p(key, blob).delete();
+    } catch (FileNotFoundException e) { /* ignored */ }
+  }
+
   private class Iter implements Iterator<Entry<String, V>>{
 
     private Iterator<String> keys;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
index 435bd0d..d1ef931 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
@@ -30,6 +30,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
 
   private ConcurrentMap<String, V> map = Maps.newConcurrentMap();
 
+  private ConcurrentMap<String, V> blobMap = Maps.newConcurrentMap();
+
   public NoWriteLocalPStore() {
     super();
   }
@@ -45,8 +47,18 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
   }
 
   @Override
+  public V getBlob(String key) {
+    return blobMap.get(key);
+  }
+
+  @Override
   public void put(String key, V value) {
-    map.put(key,  value);
+    map.put(key, value);
+  }
+
+  @Override
+  public void putBlob(String key, V value) {
+    blobMap.put(key, value);
   }
 
   @Override
@@ -57,6 +69,7 @@ public class NoWriteLocalPStore<V> implements PStore<V>{
   @Override
   public void delete(String key) {
     map.remove(key);
+    blobMap.remove(key);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
index eb21c70..67b947a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -17,37 +17,52 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
+import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
+import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.hadoop.fs.Path;
 import org.apache.zookeeper.CreateMode;
 
 import com.google.common.base.Preconditions;
 
-public class ZkPStore<V> implements PStore<V>{
+public class ZkPStore<V> implements PStore<V> {
 
   private CuratorFramework framework;
   private PStoreConfig<V> config;
   private String prefix;
   private String parent;
+  private DrillFileSystem fs;
+  private Path blobPath;
+  private boolean blobPathCreated;
 
-  ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws 
IOException {
+  ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, 
PStoreConfig<V> config)
+      throws IOException {
     this.parent = "/" + config.getName();
     this.prefix = parent + "/";
     this.framework = framework;
     this.config = config;
+    this.fs = fs;
+    this.blobPath = new Path(blobRoot, config.getName());
+    this.blobPathCreated = false;
 
     // make sure the parent node exists.
-    try{
-      if(framework.checkExists().forPath(parent) == null) {
+    try {
+      if (framework.checkExists().forPath(parent) == null) {
         framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
       }
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
 
@@ -55,69 +70,68 @@ public class ZkPStore<V> implements PStore<V>{
 
   @Override
   public Iterator<Entry<String, V>> iterator() {
-    try{
+    try {
       List<String> children = framework.getChildren().forPath(parent);
       return new Iter(children.iterator());
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
-
   }
 
-  private String p(String key){
-    Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that 
have slashes in them when using the Zookeeper SystemTable storage interface.");
+  private String p(String key) {
+    Preconditions.checkArgument(!key.contains("/"),
+        "You cannot use keys that have slashes in them when using the 
Zookeeper SystemTable storage interface.");
     return prefix + key;
   }
 
   @Override
   public V get(String key) {
-    try{
+    try {
       byte[] bytes = framework.getData().forPath(p(key));
-      if(bytes == null){
+      if (bytes == null) {
         return null;
       }
       return config.getSerializer().deserialize(bytes);
 
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
   }
 
   @Override
   public void put(String key, V value) {
-    try{
-      if(framework.checkExists().forPath(p(key)) != null) {
+    try {
+      if (framework.checkExists().forPath(p(key)) != null) {
         framework.setData().forPath(p(key), 
config.getSerializer().serialize(value));
-      }else{
+      } else {
         framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
config.getSerializer().serialize(value));
       }
 
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
-
   }
 
   @Override
   public boolean putIfAbsent(String key, V value) {
-    try{
-      if(framework.checkExists().forPath(p(key)) != null) {
+    try {
+      if (framework.checkExists().forPath(p(key)) != null) {
         return false;
-      }else{
+      } else {
         framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
config.getSerializer().serialize(value));
         return true;
       }
 
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
   }
 
   @Override
   public void delete(String key) {
-    try{
+    try {
       framework.delete().forPath(p(key));
-    }catch(Exception e){
+    } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
   }
@@ -177,4 +191,38 @@ public class ZkPStore<V> implements PStore<V>{
 
   }
 
+  @Override
+  public V getBlob(String key) {
+    try (DrillInputStream is = fs.open(path(key))) {
+      return 
config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream()));
+    } catch (FileNotFoundException e) {
+      return null;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void putBlob(String key, V value) {
+    try (DrillOutputStream os = fs.create(path(key))) {
+      IOUtils.write(config.getSerializer().serialize(value), 
os.getOuputStream());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Path path(String name) throws IOException {
+    Preconditions.checkArgument(
+        !name.contains("/") &&
+        !name.contains(":") &&
+        !name.contains(".."));
+
+    if (!blobPathCreated) {
+      fs.mkdirs(blobPath);
+      blobPathCreated = true;
+    }
+
+    return new Path(blobPath, name + DRILL_SYS_FILE_SUFFIX);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index f4513c2..82a1fe7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -17,32 +17,72 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
 
 public class ZkPStoreProvider implements PStoreProvider{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
 
+  private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = 
"drill.exec.sys.store.provider.zk.blobroot";
+
   private final CuratorFramework curator;
 
+  private final DrillFileSystem fs;
+
+  private final Path blobRoot;
+
   public ZkPStoreProvider(PStoreRegistry registry) throws 
DrillbitStartupException {
     ClusterCoordinator coord = registry.getClusterCoordinator();
     if (!(coord instanceof ZKClusterCoordinator)) {
       throw new DrillbitStartupException("A ZkPStoreProvider was created 
without a ZKClusterCoordinator.");
     }
     this.curator = 
((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
+
+    if 
(registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
+      blobRoot = new 
Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
+    } else {
+      String drillLogDir = System.getenv("DRILL_LOG_DIR");
+      if (drillLogDir == null) {
+        drillLogDir = "/var/log/drill";
+      }
+      blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
+    }
+    Configuration fsConf = new Configuration();
+    fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
+    try {
+      fs = FileSystemCreator.getFileSystem(registry.getConfig(), fsConf);
+      fs.mkdirs(blobRoot);
+    } catch (IOException e) {
+      throw new DrillbitStartupException("Unable to initialize blob storage.", 
e);
+    }
+
   }
 
+  @VisibleForTesting
   public ZkPStoreProvider(CuratorFramework curator) {
     this.curator = curator;
+    this.fs = null;
+    String drillLogDir = System.getenv("DRILL_LOG_DIR");
+    if (drillLogDir == null) {
+      drillLogDir = "/var/log/drill";
+    }
+    blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
   }
 
   @Override
@@ -51,7 +91,7 @@ public class ZkPStoreProvider implements PStoreProvider{
 
   @Override
   public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
-    return new ZkPStore<V>(curator, store);
+    return new ZkPStore<V>(curator, fs, blobRoot, store);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index 45a151e..dcd58df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -122,8 +122,10 @@ public class QueryStatus {
 
   public void updateCache() {
     QueryState queryState = foreman.getQueryState();
-    boolean fullStatus = queryState == QueryState.COMPLETED || queryState == 
QueryState.FAILED;
-    profileCache.put(queryId, getAsProfile(fullStatus));
+    profileCache.put(queryId, getAsProfile(false));
+    if (queryState == QueryState.COMPLETED || queryState == QueryState.FAILED) 
{
+      profileCache.putBlob(queryId, getAsProfile(true));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/resources/rest/profile/list.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/profile/list.ftl 
b/exec/java-exec/src/main/resources/rest/profile/list.ftl
index 76f9a07..2f8cd4b 100644
--- a/exec/java-exec/src/main/resources/rest/profile/list.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/list.ftl
@@ -23,7 +23,8 @@
       <table class="table table-hover">
         <thead>
            <td>Time</td>
-           <td>Query</td>
+           <td>Query Id</td>
+           <td>Foreman</td>
         </thead>
         <tbody>
           <#list model.getRunningQueries() as query>
@@ -36,6 +37,13 @@
                 </div>
               </a>
             </td>
+            <td>
+              <a 
href="http://${query.getForeman()}:8047/profiles/${query.getQueryId()}" 
target="_blank">
+                <div style="height:100%;width:100%">
+                  ${query.getForeman()}
+                </div>
+              </a>
+            </td>
           </tr>
           </#list>
         </tbody>
@@ -54,7 +62,8 @@
     <table class="table table-hover">
       <thead>
          <td>Time</td>
-         <td>Query</td>
+         <td>Query Id</td>
+         <td>Foreman</td>
       </thead>
       <tbody>
         <#list model.getFinishedQueries() as query>
@@ -67,6 +76,13 @@
               </div>
             </a>
           </td>
+          <td>
+            <a 
href="http://${query.getForeman()}:8047/profiles/${query.getQueryId()}" 
target="_blank">
+              <div style="height:100%;width:100%">
+                ${query.getForeman()}
+              </div>
+            </a>
+          </td>
         </tr>
         </#list>
       </tbody>

Reply via email to