Repository: incubator-drill Updated Branches: refs/heads/master 9628f9bb5 -> 81fb18a08
DRILL-1414: Move profile storage to DFS rather than using PStore Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/786fd36e Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/786fd36e Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/786fd36e Branch: refs/heads/master Commit: 786fd36eb90676673dd1ccd64ae42ae3e8166df9 Parents: 9628f9b Author: Aditya Kishore <adi...@maprtech.com> Authored: Fri Sep 26 00:42:49 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Sun Sep 28 00:10:38 2014 -0700 ---------------------------------------------------------------------- .../exec/store/hbase/config/HBasePStore.java | 28 +++++- .../store/hbase/config/HBasePStoreProvider.java | 8 +- .../src/resources/drill-override-example.conf | 4 + .../org/apache/drill/exec/ExecConstants.java | 2 + .../exec/server/rest/ProfileResources.java | 31 +++++-- .../exec/store/dfs/shim/DrillFileSystem.java | 1 + .../dfs/shim/fallback/FallbackFileSystem.java | 10 +- .../org/apache/drill/exec/store/sys/PStore.java | 2 + .../drill/exec/store/sys/local/LocalPStore.java | 58 +++++++++--- .../store/sys/local/NoWriteLocalPStore.java | 15 ++- .../drill/exec/store/sys/zk/ZkPStore.java | 96 +++++++++++++++----- .../exec/store/sys/zk/ZkPStoreProvider.java | 42 ++++++++- .../drill/exec/work/foreman/QueryStatus.java | 6 +- .../src/main/resources/rest/profile/list.ftl | 20 +++- 14 files changed, 261 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java index 6324180..b5a697c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.hbase.config; import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY; +import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.FAMILY_BLOB; import static org.apache.drill.exec.store.hbase.config.HBasePStoreProvider.QUALIFIER; import java.io.IOException; @@ -58,10 +59,19 @@ public class HBasePStore<V> implements PStore<V> { } @Override - public synchronized V get(String key) { + public V get(String key) { + return get(key, FAMILY); + } + + @Override + public V getBlob(String key) { + return get(key, FAMILY_BLOB); + } + + protected synchronized V get(String key, byte[] family) { try { Get get = new Get(row(key)); - get.addColumn(FAMILY, QUALIFIER); + get.addColumn(family, QUALIFIER); return value(table.get(get)); } catch (IOException e) { throw new DrillRuntimeException("Caught error while getting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); @@ -69,10 +79,19 @@ public class HBasePStore<V> implements PStore<V> { } @Override - public synchronized void put(String key, V value) { + public void put(String key, V value) { + put(key, FAMILY, value); + } + + @Override + public void putBlob(String key, V value) { + put(key, FAMILY_BLOB, value); + } + + protected synchronized void put(String key, byte[] family, V value) { try { Put put = new Put(row(key)); - put.add(FAMILY, QUALIFIER, bytes(value)); + put.add(family, QUALIFIER, bytes(value)); table.put(put); } catch (IOException e) { throw new DrillRuntimeException("Caught error while putting row '" + key + "' from for table:" + Bytes.toString(table.getTableName()), e); @@ -123,7 +142,6 @@ public class HBasePStore<V> implements PStore<V> { private void delete(byte[] row) { try { Delete del = new Delete(row); - del.deleteColumns(FAMILY, QUALIFIER); table.delete(del); } catch (IOException e) { throw new DrillRuntimeException("Caught error while deleting row '" + Bytes.toStringBinary(row) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java index 48e6c42..b310651 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStoreProvider.java @@ -44,6 +44,8 @@ public class HBasePStoreProvider implements PStoreProvider { static final byte[] FAMILY = Bytes.toBytes("s"); + static final byte[] FAMILY_BLOB = Bytes.toBytes("t"); + static final byte[] QUALIFIER = Bytes.toBytes("d"); private final String storeTableName; @@ -86,12 +88,14 @@ public class HBasePStoreProvider implements PStoreProvider { if (!admin.tableExists(storeTableName)) { HTableDescriptor desc = new HTableDescriptor(storeTableName); desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); + desc.addFamily(new HColumnDescriptor(FAMILY_BLOB).setMaxVersions(1)); admin.createTable(desc); } else { HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(storeTableName)); - if (!desc.hasFamily(FAMILY)) { + if (!desc.hasFamily(FAMILY) || !desc.hasFamily(FAMILY_BLOB)) { throw new DrillRuntimeException("The HBase table " + storeTableName - + " specified as persistent store exists but does not contain column family: " + Bytes.toString(FAMILY)); + + " specified as persistent store exists but does not contain column family: " + + (desc.hasFamily(FAMILY) ? Bytes.toString(FAMILY_BLOB) : Bytes.toString(FAMILY))); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/distribution/src/resources/drill-override-example.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index 64a7bc5..4cd342a 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -96,6 +96,10 @@ drill.exec: { }, sys.store.provider: { class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider", + # The following section is used by ZkPStoreProvider + zk: { + blobroot: "file:///var/log/drill" + }, # The following section is only required by LocalPStoreProvider local: { path: "/tmp/drill", http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index c210541..1717237 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -172,4 +172,6 @@ public interface ExecConstants { public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY, false); public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json"; + + public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill"; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java index 9cbc2e7..5f1e49d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java @@ -47,7 +47,8 @@ import com.google.common.collect.Lists; public class ProfileResources { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class); - @Inject WorkManager work; + @Inject + WorkManager work; public static class ProfileInfo implements Comparable<ProfileInfo> { public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); @@ -55,10 +56,12 @@ public class ProfileResources { private String queryId; private Date time; private String location; + private String foreman; - public ProfileInfo(String queryId, long time) { + public ProfileInfo(String queryId, long time, String foreman) { this.queryId = queryId; this.time = new Date(time); + this.foreman = foreman; this.location = "http://localhost:8047/profile/" + queryId + ".json"; } @@ -78,6 +81,11 @@ public class ProfileResources { public int compareTo(ProfileInfo other) { return time.compareTo(other.time); } + + public String getForeman() { + return foreman; + } + } @XmlRootElement @@ -95,7 +103,7 @@ public class ProfileResources { } public List<ProfileInfo> getFinishedQueries() { - return finishedQueries; + return finishedQueries; } } @@ -114,12 +122,12 @@ public class ProfileResources { List<ProfileInfo> runningQueries = Lists.newArrayList(); List<ProfileInfo> finishedQueries = Lists.newArrayList(); - for(Map.Entry<String, QueryProfile> entry : store){ + for (Map.Entry<String, QueryProfile> entry : store) { QueryProfile profile = entry.getValue(); if (profile.getState() == QueryState.RUNNING || profile.getState() == QueryState.PENDING) { - runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart())); + runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress())); } else { - finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart())); + finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress())); } } @@ -145,8 +153,12 @@ public class ProfileResources { logger.debug("Failed to get profile for: " + queryId); return QueryProfile.getDefaultInstance(); } - QueryProfile profile = store.get(queryId); - return profile == null ? QueryProfile.getDefaultInstance() : profile; + // the complete profile is now stored as blob in the PStore + QueryProfile profile = store.getBlob(queryId); + if (profile == null) { + profile = store.get(queryId); // this is to load profile data from older builds. + } + return profile == null ? QueryProfile.getDefaultInstance() : profile; } @GET @@ -186,4 +198,5 @@ public class ProfileResources { } return "Query " + queryId + " not running"; } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java index a78871e..d3f9134 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java @@ -39,5 +39,6 @@ public abstract class DrillFileSystem implements AutoCloseable{ public abstract FileStatus getFileStatus(Path p) throws IOException; public abstract DrillOutputStream create(Path p) throws IOException; public abstract DrillInputStream open(Path p) throws IOException; + public abstract boolean mkdirs(Path f) throws IOException; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java index 3d1b9f2..959529a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java @@ -146,6 +146,14 @@ public class FallbackFileSystem extends DrillFileSystem { } - + @Override + public boolean mkdirs(Path folderPath) throws IOException { + if (!fs.exists(folderPath)) { + return fs.mkdirs(folderPath); + } else if (!fs.getFileStatus(folderPath).isDirectory()) { + throw new IOException("The specified folder path exists and is not a folder."); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java index e6cca8c..040a99d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java @@ -21,7 +21,9 @@ import java.util.Map; public interface PStore<V> extends Iterable<Map.Entry<String, V>> { public V get(String key); + public V getBlob(String key); public void put(String key, V value); + public void putBlob(String key, V value); public boolean putIfAbsent(String key, V value); public void delete(String key); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java index 35e4aea..c10f862 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStore.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.store.sys.local; +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -35,18 +38,20 @@ import org.apache.drill.exec.store.sys.PStoreConfig; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class LocalPStore<V> implements PStore<V>{ +public class LocalPStore<V> implements PStore<V> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStore.class); + private static final String BLOB_QUALIFIER = "blob"; + private final File basePath; + private final File blobPath; private final PStoreConfig<V> config; - private static final String SUFFIX = ".sys.drill"; - public LocalPStore(File base, PStoreConfig<V> config) { super(); this.basePath = new File(base, config.getName()); - if (!basePath.exists()) { - basePath.mkdirs(); + this.blobPath = new File(basePath, BLOB_QUALIFIER); + if (!blobPath.exists()) { + blobPath.mkdirs(); } this.config = config; } @@ -59,31 +64,38 @@ public class LocalPStore<V> implements PStore<V>{ } List<String> files = Lists.newArrayList(); for (String s : f) { - if (s.endsWith(SUFFIX)) { - files.add(s.substring(0, s.length() - SUFFIX.length())); + if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { + files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); } } return new Iter(files.iterator()); } - private File p(String name) throws IOException { + private File p(String name, boolean blob) throws IOException { Preconditions.checkArgument( !name.contains("/") && !name.contains(":") && !name.contains("..")); - File f = new File(basePath, name + SUFFIX); + File f = new File(blob ? blobPath : basePath, name + DRILL_SYS_FILE_SUFFIX); // do this to check file name. f.getCanonicalPath(); return f; } - - @Override public V get(String key) { - try (InputStream is = new FileInputStream(p(key))) { + return get(key, false); + } + + @Override + public V getBlob(String key) { + return get(key, true); + } + + protected V get(String key, boolean blob) { + try (InputStream is = new FileInputStream(p(key, blob))) { return config.getSerializer().deserialize(IOUtils.toByteArray(is)); } catch (IOException e) { throw new RuntimeException(e); @@ -92,7 +104,16 @@ public class LocalPStore<V> implements PStore<V>{ @Override public void put(String key, V value) { - try (OutputStream os = new FileOutputStream(p(key))) { + put(key, false, value); + } + + @Override + public void putBlob(String key, V value) { + put(key, true, value); + } + + protected void put(String key, boolean blob, V value) { + try (OutputStream os = new FileOutputStream(p(key, blob))) { IOUtils.write(config.getSerializer().serialize(value), os); } catch (IOException e) { throw new RuntimeException(e); @@ -102,7 +123,7 @@ public class LocalPStore<V> implements PStore<V>{ @Override public boolean putIfAbsent(String key, V value) { try { - File f = p(key); + File f = p(key, false); if (f.exists()) { return false; } else { @@ -117,12 +138,19 @@ public class LocalPStore<V> implements PStore<V>{ @Override public void delete(String key) { try { - p(key).delete(); + delete(key, false); + delete(key, true); } catch (IOException e) { throw new RuntimeException(e); } } + protected void delete(String key, boolean blob) throws IOException { + try { + p(key, blob).delete(); + } catch (FileNotFoundException e) { /* ignored */ } + } + private class Iter implements Iterator<Entry<String, V>>{ private Iterator<String> keys; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java index 435bd0d..d1ef931 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java @@ -30,6 +30,8 @@ public class NoWriteLocalPStore<V> implements PStore<V>{ private ConcurrentMap<String, V> map = Maps.newConcurrentMap(); + private ConcurrentMap<String, V> blobMap = Maps.newConcurrentMap(); + public NoWriteLocalPStore() { super(); } @@ -45,8 +47,18 @@ public class NoWriteLocalPStore<V> implements PStore<V>{ } @Override + public V getBlob(String key) { + return blobMap.get(key); + } + + @Override public void put(String key, V value) { - map.put(key, value); + map.put(key, value); + } + + @Override + public void putBlob(String key, V value) { + blobMap.put(key, value); } @Override @@ -57,6 +69,7 @@ public class NoWriteLocalPStore<V> implements PStore<V>{ @Override public void delete(String key) { map.remove(key); + blobMap.remove(key); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java index eb21c70..67b947a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java @@ -17,37 +17,52 @@ */ package org.apache.drill.exec.store.sys.zk; +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; +import org.apache.drill.exec.store.dfs.shim.DrillInputStream; +import org.apache.drill.exec.store.dfs.shim.DrillOutputStream; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.hadoop.fs.Path; import org.apache.zookeeper.CreateMode; import com.google.common.base.Preconditions; -public class ZkPStore<V> implements PStore<V>{ +public class ZkPStore<V> implements PStore<V> { private CuratorFramework framework; private PStoreConfig<V> config; private String prefix; private String parent; + private DrillFileSystem fs; + private Path blobPath; + private boolean blobPathCreated; - ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException { + ZkPStore(CuratorFramework framework, DrillFileSystem fs, Path blobRoot, PStoreConfig<V> config) + throws IOException { this.parent = "/" + config.getName(); this.prefix = parent + "/"; this.framework = framework; this.config = config; + this.fs = fs; + this.blobPath = new Path(blobRoot, config.getName()); + this.blobPathCreated = false; // make sure the parent node exists. - try{ - if(framework.checkExists().forPath(parent) == null) { + try { + if (framework.checkExists().forPath(parent) == null) { framework.create().withMode(CreateMode.PERSISTENT).forPath(parent); } - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } @@ -55,69 +70,68 @@ public class ZkPStore<V> implements PStore<V>{ @Override public Iterator<Entry<String, V>> iterator() { - try{ + try { List<String> children = framework.getChildren().forPath(parent); return new Iter(children.iterator()); - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } - } - private String p(String key){ - Preconditions.checkArgument(!key.contains("/"), "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); + private String p(String key) { + Preconditions.checkArgument(!key.contains("/"), + "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface."); return prefix + key; } @Override public V get(String key) { - try{ + try { byte[] bytes = framework.getData().forPath(p(key)); - if(bytes == null){ + if (bytes == null) { return null; } return config.getSerializer().deserialize(bytes); - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } } @Override public void put(String key, V value) { - try{ - if(framework.checkExists().forPath(p(key)) != null) { + try { + if (framework.checkExists().forPath(p(key)) != null) { framework.setData().forPath(p(key), config.getSerializer().serialize(value)); - }else{ + } else { framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); } - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } - } @Override public boolean putIfAbsent(String key, V value) { - try{ - if(framework.checkExists().forPath(p(key)) != null) { + try { + if (framework.checkExists().forPath(p(key)) != null) { return false; - }else{ + } else { framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value)); return true; } - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } } @Override public void delete(String key) { - try{ + try { framework.delete().forPath(p(key)); - }catch(Exception e){ + } catch (Exception e) { throw new RuntimeException("Failure while accessing Zookeeper", e); } } @@ -177,4 +191,38 @@ public class ZkPStore<V> implements PStore<V>{ } + @Override + public V getBlob(String key) { + try (DrillInputStream is = fs.open(path(key))) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is.getInputStream())); + } catch (FileNotFoundException e) { + return null; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void putBlob(String key, V value) { + try (DrillOutputStream os = fs.create(path(key))) { + IOUtils.write(config.getSerializer().serialize(value), os.getOuputStream()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Path path(String name) throws IOException { + Preconditions.checkArgument( + !name.contains("/") && + !name.contains(":") && + !name.contains("..")); + + if (!blobPathCreated) { + fs.mkdirs(blobPath); + blobPathCreated = true; + } + + return new Path(blobPath, name + DRILL_SYS_FILE_SUFFIX); + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index f4513c2..82a1fe7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -17,32 +17,72 @@ */ package org.apache.drill.exec.store.sys.zk; +import java.io.File; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; +import org.apache.drill.exec.store.dfs.shim.FileSystemCreator; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.store.sys.PStoreRegistry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.annotations.VisibleForTesting; public class ZkPStoreProvider implements PStoreProvider{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class); + private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot"; + private final CuratorFramework curator; + private final DrillFileSystem fs; + + private final Path blobRoot; + public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { ClusterCoordinator coord = registry.getClusterCoordinator(); if (!(coord instanceof ZKClusterCoordinator)) { throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator."); } this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator(); + + if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) { + blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)); + } else { + String drillLogDir = System.getenv("DRILL_LOG_DIR"); + if (drillLogDir == null) { + drillLogDir = "/var/log/drill"; + } + blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI()); + } + Configuration fsConf = new Configuration(); + fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); + try { + fs = FileSystemCreator.getFileSystem(registry.getConfig(), fsConf); + fs.mkdirs(blobRoot); + } catch (IOException e) { + throw new DrillbitStartupException("Unable to initialize blob storage.", e); + } + } + @VisibleForTesting public ZkPStoreProvider(CuratorFramework curator) { this.curator = curator; + this.fs = null; + String drillLogDir = System.getenv("DRILL_LOG_DIR"); + if (drillLogDir == null) { + drillLogDir = "/var/log/drill"; + } + blobRoot = new Path(new File(drillLogDir).getAbsoluteFile().toURI()); } @Override @@ -51,7 +91,7 @@ public class ZkPStoreProvider implements PStoreProvider{ @Override public <V> PStore<V> getPStore(PStoreConfig<V> store) throws IOException { - return new ZkPStore<V>(curator, store); + return new ZkPStore<V>(curator, fs, blobRoot, store); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java index 45a151e..dcd58df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java @@ -122,8 +122,10 @@ public class QueryStatus { public void updateCache() { QueryState queryState = foreman.getQueryState(); - boolean fullStatus = queryState == QueryState.COMPLETED || queryState == QueryState.FAILED; - profileCache.put(queryId, getAsProfile(fullStatus)); + profileCache.put(queryId, getAsProfile(false)); + if (queryState == QueryState.COMPLETED || queryState == QueryState.FAILED) { + profileCache.putBlob(queryId, getAsProfile(true)); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/786fd36e/exec/java-exec/src/main/resources/rest/profile/list.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/rest/profile/list.ftl b/exec/java-exec/src/main/resources/rest/profile/list.ftl index 76f9a07..2f8cd4b 100644 --- a/exec/java-exec/src/main/resources/rest/profile/list.ftl +++ b/exec/java-exec/src/main/resources/rest/profile/list.ftl @@ -23,7 +23,8 @@ <table class="table table-hover"> <thead> <td>Time</td> - <td>Query</td> + <td>Query Id</td> + <td>Foreman</td> </thead> <tbody> <#list model.getRunningQueries() as query> @@ -36,6 +37,13 @@ </div> </a> </td> + <td> + <a href="http://${query.getForeman()}:8047/profiles/${query.getQueryId()}" target="_blank"> + <div style="height:100%;width:100%"> + ${query.getForeman()} + </div> + </a> + </td> </tr> </#list> </tbody> @@ -54,7 +62,8 @@ <table class="table table-hover"> <thead> <td>Time</td> - <td>Query</td> + <td>Query Id</td> + <td>Foreman</td> </thead> <tbody> <#list model.getFinishedQueries() as query> @@ -67,6 +76,13 @@ </div> </a> </td> + <td> + <a href="http://${query.getForeman()}:8047/profiles/${query.getQueryId()}" target="_blank"> + <div style="height:100%;width:100%"> + ${query.getForeman()} + </div> + </a> + </td> </tr> </#list> </tbody>