HIVE-19033: Provide an option to purge LLAP IO cache (Prasanth Jayachandran 
reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a2eb16d0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a2eb16d0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a2eb16d0

Branch: refs/heads/master
Commit: a2eb16d0f2e0e1f1d939d20943983e09fed313b0
Parents: 6ba33b8
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Sun Apr 8 16:00:30 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Sun Apr 8 16:00:30 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +-
 .../apache/hive/jdbc/TestActivePassiveHA.java   |  20 +
 .../TestJdbcWithSQLAuthorization.java           |  26 +
 .../apache/hadoop/hive/llap/io/api/LlapIo.java  |   6 +
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 907 ++++++++++++++++++-
 .../impl/LlapManagementProtocolClientImpl.java  |  11 +
 .../llap/protocol/LlapManagementProtocolPB.java |   2 +
 .../src/protobuf/LlapDaemonProtocol.proto       |   8 +
 .../hive/llap/cache/CacheContentsTracker.java   |   5 +
 .../hive/llap/cache/LowLevelCachePolicy.java    |   1 +
 .../llap/cache/LowLevelFifoCachePolicy.java     |   8 +
 .../llap/cache/LowLevelLrfuCachePolicy.java     |   8 +
 .../daemon/impl/LlapProtocolServerImpl.java     |  18 +
 .../services/impl/LlapIoMemoryServlet.java      |   2 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  16 +-
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |   5 +
 .../hive/llap/cache/TestOrcMetadataCache.java   |   5 +
 .../ql/processors/CommandProcessorFactory.java  |   4 +
 .../hadoop/hive/ql/processors/CommandUtil.java  |  44 +
 .../hadoop/hive/ql/processors/HiveCommand.java  |  14 +
 .../processors/LlapCacheResourceProcessor.java  | 195 ++++
 .../LlapClusterResourceProcessor.java           | 134 +++
 .../authorization/plugin/HiveOperationType.java |   2 +
 .../plugin/sqlstd/Operation2Privilege.java      |   2 +
 .../SQLStdHiveAuthorizationValidator.java       |   3 +-
 .../processors/TestCommandProcessorFactory.java |  18 +-
 .../cli/operation/HiveCommandOperation.java     |  12 +-
 27 files changed, 1442 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f2af61d..0627c35 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3319,7 +3319,8 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false,
         "If enabled, HiveServer2 will block any requests made to it over http 
" +
         "if an X-XSRF-HEADER header is not present"),
-    HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", 
"set,reset,dfs,add,list,delete,reload,compile",
+    HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist",
+      "set,reset,dfs,add,list,delete,reload,compile,llap",
         "Comma separated list of non-SQL Hive commands users are authorized to 
execute"),
     
HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path",
 "",
         "If set, this configuration property should provide a comma-separated 
list of URLs that indicates the type and " +

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index d2d0bee..c94c0e1 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -567,4 +567,24 @@ public class TestActivePassiveHA {
     hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_USE_PAM, false);
     hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, false);
   }
+
+  // This is a test for the llap command AuthZ added in HIVE-19033, which requires ZK 
access for it to pass
+  @Test(timeout = 60000)
+  public void testNoAuthZLlapClusterInfo() throws Exception {
+    String instanceId1 = UUID.randomUUID().toString();
+    miniHS2_1.start(getConfOverlay(instanceId1));
+    Connection hs2Conn = getConnection(miniHS2_1.getJdbcURL(), "user1");
+    boolean caughtException = false;
+    Statement stmt = hs2Conn.createStatement();
+    try {
+      stmt.execute("set hive.llap.daemon.service.hosts=@localhost");
+      stmt.execute("llap cluster -info");
+    } catch (SQLException e) {
+      caughtException = true;
+    } finally {
+      stmt.close();
+      hs2Conn.close();
+    }
+    assertEquals(false, caughtException);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/itests/hive-unit/src/test/java/org/apache/hive/jdbc/authorization/TestJdbcWithSQLAuthorization.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/authorization/TestJdbcWithSQLAuthorization.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/authorization/TestJdbcWithSQLAuthorization.java
index 9e021ea..6d5c743 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/authorization/TestJdbcWithSQLAuthorization.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/authorization/TestJdbcWithSQLAuthorization.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.jdbc.authorization;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -159,6 +161,30 @@ public class TestJdbcWithSQLAuthorization {
   }
 
   @Test
+  public void testAuthZFailureLlapCachePurge() throws Exception {
+    // using different code blocks so that jdbc variables are not accidentally 
re-used
+    // between the actions. Different connection/statement object should be 
used for each action.
+    {
+      Connection hs2Conn = getConnection("user1");
+      boolean caughtException = false;
+      Statement stmt = hs2Conn.createStatement();
+      try {
+        stmt.execute("llap cache -purge");
+      } catch (SQLException e) {
+        caughtException = true;
+        String msg = "Error while processing statement: Permission denied: 
Principal [name=user1, type=USER] " +
+          "does not have following privileges for operation LLAP_CACHE [[ADMIN 
PRIVILEGE] on Object " +
+          "[type=COMMAND_PARAMS, name=[-purge]], [ADMIN PRIVILEGE] on Object 
[type=SERVICE_NAME, name=localhost]]";
+        assertEquals(msg, e.getMessage());
+      } finally {
+        stmt.close();
+        hs2Conn.close();
+      }
+      assertTrue("Exception expected ", caughtException);
+    }
+  }
+
+  @Test
   public void testBlackListedUdfUsage() throws Exception {
 
     // create tables as user1

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java 
b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index 6e6f5b9..e5c4a00 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -27,5 +27,11 @@ public interface LlapIo<T> {
       InputFormat<?, ?> sourceInputFormat, Deserializer serde);
   void close();
   String getMemoryInfo();
+
+  /**
+   * purge is best effort and will just release the buffers that are unlocked 
(refCount == 0). This is typically
+   * called when the system is idle.
+   */
+  long purge();
   void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
 
b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 4753812..8fecc1e 100644
--- 
a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ 
b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -18129,6 +18129,778 @@ public final class LlapDaemonProtocolProtos {
     // @@protoc_insertion_point(class_scope:LlapOutputSocketInitMessage)
   }
 
+  public interface PurgeCacheRequestProtoOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+  }
+  /**
+   * Protobuf type {@code PurgeCacheRequestProto}
+   */
+  public static final class PurgeCacheRequestProto extends
+      com.google.protobuf.GeneratedMessage
+      implements PurgeCacheRequestProtoOrBuilder {
+    // Use PurgeCacheRequestProto.newBuilder() to construct.
+    private 
PurgeCacheRequestProto(com.google.protobuf.GeneratedMessage.Builder<?> builder) 
{
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private PurgeCacheRequestProto(boolean noInit) { this.unknownFields = 
com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final PurgeCacheRequestProto defaultInstance;
+    public static PurgeCacheRequestProto getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public PurgeCacheRequestProto getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private PurgeCacheRequestProto(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheRequestProto_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheRequestProto_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<PurgeCacheRequestProto> PARSER =
+        new com.google.protobuf.AbstractParser<PurgeCacheRequestProto>() {
+      public PurgeCacheRequestProto parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new PurgeCacheRequestProto(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<PurgeCacheRequestProto> 
getParserForType() {
+      return PARSER;
+    }
+
+    private void initFields() {
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto))
 {
+        return super.equals(obj);
+      }
+      
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 other = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)
 obj;
+
+      boolean result = true;
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code PurgeCacheRequestProto}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProtoOrBuilder
 {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheRequestProto_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheRequestProto_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.Builder.class);
+      }
+
+      // Construct using 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheRequestProto_descriptor;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 getDefaultInstanceForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.getDefaultInstance();
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 build() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 buildPartial() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 result = new 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto(this);
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)
 {
+          return 
mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder 
mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 other) {
+        if (other == 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.getDefaultInstance())
 return this;
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)
 e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:PurgeCacheRequestProto)
+    }
+
+    static {
+      defaultInstance = new PurgeCacheRequestProto(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:PurgeCacheRequestProto)
+  }
+
+  public interface PurgeCacheResponseProtoOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 purged_memory_bytes = 1;
+    /**
+     * <code>optional int64 purged_memory_bytes = 1;</code>
+     */
+    boolean hasPurgedMemoryBytes();
+    /**
+     * <code>optional int64 purged_memory_bytes = 1;</code>
+     */
+    long getPurgedMemoryBytes();
+  }
+  /**
+   * Protobuf type {@code PurgeCacheResponseProto}
+   */
+  public static final class PurgeCacheResponseProto extends
+      com.google.protobuf.GeneratedMessage
+      implements PurgeCacheResponseProtoOrBuilder {
+    // Use PurgeCacheResponseProto.newBuilder() to construct.
+    private 
PurgeCacheResponseProto(com.google.protobuf.GeneratedMessage.Builder<?> 
builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private PurgeCacheResponseProto(boolean noInit) { this.unknownFields = 
com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final PurgeCacheResponseProto defaultInstance;
+    public static PurgeCacheResponseProto getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public PurgeCacheResponseProto getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private PurgeCacheResponseProto(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              purgedMemoryBytes_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheResponseProto_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheResponseProto_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<PurgeCacheResponseProto> PARSER =
+        new com.google.protobuf.AbstractParser<PurgeCacheResponseProto>() {
+      public PurgeCacheResponseProto parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new PurgeCacheResponseProto(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<PurgeCacheResponseProto> 
getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 purged_memory_bytes = 1;
+    public static final int PURGED_MEMORY_BYTES_FIELD_NUMBER = 1;
+    private long purgedMemoryBytes_;
+    /**
+     * <code>optional int64 purged_memory_bytes = 1;</code>
+     */
+    public boolean hasPurgedMemoryBytes() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 purged_memory_bytes = 1;</code>
+     */
+    public long getPurgedMemoryBytes() {
+      return purgedMemoryBytes_;
+    }
+
+    private void initFields() {
+      purgedMemoryBytes_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, purgedMemoryBytes_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, purgedMemoryBytes_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto))
 {
+        return super.equals(obj);
+      }
+      
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 other = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto)
 obj;
+
+      boolean result = true;
+      result = result && (hasPurgedMemoryBytes() == 
other.hasPurgedMemoryBytes());
+      if (hasPurgedMemoryBytes()) {
+        result = result && (getPurgedMemoryBytes()
+            == other.getPurgedMemoryBytes());
+      }
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasPurgedMemoryBytes()) {
+        hash = (37 * hash) + PURGED_MEMORY_BYTES_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getPurgedMemoryBytes());
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code PurgeCacheResponseProto}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProtoOrBuilder
 {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheResponseProto_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheResponseProto_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.Builder.class);
+      }
+
+      // Construct using 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        purgedMemoryBytes_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_PurgeCacheResponseProto_descriptor;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 getDefaultInstanceForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance();
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 build() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 buildPartial() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 result = new 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.purgedMemoryBytes_ = purgedMemoryBytes_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto)
 {
+          return 
mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder 
mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 other) {
+        if (other == 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance())
 return this;
+        if (other.hasPurgedMemoryBytes()) {
+          setPurgedMemoryBytes(other.getPurgedMemoryBytes());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto)
 e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 purged_memory_bytes = 1;
+      private long purgedMemoryBytes_ ;
+      /**
+       * <code>optional int64 purged_memory_bytes = 1;</code>
+       */
+      public boolean hasPurgedMemoryBytes() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 purged_memory_bytes = 1;</code>
+       */
+      public long getPurgedMemoryBytes() {
+        return purgedMemoryBytes_;
+      }
+      /**
+       * <code>optional int64 purged_memory_bytes = 1;</code>
+       */
+      public Builder setPurgedMemoryBytes(long value) {
+        bitField0_ |= 0x00000001;
+        purgedMemoryBytes_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 purged_memory_bytes = 1;</code>
+       */
+      public Builder clearPurgedMemoryBytes() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        purgedMemoryBytes_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:PurgeCacheResponseProto)
+    }
+
+    static {
+      defaultInstance = new PurgeCacheResponseProto(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:PurgeCacheResponseProto)
+  }
+
   /**
    * Protobuf service {@code LlapDaemonProtocol}
    */
@@ -18661,6 +19433,14 @@ public final class LlapDaemonProtocolProtos {
           
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto
 request,
           
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto>
 done);
 
+      /**
+       * <code>rpc purgeCache(.PurgeCacheRequestProto) returns 
(.PurgeCacheResponseProto);</code>
+       */
+      public abstract void purgeCache(
+          com.google.protobuf.RpcController controller,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request,
+          
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto>
 done);
+
     }
 
     public static com.google.protobuf.Service newReflectiveService(
@@ -18674,6 +19454,14 @@ public final class LlapDaemonProtocolProtos {
           impl.getDelegationToken(controller, request, done);
         }
 
+        @java.lang.Override
+        public  void purgeCache(
+            com.google.protobuf.RpcController controller,
+            
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request,
+            
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto>
 done) {
+          impl.purgeCache(controller, request, done);
+        }
+
       };
     }
 
@@ -18698,6 +19486,8 @@ public final class LlapDaemonProtocolProtos {
           switch(method.getIndex()) {
             case 0:
               return impl.getDelegationToken(controller, 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto)request);
+            case 1:
+              return impl.purgeCache(controller, 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)request);
             default:
               throw new java.lang.AssertionError("Can't get here.");
           }
@@ -18714,6 +19504,8 @@ public final class LlapDaemonProtocolProtos {
           switch(method.getIndex()) {
             case 0:
               return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+            case 1:
+              return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.getDefaultInstance();
             default:
               throw new java.lang.AssertionError("Can't get here.");
           }
@@ -18730,6 +19522,8 @@ public final class LlapDaemonProtocolProtos {
           switch(method.getIndex()) {
             case 0:
               return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+            case 1:
+              return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance();
             default:
               throw new java.lang.AssertionError("Can't get here.");
           }
@@ -18746,6 +19540,14 @@ public final class LlapDaemonProtocolProtos {
         
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto
 request,
         
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto>
 done);
 
+    /**
+     * <code>rpc purgeCache(.PurgeCacheRequestProto) returns 
(.PurgeCacheResponseProto);</code>
+     */
+    public abstract void purgeCache(
+        com.google.protobuf.RpcController controller,
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request,
+        
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto>
 done);
+
     public static final
         com.google.protobuf.Descriptors.ServiceDescriptor
         getDescriptor() {
@@ -18773,6 +19575,11 @@ public final class LlapDaemonProtocolProtos {
             
com.google.protobuf.RpcUtil.<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto>specializeCallback(
               done));
           return;
+        case 1:
+          this.purgeCache(controller, 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto)request,
+            
com.google.protobuf.RpcUtil.<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto>specializeCallback(
+              done));
+          return;
         default:
           throw new java.lang.AssertionError("Can't get here.");
       }
@@ -18789,6 +19596,8 @@ public final class LlapDaemonProtocolProtos {
       switch(method.getIndex()) {
         case 0:
           return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto.getDefaultInstance();
+        case 1:
+          return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto.getDefaultInstance();
         default:
           throw new java.lang.AssertionError("Can't get here.");
       }
@@ -18805,6 +19614,8 @@ public final class LlapDaemonProtocolProtos {
       switch(method.getIndex()) {
         case 0:
           return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance();
+        case 1:
+          return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance();
         default:
           throw new java.lang.AssertionError("Can't get here.");
       }
@@ -18840,6 +19651,21 @@ public final class LlapDaemonProtocolProtos {
             
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.class,
             
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance()));
       }
+
+      public  void purgeCache( // callback-style stub: forwards the call to the RPC channel and hands the response to 'done'
+          com.google.protobuf.RpcController controller,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request,
+          
com.google.protobuf.RpcCallback<org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto>
 done) {
+        channel.callMethod(
+          getDescriptor().getMethods().get(1), // method index 1 is purgeCache; index 0 is getDelegationToken
+          controller,
+          request,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance(),
+          com.google.protobuf.RpcUtil.generalizeCallback(
+            done,
+            
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.class,
+            
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance()));
+      }
     }
 
     public static BlockingInterface newBlockingStub(
@@ -18852,6 +19678,11 @@ public final class LlapDaemonProtocolProtos {
           com.google.protobuf.RpcController controller,
           
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto
 request)
           throws com.google.protobuf.ServiceException;
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 purgeCache(
+          com.google.protobuf.RpcController controller,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request)
+          throws com.google.protobuf.ServiceException;
     }
 
     private static final class BlockingStub implements BlockingInterface {
@@ -18872,6 +19703,18 @@ public final class LlapDaemonProtocolProtos {
           
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto.getDefaultInstance());
       }
 
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto
 purgeCache(
+          com.google.protobuf.RpcController controller,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheRequestProto
 request)
+          throws com.google.protobuf.ServiceException { // blocking variant: returns the response instead of using a callback
+        return 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto)
 channel.callBlockingMethod(
+          getDescriptor().getMethods().get(1), // method index 1 is purgeCache; index 0 is getDelegationToken
+          controller,
+          request,
+          
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.PurgeCacheResponseProto.getDefaultInstance());
+      }
+
     }
 
     // @@protoc_insertion_point(class_scope:LlapManagementProtocol)
@@ -18987,6 +19830,16 @@ public final class LlapDaemonProtocolProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_LlapOutputSocketInitMessage_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_PurgeCacheRequestProto_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_PurgeCacheRequestProto_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_PurgeCacheResponseProto_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_PurgeCacheResponseProto_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -19063,25 +19916,29 @@ public final class LlapDaemonProtocolProtos {
       " \001(\010\"&\n\024GetTokenRequestProto\022\016\n\006app_id\030\001" +
       " \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token\030\001" +
       " \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n\013f" +
-      "ragment_id\030\001 \002(\t\022\r\n\005token\030\002 
\001(\014*2\n\020Sourc" +
-      "eStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNIN" +
-      "G\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTED\020",
-      
"\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\233\003\n\022L"
 +
-      "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" +
-      "WorkRequestProto\032\030.SubmitWorkResponsePro" +
-      "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp" +
-      "datedRequestProto\032 .SourceStateUpdatedRe" +
-      "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" +
-      "leteRequestProto\032\033.QueryCompleteResponse" +
-      "Proto\022T\n\021terminateFragment\022\036.TerminateFr" +
-      "agmentRequestProto\032\037.TerminateFragmentRe" +
-      "sponseProto\022K\n\016updateFragment\022\033.UpdateFr",
-      "agmentRequestProto\032\034.UpdateFragmentRespo" +
-      "nseProto2]\n\026LlapManagementProtocol\022C\n\022ge" +
-      "tDelegationToken\022\025.GetTokenRequestProto\032" +
-      "\026.GetTokenResponseProtoBH\n&org.apache.ha" +
-      "doop.hive.llap.daemon.rpcB\030LlapDaemonPro" +
-      "tocolProtos\210\001\001\240\001\001"
+      "ragment_id\030\001 \002(\t\022\r\n\005token\030\002 
\001(\014\"\030\n\026Purge" +
+      "CacheRequestProto\"6\n\027PurgeCacheResponseP" +
+      "roto\022\033\n\023purged_memory_bytes\030\001 \001(\003*2\n\020Sou",
+      "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" +
+      "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" +
+      
"D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\233\003\n"
 +
+      "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" +
+      "itWorkRequestProto\032\030.SubmitWorkResponseP" +
+      "roto\022W\n\022sourceStateUpdated\022\037.SourceState" +
+      "UpdatedRequestProto\032 .SourceStateUpdated" +
+      "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" +
+      "mpleteRequestProto\032\033.QueryCompleteRespon" +
+      "seProto\022T\n\021terminateFragment\022\036.Terminate",
+      "FragmentRequestProto\032\037.TerminateFragment" +
+      "ResponseProto\022K\n\016updateFragment\022\033.Update" +
+      "FragmentRequestProto\032\034.UpdateFragmentRes" +
+      "ponseProto2\236\001\n\026LlapManagementProtocol\022C\n" +
+      "\022getDelegationToken\022\025.GetTokenRequestPro" +
+      "to\032\026.GetTokenResponseProto\022?\n\npurgeCache" +
+      "\022\027.PurgeCacheRequestProto\032\030.PurgeCacheRe" +
+      "sponseProtoBH\n&org.apache.hadoop.hive.ll" +
+      "ap.daemon.rpcB\030LlapDaemonProtocolProtos\210" +
+      "\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -19220,6 +20077,18 @@ public final class LlapDaemonProtocolProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_LlapOutputSocketInitMessage_descriptor,
               new java.lang.String[] { "FragmentId", "Token", });
+          internal_static_PurgeCacheRequestProto_descriptor =
+            getDescriptor().getMessageTypes().get(22);
+          internal_static_PurgeCacheRequestProto_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_PurgeCacheRequestProto_descriptor,
+              new java.lang.String[] { });
+          internal_static_PurgeCacheResponseProto_descriptor =
+            getDescriptor().getMessageTypes().get(23);
+          internal_static_PurgeCacheResponseProto_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_PurgeCacheResponseProto_descriptor,
+              new java.lang.String[] { "PurgedMemoryBytes", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
index af760b1..2caae82 100644
--- 
a/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/impl/LlapManagementProtocolClientImpl.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolProxy;
@@ -78,4 +79,14 @@ public class LlapManagementProtocolClientImpl implements 
LlapManagementProtocolP
       throw new ServiceException(e);
     }
   }
+
+  @Override // Client side of the purgeCache RPC: sends the request through the proxy and returns its response.
+  public LlapDaemonProtocolProtos.PurgeCacheResponseProto purgeCache(final 
RpcController controller,
+    final LlapDaemonProtocolProtos.PurgeCacheRequestProto request) throws 
ServiceException {
+    try {
+      return getProxy().purgeCache(null, request); // NOTE(review): 'controller' is not forwarded; null is passed to the proxy instead
+    } catch (IOException e) { // presumably raised while obtaining the proxy - confirm against getProxy()
+      throw new ServiceException(e); // rethrown as ServiceException with the IOException kept as the cause
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
----------------------------------------------------------------------
diff --git 
a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
 
b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
index ff215d4..717f45d 100644
--- 
a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
+++ 
b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapManagementProtocolPB.java
@@ -19,6 +19,8 @@ import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.ipc.ProtocolInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
+import com.google.protobuf.ServiceException;
+
 @ProtocolInfo(protocolName = 
"org.apache.hadoop.hive.llap.protocol.LlapManagementProtocolPB", 
protocolVersion = 1)
 @KerberosInfo(serverPrincipal = 
HiveConf.HIVE_LLAP_DAEMON_SERVICE_PRINCIPAL_NAME)
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto 
b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 12beca5..d70dd41 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -196,6 +196,13 @@ message LlapOutputSocketInitMessage {
   optional bytes token = 2;
 }
 
+message PurgeCacheRequestProto { // request type of the purgeCache rpc; declares no fields
+}
+
+message PurgeCacheResponseProto { // response type of the purgeCache rpc
+  optional int64 purged_memory_bytes = 1; // the daemon fills this with the value returned by LlapIo.purge(), or 0 when no LlapIo is available
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns 
(SourceStateUpdatedResponseProto);
@@ -206,4 +213,5 @@ service LlapDaemonProtocol {
 
 service LlapManagementProtocol {
   rpc getDelegationToken(GetTokenRequestProto) returns (GetTokenResponseProto);
+  rpc purgeCache(PurgeCacheRequestProto) returns (PurgeCacheResponseProto); // second rpc of this service; the generated Java code dispatches it as method index 1
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
index 4fbaac1..6a361fa 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -183,6 +183,11 @@ public class CacheContentsTracker implements 
LowLevelCachePolicy, EvictionListen
     realPolicy.setParentDebugDumper(dumper);
   }
 
+  @Override // Adds no logic of its own for purge.
+  public long purge() {
+    return realPolicy.purge(); // straight delegation; the result of realPolicy.purge() is returned unchanged
+  }
+
 
   @Override
   public long evictSomeBlocks(long memoryToReserve) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 2cd70b9..3323636 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -27,4 +27,5 @@ public interface LowLevelCachePolicy extends LlapOomDebugDump 
{
   long evictSomeBlocks(long memoryToReserve);
   void setEvictionListener(EvictionListener listener);
   void setParentDebugDumper(LlapOomDebugDump dumper);
+  long purge(); // the FIFO and LRFU policies in this change implement it as evictSomeBlocks(Long.MAX_VALUE) and return that result
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 50a2411..f7f80a8 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
@@ -70,6 +71,13 @@ public class LowLevelFifoCachePolicy implements 
LowLevelCachePolicy {
   }
 
   @Override
+  public long purge() { // purge = one eviction pass with the largest possible request
+    long evicted = evictSomeBlocks(Long.MAX_VALUE); // presumably evicts every block that can currently be evicted - confirm against evictInternal
+    LlapIoImpl.LOG.info("PURGE: evicted {} from FIFO policy", 
LlapUtil.humanReadableByteCount(evicted));
+    return evicted; // the raw value from evictSomeBlocks; the log line above prints it in human-readable form
+  }
+
+  @Override
   public long evictSomeBlocks(long memoryToReserve) {
     return evictInternal(memoryToReserve, -1);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index b42f761..7787cb4 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
@@ -177,6 +178,13 @@ public class LowLevelLrfuCachePolicy implements 
LowLevelCachePolicy {
     this.parentDebugDump = dumper;
   }
 
+  @Override // purge = one eviction pass with the largest possible request
+  public long purge() {
+    long evicted = evictSomeBlocks(Long.MAX_VALUE); // presumably evicts every block that can currently be evicted - confirm against evictSomeBlocks
+    LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", 
LlapUtil.humanReadableByteCount(evicted));
+    return evicted; // the raw value from evictSomeBlocks; the log line above prints it in human-readable form
+  }
+
 
   @Override
   public long evictSomeBlocks(long memoryToReserve) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 81785f0..d856b25 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -25,6 +25,9 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hive.llap.io.api.LlapIo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,6 +65,7 @@ public class LlapProtocolServerImpl extends AbstractService
     implements LlapProtocolBlockingPB, LlapManagementProtocolPB {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapProtocolServerImpl.class);
+
   private enum TokenRequiresSigning {
     TRUE, FALSE, EXCEPT_OWNER
   }
@@ -272,6 +276,20 @@ public class LlapProtocolServerImpl extends AbstractService
     return response;
   }
 
+  /**
+   * Handles a purge-cache request: purges the LLAP IO layer obtained from
+   * {@link LlapProxy#getIo()} and reports the purged byte count in the response.
+   * When no IO instance is available (getIo() returns null) the response reports 0.
+   *
+   * @param controller RPC controller (not used here)
+   * @param request purge request (not inspected here)
+   * @return response carrying the purged memory size in bytes
+   */
+  @Override
+  public LlapDaemonProtocolProtos.PurgeCacheResponseProto purgeCache(final RpcController controller,
+    final LlapDaemonProtocolProtos.PurgeCacheRequestProto request) throws ServiceException {
+    final LlapIo<?> io = LlapProxy.getIo();
+    final long purgedBytes = (io == null) ? 0 : io.purge();
+    return LlapDaemonProtocolProtos.PurgeCacheResponseProto.newBuilder()
+      .setPurgedMemoryBytes(purgedBytes)
+      .build();
+  }
+
   private boolean determineIfSigningIsRequired(UserGroupInformation 
callingUser) {
     switch (isSigningRequiredConfig) {
     case FALSE: return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
index 153ab35..8877565 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapIoMemoryServlet.java
@@ -59,7 +59,7 @@ public class LlapIoMemoryServlet extends HttpServlet {
         return;
       }
       PrintWriter writer = null;
- 
+
       try {
         response.setContentType("text/plain; charset=utf8");
         response.setHeader(ACCESS_CONTROL_ALLOW_METHODS, "GET");

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index e5bc3c2..747b399 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -86,7 +86,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> 
{
   public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc");
   public static final Logger CACHE_LOGGER = 
LoggerFactory.getLogger("LlapIoCache");
   public static final Logger LOCKING_LOGGER = 
LoggerFactory.getLogger("LlapIoLocking");
-
   private static final String MODE_CACHE = "cache";
 
   // TODO: later, we may have a map
@@ -101,6 +100,7 @@ public class LlapIoImpl implements 
LlapIo<VectorizedRowBatch> {
   private final LowLevelCache dataCache;
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf;
+  private LowLevelCachePolicy cachePolicy;
 
   private LlapIoImpl(Configuration conf) throws IOException {
     this.daemonConf = conf;
@@ -139,11 +139,13 @@ public class LlapIoImpl implements 
LlapIo<VectorizedRowBatch> {
       boolean useLrfu = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_USE_LRFU);
       long totalMemorySize = HiveConf.getSizeVar(conf, 
ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
       int minAllocSize = (int)HiveConf.getSizeVar(conf, 
ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
-      LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
+      LowLevelCachePolicy cp = useLrfu ? new LowLevelLrfuCachePolicy(
           minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
       boolean trackUsage = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
       if (trackUsage) {
-        cachePolicy = new CacheContentsTracker(cachePolicy);
+        this.cachePolicy = new CacheContentsTracker(cp);
+      } else {
+        this.cachePolicy = cp;
       }
       // Allocator uses memory manager to request memory, so create the 
manager next.
       LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
@@ -212,6 +214,14 @@ public class LlapIoImpl implements 
LlapIo<VectorizedRowBatch> {
   }
 
   @Override
+  public long purge() {
+    if (cachePolicy != null) {
+      return cachePolicy.purge();
+    }
+    return 0;
+  }
+
+  @Override
   public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
       InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe) {
     ColumnVectorProducer cvp = genericCvp;

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 2c87bc2..b19cdcf 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -120,6 +120,11 @@ public class TestLowLevelCacheImpl {
     }
 
     @Override
+    public long purge() {
+      // Stub implementation: always reports zero bytes purged.
+      return 0;
+    }
+
+    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index f7ebff2..58c918c 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -63,6 +63,11 @@ public class TestOrcMetadataCache {
     public void setParentDebugDumper(LlapOomDebugDump dumper) {
     }
 
+    @Override
+    public long purge() {
+      // Stub implementation: always reports zero bytes purged.
+      return 0;
+    }
+
     public void verifyEquals(int i) {
       assertEquals(i, lockCount);
       assertEquals(i, unlockCount);

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
index 74a34b3..3d47991 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
@@ -83,6 +83,10 @@ public final class CommandProcessorFactory {
         return new AddResourceProcessor();
       case LIST:
         return new ListResourceProcessor();
+      case LLAP_CLUSTER:
+        return new LlapClusterResourceProcessor();
+      case LLAP_CACHE:
+        return new LlapCacheResourceProcessor();
       case DELETE:
         return new DeleteResourceProcessor();
       case COMPILE:

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandUtil.java
index 0dcef08..e06ea5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.processors;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -87,5 +88,48 @@ class CommandUtil {
     ss.getAuthorizerV2().checkPrivileges(type, Arrays.asList(commandObj), 
null, ctxBuilder.build());
   }
 
+  /**
+   * Runs the V2 authorization check for a command together with the service object it targets
+   * (service type authorization). No check is performed when there is no session, when the
+   * session is not in V2 authorization mode, or when HIVE_AUTHORIZATION_ENABLED is false in the
+   * session configuration.
+   *
+   * @param ss - session state; may be null (e.g. in unit tests)
+   * @param type - operation type
+   * @param command - command args
+   * @param serviceObject - service object
+   * @return null if the command is authorized or no check was performed. Otherwise returns a
+   * CommandProcessorResponse capturing the authorization error
+   */
+  static CommandProcessorResponse authorizeCommandAndServiceObject(SessionState ss, HiveOperationType type,
+    List<String> command, String serviceObject) {
+    if (ss == null) {
+      // ss can be null in unit tests
+      return null;
+    }
 
+    final boolean authorizationActive = ss.isAuthorizationModeV2()
+      && HiveConf.getBoolVar(ss.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
+    if (!authorizationActive) {
+      return null;
+    }
+
+    try {
+      authorizeCommandThrowEx(ss, type, command, serviceObject);
+    } catch (HiveAuthzPluginException | HiveAccessControlException e) {
+      LOG.error("Error authorizing command " + command, e);
+      return CommandProcessorResponse.create(e);
+    }
+    // authorized to perform action
+    return null;
+  }
+
+  /**
+   * Builds a privilege object for the command and a SERVICE_NAME privilege object for the
+   * service, then passes both, each wrapped in a singleton list, to the session's V2 authorizer
+   * along with a context holding the command string, user IP address and forwarded addresses.
+   *
+   * @param ss session state supplying the authorizer and the client address information
+   * @param type operation type
+   * @param command command args
+   * @param serviceObject service object name
+   * @throws HiveAuthzPluginException propagated from the authorizer
+   * @throws HiveAccessControlException propagated from the authorizer
+   */
+  private static void authorizeCommandThrowEx(SessionState ss, HiveOperationType type,
+    List<String> command, String serviceObject) throws HiveAuthzPluginException, HiveAccessControlException {
+    final List<HivePrivilegeObject> commandObjs =
+      Collections.singletonList(HivePrivilegeObject.createHivePrivilegeObject(command));
+    final List<HivePrivilegeObject> serviceObjs = Collections.singletonList(
+      new HivePrivilegeObject(HivePrivilegeObject.HivePrivilegeObjectType.SERVICE_NAME,
+        null, serviceObject, null, null, null));
+
+    final HiveAuthzContext.Builder contextBuilder = new HiveAuthzContext.Builder();
+    contextBuilder.setCommandString(Joiner.on(' ').join(command));
+    contextBuilder.setUserIpAddress(ss.getUserIpAddress());
+    contextBuilder.setForwardedAddresses(ss.getForwardedAddresses());
+
+    ss.getAuthorizerV2().checkPrivileges(type, commandObjs, serviceObjs, contextBuilder.build());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
index c45563d..56c7516 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
@@ -32,6 +32,8 @@ public enum HiveCommand {
   CRYPTO(true),
   ADD(),
   LIST(),
+  LLAP_CLUSTER(),
+  LLAP_CACHE(),
   RELOAD(),
   DELETE(),
   COMPILE();
@@ -77,6 +79,8 @@ public enum HiveCommand {
         return null;
       } else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && 
"autocommit".equalsIgnoreCase(command[1])) {
         return null;//don't want set autocommit true|false to get mixed with 
set hive.foo.bar...
+      } else if (command.length > 1 && "llap".equalsIgnoreCase(command[0])) {
+        return getLlapSubCommand(command);
       } else if (COMMANDS.contains(cmd)) {
         HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
 
@@ -89,4 +93,14 @@ public enum HiveCommand {
     }
     return null;
   }
+
+  /**
+   * Maps the second token of an "llap ..." command to its HiveCommand, ignoring case.
+   *
+   * @param command tokenized command; command[1] is read, so at least two tokens are required
+   * @return LLAP_CLUSTER for "cluster", LLAP_CACHE for "cache", otherwise null
+   */
+  private static HiveCommand getLlapSubCommand(final String[] command) {
+    final String subCommand = command[1];
+    if ("cluster".equalsIgnoreCase(subCommand)) {
+      return LLAP_CLUSTER;
+    }
+    return "cache".equalsIgnoreCase(subCommand) ? LLAP_CACHE : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a2eb16d0/ql/src/java/org/apache/hadoop/hive/ql/processors/LlapCacheResourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/processors/LlapCacheResourceProcessor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/LlapCacheResourceProcessor.java
new file mode 100644
index 0000000..f455055
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/LlapCacheResourceProcessor.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.processors;
+
+import static 
org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+import static 
org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.defaultNullString;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Command processor for the "llap cache" command. The only supported option is "purge": every
+ * LLAP daemon instance returned by the registry is asked to purge its IO cache, and one
+ * tab-separated line (host, purged bytes) is printed per instance.
+ */
+public class LlapCacheResourceProcessor implements CommandProcessor {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapCacheResourceProcessor.class);
+  private final Options cacheOptions = new Options();
+  private final HelpFormatter helpFormatter = new HelpFormatter();
+
+  LlapCacheResourceProcessor() {
+    cacheOptions.addOption("purge", "purge", false, "Purge LLAP IO cache");
+  }
+
+  /** Wraps an error message in a failed (response code 1) processor response. */
+  private CommandProcessorResponse returnErrorResponse(final String errmsg) {
+    return new CommandProcessorResponse(1, "LLAP Cache Processor Helper Failed:" + errmsg, null);
+  }
+
+  /**
+   * Substitutes Hive variables in the command, splits it on whitespace and hands every token
+   * after the first one to the option handler.
+   *
+   * @param command command text as received by this processor
+   * @return the handler's response, or an error response if the command is empty or handling fails
+   */
+  @Override
+  public CommandProcessorResponse run(String command) {
+    SessionState ss = SessionState.get();
+    command = new VariableSubstitution(() -> SessionState.get().getHiveVariables())
+      .substitute(ss.getConf(), command);
+    String[] tokens = command.split("\\s+");
+    if (tokens.length < 1) {
+      return returnErrorResponse("Command arguments are empty.");
+    }
+    // The first token is dropped; only the remaining tokens are parsed as options.
+    String[] params = Arrays.copyOfRange(tokens, 1, tokens.length);
+    try {
+      return llapCacheCommandHandler(ss, params);
+    } catch (Exception e) {
+      // Keep the full cause in the log; the response only carries the message.
+      LOG.warn("Failed to run LLAP cache command", e);
+      return returnErrorResponse(e.getMessage());
+    }
+  }
+
+  /**
+   * Parses the options and, for "purge", authorizes the command and purges the cache on all
+   * registered LLAP instances. Any other input yields an error response with the usage text.
+   *
+   * @param ss current session state
+   * @param params option tokens
+   * @return success response with the result schema, or an authorization/error response
+   * @throws ParseException if the options cannot be parsed
+   */
+  private CommandProcessorResponse llapCacheCommandHandler(final SessionState ss,
+    final String[] params) throws ParseException {
+    CommandLine args = parseCommandArgs(cacheOptions, params);
+    boolean purge = args.hasOption("purge");
+    String hs2Host = null;
+    if (ss.isHiveServerQuery()) {
+      hs2Host = ss.getHiveServer2Host();
+    }
+    if (purge) {
+      CommandProcessorResponse authErrResp = CommandUtil.authorizeCommandAndServiceObject(
+        ss, HiveOperationType.LLAP_CACHE, Arrays.asList(params), hs2Host);
+      if (authErrResp != null) {
+        // there was an authorization issue
+        return authErrResp;
+      }
+      try {
+        LlapRegistryService llapRegistryService = LlapRegistryService.getClient(ss.getConf());
+        llapCachePurge(ss, llapRegistryService);
+        return createProcessorSuccessResponse();
+      } catch (Exception e) {
+        LOG.error("Error while purging LLAP IO Cache. err: ", e);
+        return returnErrorResponse("Error while purging LLAP IO Cache. err: " + e.getMessage());
+      }
+    } else {
+      String usage = getUsageAsString();
+      return returnErrorResponse("Unsupported sub-command option. " + usage);
+    }
+  }
+
+  private CommandProcessorResponse createProcessorSuccessResponse() {
+    return new CommandProcessorResponse(0, null, null, getSchema());
+  }
+
+  /** Schema of the printed result rows: hostName and purgedMemoryBytes, both declared as string. */
+  private Schema getSchema() {
+    Schema sch = new Schema();
+    sch.addToFieldSchemas(new FieldSchema("hostName", "string", ""));
+    sch.addToFieldSchemas(new FieldSchema("purgedMemoryBytes", "string", ""));
+    sch.putToProperties(SERIALIZATION_NULL_FORMAT, defaultNullString);
+    return sch;
+  }
+
+  /**
+   * Submits one purge task per registered LLAP instance, then prints "host TAB purgedBytes" for
+   * each instance in submission order. The thread pool is created per invocation and is always
+   * shut down before returning, so no worker threads outlive the command.
+   *
+   * @param ss session state providing the configuration and the output stream
+   * @param llapRegistryService registry used to enumerate LLAP instances
+   * @throws Exception if the instances cannot be listed or waiting for a result fails
+   */
+  private void llapCachePurge(final SessionState ss, final LlapRegistryService llapRegistryService)
+    throws Exception {
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    try {
+      Collection<LlapServiceInstance> instances = llapRegistryService.getInstances().getAll();
+      List<Future<Long>> futures = new ArrayList<>(instances.size());
+      for (LlapServiceInstance instance : instances) {
+        futures.add(executorService.submit(new PurgeCallable(ss.getConf(), instance)));
+      }
+
+      // futures were added in iteration order, so index i belongs to the i-th instance
+      int i = 0;
+      for (LlapServiceInstance instance : instances) {
+        Future<Long> future = futures.get(i);
+        ss.out.println(Joiner.on("\t").join(instance.getHost(), future.get()));
+        i++;
+      }
+    } finally {
+      // On success every task has completed; on failure this also cancels outstanding tasks.
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Sends a purge request to the management port of a single LLAP instance and returns the
+   * purged byte count. This is best effort: any failure is logged and reported as 0 bytes.
+   */
+  private static class PurgeCallable implements Callable<Long> {
+    private static final Logger LOG = LoggerFactory.getLogger(PurgeCallable.class);
+    private final Configuration conf;
+    private final LlapServiceInstance instance;
+    private final SocketFactory socketFactory;
+    private final RetryPolicy retryPolicy;
+
+    PurgeCallable(Configuration conf, LlapServiceInstance llapServiceInstance) {
+      this.conf = conf;
+      this.instance = llapServiceInstance;
+      this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+      //not making this configurable, best effort
+      this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        10000, 2000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Long call() {
+      try {
+        LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(
+          conf, instance.getHost(), instance.getManagementPort(), retryPolicy, socketFactory);
+        LlapDaemonProtocolProtos.PurgeCacheResponseProto resp = client.purgeCache(
+          null, LlapDaemonProtocolProtos.PurgeCacheRequestProto.newBuilder().build());
+        return resp.getPurgedMemoryBytes();
+      } catch (Exception e) {
+        LOG.warn("Exception while purging cache.", e);
+        return 0L;
+      }
+    }
+  }
+
+  /** Renders the usage line for "llap cache" from the registered options. */
+  private String getUsageAsString() {
+    StringWriter out = new StringWriter();
+    PrintWriter pw = new PrintWriter(out);
+    helpFormatter.printUsage(pw, helpFormatter.getWidth(), "llap cache", cacheOptions);
+    pw.flush();
+    return out.toString();
+  }
+
+  private CommandLine parseCommandArgs(final Options opts, String[] args) throws ParseException {
+    CommandLineParser parser = new GnuParser();
+    return parser.parse(opts, args);
+  }
+
+  @Override
+  public void close() {
+  }
+}

Reply via email to