Repository: incubator-drill
Updated Branches:
  refs/heads/master c96773474 -> 740aad20d


DRILL-1339: Use EStore to track running query status.

Move common code to ZkAbstractStore.

Get full profile from foreman directly.

code cleanup.

code change based on review comments.

ZK store check node exists before delete. Add error message in case of error.

Use a different profile for running queries, so that running query would have 
different ZK node from completed queries.

More log.  Do not delete query state in EStore. In stead, modify the state in 
EStore.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/740aad20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/740aad20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/740aad20

Branch: refs/heads/master
Commit: 740aad20d6db735291d4f67958d0d55f5bd633bb
Parents: c967734
Author: Jinfeng Ni <j...@maprtech.com>
Authored: Wed Oct 22 13:54:53 2014 -0700
Committer: Jinfeng Ni <j...@maprtech.com>
Committed: Thu Oct 30 18:16:13 2014 -0700

----------------------------------------------------------------------
 .../store/hbase/config/HBasePStoreProvider.java |  40 +++++
 .../store/mongo/config/MongoPStoreProvider.java |  11 ++
 .../exec/server/rest/ProfileResources.java      |  78 +++++++--
 .../org/apache/drill/exec/store/sys/EStore.java |  32 ++++
 .../drill/exec/store/sys/EStoreProvider.java    |  29 ++++
 .../drill/exec/store/sys/PStoreProvider.java    |   1 +
 .../store/sys/local/LocalEStoreProvider.java    |  45 +++++
 .../store/sys/local/LocalPStoreProvider.java    |  12 +-
 .../drill/exec/store/sys/local/MapEStore.java   |  54 ++++++
 .../exec/store/sys/zk/ZkAbstractStore.java      | 164 +++++++++++++++++++
 .../drill/exec/store/sys/zk/ZkEStore.java       |  56 +++++++
 .../exec/store/sys/zk/ZkEStoreProvider.java     |  39 +++++
 .../drill/exec/store/sys/zk/ZkPStore.java       | 132 +--------------
 .../exec/store/sys/zk/ZkPStoreProvider.java     |  14 +-
 .../exec/work/batch/ControlHandlerImpl.java     |   2 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   8 +-
 .../drill/exec/work/foreman/QueryManager.java   |  28 ++--
 .../drill/exec/work/foreman/QueryStatus.java    |  46 ++++--
 .../src/main/resources/rest/profile/list.ftl    |   4 +-
 19 files changed, 624 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
index b310651..52808d4 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java
@@ -19,13 +19,23 @@ package org.apache.drill.exec.store.hbase.config;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.Maps;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
+import org.apache.drill.exec.store.sys.local.MapEStore;
+import org.apache.drill.exec.store.sys.zk.ZkEStore;
+import org.apache.drill.exec.store.sys.zk.ZkEStoreProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -56,6 +66,10 @@ public class HBasePStoreProvider implements PStoreProvider {
 
   private HTableInterface table;
 
+  private final boolean zkAvailable;
+  private final LocalEStoreProvider localEStoreProvider;
+  private final ZkEStoreProvider zkEStoreProvider;
+
   public HBasePStoreProvider(PStoreRegistry registry) {
     @SuppressWarnings("unchecked")
     Map<String, Object> config = (Map<String, Object>) 
registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
@@ -67,12 +81,27 @@ public class HBasePStoreProvider implements PStoreProvider {
       }
     }
     this.storeTableName = 
registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
+
+    ClusterCoordinator coord = registry.getClusterCoordinator();
+    if ((coord instanceof ZKClusterCoordinator)) {
+      this.localEStoreProvider = null;
+      this.zkEStoreProvider = new 
ZkEStoreProvider(((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator());
+      this.zkAvailable = true;
+    } else {
+      this.localEStoreProvider = new LocalEStoreProvider();
+      this.zkEStoreProvider = null;
+      this.zkAvailable = false;
+    }
+
   }
 
   @VisibleForTesting
   public HBasePStoreProvider(Configuration conf, String storeTableName) {
     this.hbaseConf = conf;
     this.storeTableName = storeTableName;
+    this.localEStoreProvider = new LocalEStoreProvider();
+    this.zkEStoreProvider = null;
+    this.zkAvailable = false;
   }
 
   @Override
@@ -105,6 +134,17 @@ public class HBasePStoreProvider implements PStoreProvider 
{
   }
 
   @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
+    // when ZK is available, use ZK as the Ephemeral store.
+    // when ZK is not available, use a Map as the Ephemeral store.
+    if (this.zkAvailable) {
+      return zkEStoreProvider.getEStore(store);
+    } else {
+      return localEStoreProvider.getEStore(store);
+    }
+  }
+
+  @Override
   public synchronized void close() {
     if (this.table != null) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
index 4aeaa94..eb4ba53 100644
--- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
+++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStoreProvider.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.mongo.config;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.store.mongo.DrillMongoConstants;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -32,6 +34,8 @@ import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.WriteConcern;
+import org.apache.drill.exec.store.sys.local.LocalEStoreProvider;
+import org.apache.drill.exec.store.sys.local.MapEStore;
 
 public class MongoPStoreProvider implements PStoreProvider, 
DrillMongoConstants {
 
@@ -45,9 +49,11 @@ public class MongoPStoreProvider implements PStoreProvider, 
DrillMongoConstants
   private DBCollection collection;
 
   private final String mongoURL;
+  private final LocalEStoreProvider localEStoreProvider;
 
   public MongoPStoreProvider(PStoreRegistry registry) {
     mongoURL = registry.getConfig().getString(SYS_STORE_PROVIDER_MONGO_URL);
+    localEStoreProvider = new LocalEStoreProvider();
   }
 
   @Override
@@ -62,6 +68,11 @@ public class MongoPStoreProvider implements PStoreProvider, 
DrillMongoConstants
   }
 
   @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws 
IOException {
+    return localEStoreProvider.getEStore(storeConfig);
+  }
+
+  @Override
   public <V> PStore<V> getPStore(PStoreConfig<V> config) throws IOException {
     return new MongoPStore<>(config, collection);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
index 5f1e49d..666faa2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
@@ -36,6 +36,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.QueryStatus;
@@ -111,22 +112,29 @@ public class ProfileResources {
   @Path("/profiles.json")
   @Produces(MediaType.APPLICATION_JSON)
   public QProfiles getProfilesJSON() {
-    PStore<QueryProfile> store = null;
+    PStore<QueryProfile> pStore = null;
+    EStore<QueryProfile> eStore = null;
     try {
-      store = 
work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+      pStore = 
work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+      eStore = 
work.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE);
     } catch (IOException e) {
-      logger.debug("Failed to get profiles from persistent store.");
+      logger.debug("Failed to get profiles from persistent or ephemeral 
store.");
       return new QProfiles(new ArrayList<ProfileInfo>(), new 
ArrayList<ProfileInfo>());
     }
 
     List<ProfileInfo> runningQueries = Lists.newArrayList();
     List<ProfileInfo> finishedQueries = Lists.newArrayList();
 
-    for (Map.Entry<String, QueryProfile> entry : store) {
+    for (Map.Entry<String, QueryProfile> entry : eStore) {
       QueryProfile profile = entry.getValue();
       if (profile.getState() == QueryState.RUNNING || profile.getState() == 
QueryState.PENDING) {
         runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), 
profile.getForeman().getAddress()));
-      } else {
+      }
+    }
+
+    for (Map.Entry<String, QueryProfile> entry : pStore) {
+      QueryProfile profile = entry.getValue();
+      if (profile.getState() == QueryState.COMPLETED || profile.getState() == 
QueryState.FAILED || profile.getState() == QueryState.CANCELED) {
         finishedQueries.add(new ProfileInfo(entry.getKey(), 
profile.getStart(), profile.getForeman().getAddress()));
       }
     }
@@ -146,17 +154,53 @@ public class ProfileResources {
   }
 
   private QueryProfile getQueryProfile(String queryId) {
-    PStore<QueryProfile> store = null;
+    PStore<QueryProfile> pStore = null;
     try {
-      store = 
work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+      pStore = 
work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
     } catch (IOException e) {
       logger.debug("Failed to get profile for: " + queryId);
       return QueryProfile.getDefaultInstance();
     }
-    // the complete profile is now stored as blob in the PStore
-    QueryProfile profile = store.getBlob(queryId);
-    if (profile == null) {
-      profile = store.get(queryId); // this is to load profile data from older 
builds.
+
+    QueryProfile profile = null;
+
+    //TODO: we should handle the error case better. In stead of just returning 
a default profile instance, we should let user know of the error happened.
+    try {
+      // the complete profile is now stored as blob in the PStore
+      profile = pStore.getBlob(queryId);
+    } catch (Exception ex) {
+      logger.error("Fail to get full profile from PStore for query: {}. 
Error:{}", queryId, ex);
+    }
+    try {
+      if (profile == null) {
+        profile = pStore.get(queryId); // this is to load profile data from 
older builds.
+      }
+    } catch (Exception ex) {
+      logger.error("Fail to get compact profile from PStore for query: {}. 
Error:{}", queryId, ex);
+    }
+
+    return profile == null ? QueryProfile.getDefaultInstance() : profile;
+
+  }
+
+  private QueryProfile getRunningQueryProfile(String queryId) {
+    EStore<QueryProfile> eStore = null;
+    try {
+      eStore = 
work.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE);
+    } catch (IOException e) {
+      logger.debug("Failed to get profile for: " + queryId);
+      return QueryProfile.getDefaultInstance();
+    }
+
+    QueryProfile profile = eStore.get(queryId);
+
+    if (profile != null) {
+      if (work.getBee().getForemanForQueryId(profile.getId()) != null) {
+        profile = 
work.getBee().getForemanForQueryId(profile.getId()).getQueryStatus().getAsProfile(true);
+        return profile;
+      }
+    } else {
+        logger.debug("profile from non-foreman");
     }
     return profile == null ? QueryProfile.getDefaultInstance() : profile;
   }
@@ -184,10 +228,20 @@ public class ProfileResources {
   }
 
   @GET
+  @Path("/running_profiles/{queryid}")
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getRunningProfile(@PathParam("queryid") String queryId) {
+    ProfileWrapper wrapper = new 
ProfileWrapper(getRunningQueryProfile(queryId));
+
+    return new Viewable("/rest/profile/profile.ftl", wrapper);
+
+  }
+
+  @GET
   @Path("/profiles/cancel/{queryid}")
   @Produces(MediaType.TEXT_PLAIN)
   public String cancelQuery(@PathParam("queryid") String queryId) throws 
IOException {
-    PStore<QueryProfile> profiles = 
work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+    EStore<QueryProfile> profiles = 
work.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE);
     QueryProfile profile = profiles.get(queryId);
     if (profile != null && (profile.getState() == QueryState.RUNNING || 
profile.getState() == QueryState.PENDING)) {
       
work.getUserWorker().cancelQuery(QueryIdHelper.getQueryIdFromString(queryId));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
new file mode 100644
index 0000000..7214092
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sys;
+
+import java.util.Map;
+
+/**
+ * Interfaces to define EStore, which is keep track of status/information for 
running queries. The information
+ * would be gone, if the query is completed, or the foreman drillbit is not 
responding.
+ * @param <V>
+ */
+public interface EStore <V> extends Iterable<Map.Entry<String, V>> {
+  public V get(String key);
+  public void put(String key, V value);
+  public void delete(String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
new file mode 100644
index 0000000..32bf0b1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sys;
+
+import java.io.IOException;
+
+/**
+ * Interface to define the provider which return EStore.
+ */
+
+public interface EStoreProvider {
+  public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
index ffc2973..f6154e2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
@@ -23,4 +23,5 @@ import java.io.IOException;
 public interface PStoreProvider extends AutoCloseable, Closeable{
   public <V> PStore<V> getPStore(PStoreConfig<V> table) throws IOException;
   public void start() throws IOException;
+  public <V> EStore<V> getEStore(PStoreConfig<V> table) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
new file mode 100644
index 0000000..b505fd4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sys.local;
+
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.EStoreProvider;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+public class LocalEStoreProvider implements EStoreProvider{
+  private ConcurrentMap<PStoreConfig<?>, EStore<?>> estores = 
Maps.newConcurrentMap();
+
+  @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws 
IOException {
+    if (! (estores.containsKey(storeConfig)) ) {
+      EStore<V> p = new MapEStore<V>();
+      EStore<?> p2 = estores.putIfAbsent(storeConfig, p);
+      if(p2 != null) {
+        return (EStore<V>) p2;
+      }
+      return p;
+    } else {
+      return (EStore<V>) estores.get(storeConfig);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index 95fcf14..c413866 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -23,22 +23,24 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.PStoreProvider;
 
 import com.google.common.collect.Maps;
 
 /**
  * A really simple provider that stores data in the local file system, one 
value per file.
  */
-public class LocalPStoreProvider implements PStoreProvider{
+public class LocalPStoreProvider implements PStoreProvider {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class);
 
   private File path;
   private final boolean enableWrite;
   private ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
+  private final LocalEStoreProvider estoreProvider;
 
   public LocalPStoreProvider(DrillConfig config) {
     path = new 
File(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
@@ -46,6 +48,7 @@ public class LocalPStoreProvider implements PStoreProvider{
     if (!enableWrite) {
       pstores = Maps.newConcurrentMap();
     }
+    estoreProvider = new LocalEStoreProvider();
   }
 
   public LocalPStoreProvider(PStoreRegistry registry) {
@@ -57,6 +60,11 @@ public class LocalPStoreProvider implements PStoreProvider{
   }
 
   @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> storeConfig) throws 
IOException {
+    return estoreProvider.getEStore(storeConfig);
+  }
+
+  @Override
   public <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws 
IOException {
     if (enableWrite) {
       return new LocalPStore<V>(path, storeConfig);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
new file mode 100644
index 0000000..84e5027
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sys.local;
+
+import org.apache.drill.exec.store.sys.EStore;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of EStore using ConcurrentHashMap.
+ * @param <V>
+ */
+public class MapEStore <V> implements EStore<V> {
+  ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
+
+  @Override
+  public V get(String key) {
+    return store.get(key);
+  }
+
+  @Override
+  public void put(String key, V value) {
+    store.put(key, value);
+  }
+
+  @Override
+  public void delete(String key) {
+    store.remove(key);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> iterator() {
+    return store.entrySet().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
new file mode 100644
index 0000000..b88ff74
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * This is the abstract class that is shared by ZkPStore (Persistent store) 
and ZkEStore (Ephemeral Store)
+ * @param <V>
+ */
+public abstract class ZkAbstractStore<V> {
+
+  protected CuratorFramework framework;
+  protected PStoreConfig<V> config;
+  private String prefix;
+  private String parent;
+
+  public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config)
+      throws IOException {
+    this.parent = "/" + config.getName();
+    this.prefix = parent + "/";
+    this.framework = framework;
+    this.config = config;
+
+    // make sure the parent node exists.
+    try {
+      if (framework.checkExists().forPath(parent) == null) {
+        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+
+  }
+
+  public Iterator<Entry<String, V>> iterator() {
+    try {
+      List<String> children = framework.getChildren().forPath(parent);
+      return new Iter(children.iterator());
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+  }
+
+  protected String p(String key) {
+    Preconditions.checkArgument(!key.contains("/"),
+        "You cannot use keys that have slashes in them when using the 
Zookeeper SystemTable storage interface.");
+    return prefix + key;
+  }
+
+  public V get(String key) {
+    try {
+      byte[] bytes = framework.getData().forPath(p(key));
+      if (bytes == null) {
+        return null;
+      }
+      return config.getSerializer().deserialize(bytes);
+
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+  }
+
+  public void put(String key, V value) {
+    try {
+      if (framework.checkExists().forPath(p(key)) != null) {
+        framework.setData().forPath(p(key), 
config.getSerializer().serialize(value));
+      } else {
+        createNodeInZK(key, value);
+      }
+
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+  }
+
+  public void delete(String key) {
+    try {
+        framework.delete().forPath(p(key));
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+  }
+
+  public abstract void createNodeInZK (String key, V value);
+
+  private class Iter implements Iterator<Entry<String, V>>{
+
+    private Iterator<String> keys;
+    private String current;
+
+    public Iter(Iterator<String> keys) {
+      super();
+      this.keys = keys;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return keys.hasNext();
+    }
+
+    @Override
+    public Entry<String, V> next() {
+      current = keys.next();
+      return new DeferredEntry(current);
+    }
+
+    @Override
+    public void remove() {
+      delete(current);
+      keys.remove();
+    }
+
+    private class DeferredEntry implements Entry<String, V>{
+
+      private String name;
+
+      public DeferredEntry(String name) {
+        super();
+        this.name = name;
+      }
+
+      @Override
+      public String getKey() {
+        return name;
+      }
+
+      @Override
+      public V getValue() {
+        return get(name);
+      }
+
+      @Override
+      public V setValue(V value) {
+        throw new UnsupportedOperationException();
+      }
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
new file mode 100644
index 0000000..1abf3a6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.zookeeper.CreateMode;
+
+import java.io.IOException;
+
+/**
+ * Implementation of EStore using Zookeeper's EPHEMERAL node.
+ * @param <V>
+ */
+public class ZkEStore<V> extends ZkAbstractStore<V> implements EStore<V>{
+
+  public ZkEStore(CuratorFramework framework, PStoreConfig<V> config) throws 
IOException{
+    super(framework,config);
+  }
+
+  @Override
+  public void delete(String key) {
+    try {
+      if (framework.checkExists().forPath(p(key)) != null) {
+        framework.delete().forPath(p(key));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper. " + 
e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void createNodeInZK(String key, V value) {
+    try {
+      framework.create().withMode(CreateMode.EPHEMERAL).forPath(p(key), 
config.getSerializer().serialize(value));
+    } catch (Exception e) {
+      throw new RuntimeException("Failure while accessing Zookeeper", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
new file mode 100644
index 0000000..34b59a7
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.sys.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.exec.store.sys.EStore;
+import org.apache.drill.exec.store.sys.EStoreProvider;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+
+import java.io.IOException;
+
+public class ZkEStoreProvider implements EStoreProvider{
+  private final CuratorFramework curator;
+
+  public ZkEStoreProvider(CuratorFramework curator) {
+    this.curator = curator;
+  }
+
+  @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
+    return new ZkEStore<V>(curator,store);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
index 67b947a..601ba8f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -21,9 +21,6 @@ import static 
org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -37,76 +34,29 @@ import org.apache.zookeeper.CreateMode;
 
 import com.google.common.base.Preconditions;
 
-public class ZkPStore<V> implements PStore<V> {
+/**
+ * Implementation of PStore using Zookeeper's PERSISTENT node.
+ * @param <V>
+ */
+public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
 
-  private CuratorFramework framework;
-  private PStoreConfig<V> config;
-  private String prefix;
-  private String parent;
   private DrillFileSystem fs;
   private Path blobPath;
   private boolean blobPathCreated;
 
   ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, 
PStoreConfig<V> config)
       throws IOException {
-    this.parent = "/" + config.getName();
-    this.prefix = parent + "/";
-    this.framework = framework;
-    this.config = config;
+    super(framework, config);
+
     this.fs = fs;
     this.blobPath = new Path(blobRoot, config.getName());
     this.blobPathCreated = false;
-
-    // make sure the parent node exists.
-    try {
-      if (framework.checkExists().forPath(parent) == null) {
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    try {
-      List<String> children = framework.getChildren().forPath(parent);
-      return new Iter(children.iterator());
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  private String p(String key) {
-    Preconditions.checkArgument(!key.contains("/"),
-        "You cannot use keys that have slashes in them when using the 
Zookeeper SystemTable storage interface.");
-    return prefix + key;
-  }
-
-  @Override
-  public V get(String key) {
-    try {
-      byte[] bytes = framework.getData().forPath(p(key));
-      if (bytes == null) {
-        return null;
-      }
-      return config.getSerializer().deserialize(bytes);
-
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
   }
 
   @Override
-  public void put(String key, V value) {
+  public void createNodeInZK(String key, V value) {
     try {
-      if (framework.checkExists().forPath(p(key)) != null) {
-        framework.setData().forPath(p(key), 
config.getSerializer().serialize(value));
-      } else {
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
config.getSerializer().serialize(value));
-      }
-
+      framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), 
config.getSerializer().serialize(value));
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper", e);
     }
@@ -128,70 +78,6 @@ public class ZkPStore<V> implements PStore<V> {
   }
 
   @Override
-  public void delete(String key) {
-    try {
-      framework.delete().forPath(p(key));
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  private class Iter implements Iterator<Entry<String, V>>{
-
-    private Iterator<String> keys;
-    private String current;
-
-    public Iter(Iterator<String> keys) {
-      super();
-      this.keys = keys;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return keys.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      current = keys.next();
-      return new DeferredEntry(current);
-    }
-
-    @Override
-    public void remove() {
-      delete(current);
-      keys.remove();
-    }
-
-    private class DeferredEntry implements Entry<String, V>{
-
-      private String name;
-
-      public DeferredEntry(String name) {
-        super();
-        this.name = name;
-      }
-
-      @Override
-      public String getKey() {
-        return name;
-      }
-
-      @Override
-      public V getValue() {
-        return get(name);
-      }
-
-      @Override
-      public V setValue(V value) {
-        throw new UnsupportedOperationException();
-      }
-
-    }
-
-  }
-
-  @Override
   public V getBlob(String key) {
     try (DrillInputStream is = fs.open(path(key))) {
       return 
config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream()));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index 82a1fe7..aa64f53 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -26,17 +26,18 @@ import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreRegistry;
+import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class ZkPStoreProvider implements PStoreProvider{
+public class ZkPStoreProvider implements PStoreProvider {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
 
   private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = 
"drill.exec.sys.store.provider.zk.blobroot";
@@ -47,6 +48,8 @@ public class ZkPStoreProvider implements PStoreProvider{
 
   private final Path blobRoot;
 
+  private final ZkEStoreProvider zkEStoreProvider;
+
   public ZkPStoreProvider(PStoreRegistry registry) throws 
DrillbitStartupException {
     ClusterCoordinator coord = registry.getClusterCoordinator();
     if (!(coord instanceof ZKClusterCoordinator)) {
@@ -72,6 +75,7 @@ public class ZkPStoreProvider implements PStoreProvider{
       throw new DrillbitStartupException("Unable to initialize blob storage.", 
e);
     }
 
+    zkEStoreProvider = new ZkEStoreProvider(curator);
   }
 
   @VisibleForTesting
@@ -83,6 +87,7 @@ public class ZkPStoreProvider implements PStoreProvider{
       drillLogDir = "/var/log/drill";
     }
     blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI());
+    zkEStoreProvider = new ZkEStoreProvider(curator);
   }
 
   @Override
@@ -90,6 +95,11 @@ public class ZkPStoreProvider implements PStoreProvider{
   }
 
   @Override
+  public <V> EStore<V> getEStore(PStoreConfig<V> store) throws IOException {
+    return zkEStoreProvider.getEStore(store);
+  }
+
+  @Override
   public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException {
     return new ZkPStore<V>(curator, fs, blobRoot, store);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 63d8c71..fc0972f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -96,7 +96,7 @@ public class ControlHandlerImpl implements 
ControlMessageHandler {
       QueryProfile profile;
       if (foreman == null) {
         try {
-          profile = 
bee.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE).get(QueryIdHelper.getQueryId(queryId));
+          profile = 
bee.getContext().getPersistentStoreProvider().getEStore(QueryStatus.RUNNING_QUERY_PROFILE).get(QueryIdHelper.getQueryId(queryId));
         } catch (IOException e) {
           throw new RpcException("Failed to get persistent store", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 2e4f43d..0163f55 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -114,6 +114,7 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
         return QueryState.valueOf(i);
       }
     };
+    this.fragmentManager.getStatus().updateQueryStateInStore();
   }
 
   public QueryContext getContext() {
@@ -167,7 +168,9 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
   void cleanupAndSendResult(QueryResult result) {
     bee.retireForeman(this);
     initiatingClient.sendResult(new ResponseSendListener(), new 
QueryWritableBatch(result), true);
-    state.updateState(QueryState.RUNNING, QueryState.COMPLETED);
+    state.updateState(state.getState(), result.getQueryState());
+
+    this.fragmentManager.getStatus().updateQueryStateInStore();
   }
 
   private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@@ -360,14 +363,13 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
 
       int totalFragments = 1 + work.getFragments().size();;
       fragmentManager.getStatus().setTotalFragments(totalFragments);
-      fragmentManager.getStatus().updateCache();
 
       logger.debug("Submitting fragments to run.");
       fragmentManager.runFragments(bee, work.getRootFragment(), 
work.getRootOperator(), initiatingClient, work.getFragments());
 
       logger.debug("Fragments running.");
       state.updateState(QueryState.PENDING, QueryState.RUNNING);
-
+      fragmentManager.getStatus().updateQueryStateInStore();
     } catch (Exception e) {
       fail("Failure while setting up query.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index b60d8a9..b200edc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -66,6 +66,7 @@ public class QueryManager implements FragmentStatusListener{
   private final Controller controller;
   private ForemanManagerListener foremanManagerListener;
   private AtomicInteger remainingFragmentCount;
+  private AtomicInteger failedFragmentCount;
   private WorkEventBus workBus;
   private QueryId queryId;
   private FragmentExecutor rootRunner;
@@ -81,6 +82,7 @@ public class QueryManager implements FragmentStatusListener{
     this.queryId =  id;
     this.controller = controller;
     this.remainingFragmentCount = new AtomicInteger(0);
+    this.failedFragmentCount = new AtomicInteger(0);
     this.status = new QueryStatus(query, id, pStoreProvider, foreman);
   }
 
@@ -182,7 +184,7 @@ public class QueryManager implements FragmentStatusListener{
     logger.debug("New fragment status was provided to Foreman of {}", status);
     switch(status.getProfile().getState()){
     case AWAITING_ALLOCATION:
-      updateStatus(status, true);
+      updateFragmentStatus(status);
       break;
     case CANCELLED:
       // we don't care about cancellation messages since we're the only entity 
that should drive cancellations.
@@ -194,15 +196,15 @@ public class QueryManager implements 
FragmentStatusListener{
       finished(status);
       break;
     case RUNNING:
-      updateStatus(status, false);
+      updateFragmentStatus(status);
       break;
     default:
       throw new UnsupportedOperationException(String.format("Received status 
of %s", status));
     }
   }
 
-  private void updateStatus(FragmentStatus status, boolean updateCache){
-    this.status.update(status, updateCache);
+  private void updateFragmentStatus(FragmentStatus status){
+    this.status.updateFragmentStatus(status);
   }
 
   private void finished(FragmentStatus status){
@@ -213,20 +215,24 @@ public class QueryManager implements 
FragmentStatusListener{
               .setQueryState(QueryState.COMPLETED) //
               .setQueryId(queryId) //
               .build();
+      this.status.setEndTime(System.currentTimeMillis());
       foremanManagerListener.cleanupAndSendResult(result);
       workBus.removeFragmentStatusListener(queryId);
     }
-    this.status.setEndTime(System.currentTimeMillis());
     this.status.incrementFinishedFragments();
-    updateStatus(status, true);
+    updateFragmentStatus(status);
   }
 
   private void fail(FragmentStatus status){
-    stopQuery();
-    QueryResult result = 
QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getProfile().getError()).build();
-    foremanManagerListener.cleanupAndSendResult(result);
-    this.status.setEndTime(System.currentTimeMillis());
-    updateStatus(status, true);
+    logger.warn("Fragment faild : {}", status);
+    updateFragmentStatus(status);
+    int failed = this.failedFragmentCount.incrementAndGet();
+    if (failed == 1) { // only first failed fragment need notify foreman (?)
+      stopQuery();
+      QueryResult result = 
QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getProfile().getError()).build();
+      foremanManagerListener.cleanupAndSendResult(result);
+      this.status.setEndTime(System.currentTimeMillis());
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index dcd58df..06c8e28 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -42,6 +43,8 @@ public class QueryStatus {
   public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
           newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, 
SchemaUserBitShared.QueryProfile.MERGE).name("query_profiles").build();
 
+  public static final PStoreConfig<QueryProfile> RUNNING_QUERY_PROFILE = 
PStoreConfig.
+      newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, 
SchemaUserBitShared.QueryProfile.MERGE).name("query_profiles_running").build();
 
   // doesn't need to be thread safe as fragmentDataMap is generated in a 
single thread and then accessed by multiple threads for reads only.
   private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> 
fragmentDataMap = new 
IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
@@ -57,14 +60,16 @@ public class QueryStatus {
   private int totalFragments;
   private int finishedFragments = 0;
 
-  private final PStore<QueryProfile> profileCache;
+  private final PStore<QueryProfile> profilePStore;
+  private final EStore<QueryProfile> profileEStore;
 
   public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, 
Foreman foreman) {
     this.id = id;
     this.query = query;
     this.queryId = QueryIdHelper.getQueryId(id);
     try {
-      this.profileCache = provider.getPStore(QUERY_PROFILE);
+      this.profilePStore = provider.getPStore(QUERY_PROFILE);
+      this.profileEStore = provider.getEStore(RUNNING_QUERY_PROFILE);
     } catch (IOException e) {
       throw new DrillRuntimeException(e);
     }
@@ -77,8 +82,6 @@ public class QueryStatus {
 
   public void setPlanText(String planText) {
     this.planText = planText;
-    updateCache();
-
   }
 
   public void setStartTime(long startTime) {
@@ -111,20 +114,33 @@ public class QueryStatus {
     fragmentDataSet.add(data);
   }
 
-  void update(FragmentStatus status, boolean updateCache) {
-    int majorFragmentId = status.getHandle().getMajorFragmentId();
-    int minorFragmentId = status.getHandle().getMinorFragmentId();
-    
fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(status);
-    if (updateCache) {
-      updateCache();
-    }
+  void updateFragmentStatus(FragmentStatus fragmentStatus) {
+    int majorFragmentId = fragmentStatus.getHandle().getMajorFragmentId();
+    int minorFragmentId = fragmentStatus.getHandle().getMinorFragmentId();
+    
fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
   }
 
-  public void updateCache() {
+  public void updateQueryStateInStore() {
     QueryState queryState = foreman.getQueryState();
-    profileCache.put(queryId, getAsProfile(false));
-    if (queryState == QueryState.COMPLETED || queryState == QueryState.FAILED) 
{
-      profileCache.putBlob(queryId, getAsProfile(true));
+    switch (queryState) {
+      case PENDING:
+        // only foreman will put one node for "pending" query into PStore. 
This is to avoid concurrency issue when multiple threads fail and
+        // call this method.
+        profilePStore.put(queryId, getAsProfile(false));
+      case RUNNING:
+        profileEStore.put(queryId, getAsProfile(false));  // store as 
ephemeral query profile.
+        logger.warn("Update running or pending query state : {}", queryState);
+        break;
+      case COMPLETED:
+      case CANCELED:
+      case FAILED:
+        logger.warn("Update finished query state : {}", queryState);
+        profileEStore.put(queryId, getAsProfile(false));  //  Change the state 
in EStore to complete/cancel/fail.
+        // profileEStore.delete(queryId);  // delete the ephemeral query 
profile.
+        profilePStore.put(queryId, getAsProfile(false));
+        profilePStore.putBlob(queryId, getAsProfile(true));
+        break;
+      default:
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/740aad20/exec/java-exec/src/main/resources/rest/profile/list.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/profile/list.ftl 
b/exec/java-exec/src/main/resources/rest/profile/list.ftl
index 2f8cd4b..ef6b66d 100644
--- a/exec/java-exec/src/main/resources/rest/profile/list.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/list.ftl
@@ -31,14 +31,14 @@
           <tr>
             <td>${query.getTime()}</td>
             <td>
-              <a href="/profiles/${query.getQueryId()}">
+              <a href="/running_profiles/${query.getQueryId()}">
                 <div style="height:100%;width:100%">
                   ${query.getQueryId()}
                 </div>
               </a>
             </td>
             <td>
-              <a 
href="http://${query.getForeman()}:8047/profiles/${query.getQueryId()}" 
target="_blank">
+              <a 
href="http://${query.getForeman()}:8047/running_profiles/${query.getQueryId()}" 
target="_blank">
                 <div style="height:100%;width:100%">
                   ${query.getForeman()}
                 </div>

Reply via email to