http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java
new file mode 100644
index 0000000..75f16cb
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.conf;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Validator for time-valued configuration settings.
+ * <p>
+ * The candidate string is converted with {@code MetastoreConf.convertTimeStr}, passing this
+ * validator's unit for both unit arguments, and the resulting number is then checked against
+ * the optional lower and upper bounds.
+ */
+public class TimeValidator implements Validator {
+
+  private final TimeUnit unit;
+  private final Long min;
+  private final boolean minInclusive;
+
+  private final Long max;
+  private final boolean maxInclusive;
+
+  /**
+   * Creates a validator without bounds; only the conversion of the value is exercised.
+   * @param unit unit handed to the time-string conversion.
+   */
+  public TimeValidator(TimeUnit unit) {
+    this(unit, null, false, null, false);
+  }
+
+  /**
+   * Creates a validator with optional bounds, both expressed in {@code unit}.
+   * @param unit unit handed to the time-string conversion.
+   * @param min lower bound, or null for no lower bound.
+   * @param minInclusive whether a value equal to {@code min} is accepted.
+   * @param max upper bound, or null for no upper bound.
+   * @param maxInclusive whether a value equal to {@code max} is accepted.
+   */
+  public TimeValidator(TimeUnit unit, Long min, boolean minInclusive, Long max,
+                       boolean maxInclusive) {
+    this.unit = unit;
+    this.min = min;
+    this.minInclusive = minInclusive;
+    this.max = max;
+    this.maxInclusive = maxInclusive;
+  }
+
+  /**
+   * Checks that {@code value} converts to a time and lies within the configured bounds.
+   * @param value the configuration string to check.
+   * @throws IllegalArgumentException if the converted value violates a bound.
+   */
+  @Override
+  public void validate(String value) {
+    // First just check that this translates
+    long time = MetastoreConf.convertTimeStr(value, unit, unit);
+
+    // An inclusive bound rejects only values strictly beyond it; an exclusive bound also
+    // rejects the bound itself.
+    if (min != null && (minInclusive ? time < min : time <= min)) {
+      throw new IllegalArgumentException(
+          value + " is smaller than minimum " + timeString(min, unit));
+    }
+
+    if (max != null && (maxInclusive ? time > max : time >= max)) {
+      throw new IllegalArgumentException(
+          value + " is larger than maximum " + timeString(max, unit));
+    }
+  }
+
+  /** Renders a bound as "&lt;number&gt; &lt;unit abbreviation&gt;" for error messages. */
+  private String timeString(long time, TimeUnit timeUnit) {
+    return time + " " + MetastoreConf.timeAbbreviationFor(timeUnit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java
new file mode 100644
index 0000000..91d790a
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.partition.spec;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+
+/**
+ * Implementation of PartitionSpecProxy that composes a list of PartitionSpecProxy.
+ */
+public class CompositePartitionSpecProxy extends PartitionSpecProxy {
+
+  private String catName;
+  private String dbName;
+  private String tableName;
+  private List<PartitionSpec> partitionSpecs;
+  private List<PartitionSpecProxy> partitionSpecProxies;
+  private int size = 0;
+
+  /**
+   * Composes the given specs, taking the catalog, DB and table names from the first spec.
+   * For an empty list all three names are null and the composite contains no partitions.
+   * @param partitionSpecs raw specs to compose; must not be null.
+   * @throws MetaException if a spec cannot be wrapped in a proxy.
+   */
+  protected CompositePartitionSpecProxy(List<PartitionSpec> partitionSpecs) throws MetaException {
+    this.partitionSpecs = partitionSpecs;
+    if (partitionSpecs.isEmpty()) {
+      catName = null;
+      dbName = null;
+      tableName = null;
+    }
+    else {
+      PartitionSpec first = partitionSpecs.get(0);
+      catName = first.getCatName();
+      dbName = first.getDbName();
+      tableName = first.getTableName();
+    }
+    // Always build the proxy list (possibly empty) so that later calls never see null.
+    wrapPartitionSpecs();
+    // Assert class-invariant.
+    assert isValid() : "Invalid CompositePartitionSpecProxy!";
+  }
+
+  /**
+   * Composes the given specs under the default catalog.
+   * @param dbName name of the DB.
+   * @param tableName name of the table.
+   * @param partitionSpecs raw specs to compose; must not be null.
+   * @throws MetaException if a spec cannot be wrapped in a proxy.
+   */
+  @Deprecated
+  protected CompositePartitionSpecProxy(String dbName, String tableName,
+                                        List<PartitionSpec> partitionSpecs) throws MetaException {
+    this(DEFAULT_CATALOG_NAME, dbName, tableName, partitionSpecs);
+  }
+
+  /**
+   * Composes the given specs under explicitly supplied catalog, DB and table names.
+   * @param catName catalog name.
+   * @param dbName name of the DB.
+   * @param tableName name of the table.
+   * @param partitionSpecs raw specs to compose; must not be null.
+   * @throws MetaException if a spec cannot be wrapped in a proxy.
+   */
+  protected CompositePartitionSpecProxy(String catName, String dbName, String tableName,
+                                        List<PartitionSpec> partitionSpecs) throws MetaException {
+    this.catName = catName;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.partitionSpecs = partitionSpecs;
+    wrapPartitionSpecs();
+    // Assert class-invariant.
+    assert isValid() : "Invalid CompositePartitionSpecProxy!";
+  }
+
+  /**
+   * Wraps every raw spec in its proxy and accumulates the total partition count.
+   * Shared by all constructors so that size() is consistent however the composite was built.
+   */
+  private void wrapPartitionSpecs() throws MetaException {
+    partitionSpecProxies = new ArrayList<>(partitionSpecs.size());
+    size = 0;
+    for (PartitionSpec partitionSpec : partitionSpecs) {
+      PartitionSpecProxy partitionSpecProxy = Factory.get(partitionSpec);
+      if (partitionSpecProxy == null) {
+        // Factory.get returns null for a null spec, or for an unsupported one when
+        // assertions are disabled.
+        throw new MetaException("Cannot compose a null or unsupported PartitionSpec.");
+      }
+      partitionSpecProxies.add(partitionSpecProxy);
+      size += partitionSpecProxy.size();
+    }
+  }
+
+  /** Class invariant: a composite never directly contains another composite. */
+  private boolean isValid() {
+    for (PartitionSpecProxy partitionSpecProxy : partitionSpecProxies) {
+      if (partitionSpecProxy instanceof CompositePartitionSpecProxy) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Iterator to iterate over all Partitions, across all PartitionSpecProxy instances within
+   * the Composite. The per-partition accessors delegate to the iterator of the proxy currently
+   * being walked; on an empty composite there is no such iterator and they throw
+   * NullPointerException.
+   */
+  public static class Iterator implements PartitionIterator {
+
+    private CompositePartitionSpecProxy composite;
+    private List<PartitionSpecProxy> partitionSpecProxies;
+    private int index = -1; // Index into partitionSpecProxies.
+    private PartitionIterator iterator = null;
+
+    public Iterator(CompositePartitionSpecProxy composite) {
+      this.composite = composite;
+      this.partitionSpecProxies = composite.partitionSpecProxies;
+
+      if (this.partitionSpecProxies != null && !this.partitionSpecProxies.isEmpty()) {
+        this.index = 0;
+        this.iterator = this.partitionSpecProxies.get(this.index).getPartitionIterator();
+      }
+    }
+
+    /**
+     * Moves on to the next proxy that still has partitions.
+     * The index never moves past the last proxy, so repeated calls after exhaustion are
+     * harmless.
+     * @return true if {@code iterator} now points at a proxy with a partition available.
+     */
+    private boolean advanceToNextNonEmptyProxy() {
+      while (index + 1 < partitionSpecProxies.size()) {
+        ++index;
+        iterator = partitionSpecProxies.get(index).getPartitionIterator();
+        if (iterator.hasNext()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (iterator == null) {
+        return false;
+      }
+      return iterator.hasNext() || advanceToNextNonEmptyProxy();
+    }
+
+    /**
+     * @return the next partition, or null once every composed proxy is exhausted
+     *         (including when the composite is empty).
+     */
+    @Override
+    public Partition next() {
+      if (iterator == null) {
+        return null;
+      }
+      if (iterator.hasNext() || advanceToNextNonEmptyProxy()) {
+        return iterator.next();
+      }
+      return null;
+    }
+
+    @Override
+    public void remove() {
+      iterator.remove();
+    }
+
+    @Override
+    public Partition getCurrent() {
+      return iterator.getCurrent();
+    }
+
+    @Override
+    public String getCatName() {
+      return composite.getCatName();
+    }
+
+    @Override
+    public String getDbName() {
+      return composite.dbName;
+    }
+
+    @Override
+    public String getTableName() {
+      return composite.tableName;
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+      return iterator.getParameters();
+    }
+
+    @Override
+    public void setParameters(Map<String, String> parameters) {
+      iterator.setParameters(parameters);
+    }
+
+    @Override
+    public String getLocation() {
+      return iterator.getLocation();
+    }
+
+    @Override
+    public void putToParameters(String key, String value) {
+      iterator.putToParameters(key, value);
+    }
+
+    @Override
+    public void setCreateTime(long time) {
+      iterator.setCreateTime(time);
+    }
+  }
+
+  @Override
+  public void setCatName(String catName) {
+    this.catName = catName;
+    for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) {
+      partSpecProxy.setCatName(catName);
+    }
+  }
+
+  @Override
+  public void setDbName(String dbName) {
+    this.dbName = dbName;
+    for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) {
+      partSpecProxy.setDbName(dbName);
+    }
+  }
+
+  @Override
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+    for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) {
+      partSpecProxy.setTableName(tableName);
+    }
+  }
+
+  @Override
+  public String getCatName() {
+    return catName;
+  }
+
+  @Override
+  public String getDbName() {
+    return dbName;
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public PartitionIterator getPartitionIterator() {
+    return new Iterator(this);
+  }
+
+  /** @return the raw spec list this composite was constructed with (not a copy). */
+  @Override
+  public List<PartitionSpec> toPartitionSpec() {
+    return partitionSpecs;
+  }
+
+  @Override
+  public void setRootLocation(String rootLocation) throws MetaException {
+    for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) {
+      partSpecProxy.setRootLocation(rootLocation);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java
new file mode 100644
index 0000000..585b8fd
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.partition.spec;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PartitionSpecProxy implementation that composes a List of Partitions.
+ */
+public class PartitionListComposingSpecProxy extends PartitionSpecProxy {
+
+  private PartitionSpec partitionSpec;
+
+  /**
+   * Wraps a PartitionSpec that carries an explicit partition list.
+   * @param partitionSpec the raw spec; its partition list must be set.
+   * @throws MetaException if the partition list is null, contains a null partition, or a
+   *         partition has a null/empty value list or a null value.
+   */
+  protected PartitionListComposingSpecProxy(PartitionSpec partitionSpec) throws MetaException {
+    assert partitionSpec.isSetPartitionList()
+        : "Partition-list should have been set.";
+    PartitionListComposingSpec partitionList = partitionSpec.getPartitionList();
+    if (partitionList == null || partitionList.getPartitions() == null) {
+      throw new MetaException("The partition list cannot be null.");
+    }
+    for (Partition partition : partitionList.getPartitions()) {
+      if (partition == null) {
+        throw new MetaException("Partition cannot be null.");
+      }
+      if (partition.getValues() == null || partition.getValues().isEmpty()) {
+        throw new MetaException("The partition value list cannot be null or empty.");
+      }
+      if (partition.getValues().contains(null)) {
+        throw new MetaException("Partition value cannot be null.");
+      }
+    }
+    this.partitionSpec = partitionSpec;
+  }
+
+  @Override
+  public String getCatName() {
+    return partitionSpec.getCatName();
+  }
+
+  @Override
+  public String getDbName() {
+    return partitionSpec.getDbName();
+  }
+
+  @Override
+  public String getTableName() {
+    return partitionSpec.getTableName();
+  }
+
+  @Override
+  public PartitionIterator getPartitionIterator() {
+    return new Iterator(this);
+  }
+
+  @Override
+  public List<PartitionSpec> toPartitionSpec() {
+    return Arrays.asList(partitionSpec);
+  }
+
+  @Override
+  public int size() {
+    return partitionSpec.getPartitionList().getPartitionsSize();
+  }
+
+  /** Sets the catalog name on the spec and on every partition in its list. */
+  @Override
+  public void setCatName(String catName) {
+    partitionSpec.setCatName(catName);
+    for (Partition partition : partitionSpec.getPartitionList().getPartitions()) {
+      partition.setCatName(catName);
+    }
+  }
+
+  /** Sets the DB name on the spec and on every partition in its list. */
+  @Override
+  public void setDbName(String dbName) {
+    partitionSpec.setDbName(dbName);
+    for (Partition partition : partitionSpec.getPartitionList().getPartitions()) {
+      partition.setDbName(dbName);
+    }
+  }
+
+  /** Sets the table name on the spec and on every partition in its list. */
+  @Override
+  public void setTableName(String tableName) {
+    partitionSpec.setTableName(tableName);
+    for (Partition partition : partitionSpec.getPartitionList().getPartitions()) {
+      partition.setTableName(tableName);
+    }
+  }
+
+  /**
+   * Re-roots every partition location from the spec's current root path to
+   * {@code newRootPath}. Partitions are updated in list order; if one does not start with
+   * the old root, the exception is thrown after the earlier partitions were already updated.
+   * @param newRootPath the replacement root path; must not be null.
+   * @throws MetaException if the spec has no root path, {@code newRootPath} is null, or a
+   *         partition location does not start with the old root path.
+   */
+  @Override
+  public void setRootLocation(String newRootPath) throws MetaException {
+
+    String oldRootPath = partitionSpec.getRootPath();
+
+    if (oldRootPath == null) {
+      throw new MetaException("No common root-path. Can't replace root-path!");
+    }
+
+    if (newRootPath == null) {
+      throw new MetaException("Root path cannot be null.");
+    }
+
+    for (Partition partition : partitionSpec.getPartitionList().getPartitions()) {
+      String location = partition.getSd().getLocation();
+      if (!location.startsWith(oldRootPath)) {
+        throw new MetaException("Common root-path not found. Can't replace root-path!");
+      }
+      // Swap only the leading root. String.replace would also rewrite any later occurrence
+      // of the old root inside the path.
+      partition.getSd().setLocation(newRootPath + location.substring(oldRootPath.length()));
+    }
+  }
+
+  /**
+   * Iterator over the partitions of the wrapped list. The "current" partition is the one at
+   * {@code index}, i.e. the partition the next call to next() would return.
+   */
+  public static class Iterator implements PartitionIterator {
+
+    PartitionListComposingSpecProxy partitionSpecProxy;
+    List<Partition> partitionList;
+    int index;
+
+    public Iterator(PartitionListComposingSpecProxy partitionSpecProxy) {
+      this.partitionSpecProxy = partitionSpecProxy;
+      this.partitionList = partitionSpecProxy.partitionSpec.getPartitionList().getPartitions();
+      this.index = 0;
+    }
+
+    @Override
+    public Partition getCurrent() {
+      return partitionList.get(index);
+    }
+
+    @Override
+    public String getCatName() {
+      return partitionSpecProxy.getCatName();
+    }
+
+    @Override
+    public String getDbName() {
+      return partitionSpecProxy.getDbName();
+    }
+
+    @Override
+    public String getTableName() {
+      return partitionSpecProxy.getTableName();
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+      return partitionList.get(index).getParameters();
+    }
+
+    @Override
+    public void setParameters(Map<String, String> parameters) {
+      partitionList.get(index).setParameters(parameters);
+    }
+
+    @Override
+    public String getLocation() {
+      return partitionList.get(index).getSd().getLocation();
+    }
+
+    @Override
+    public void putToParameters(String key, String value) {
+      partitionList.get(index).putToParameters(key, value);
+    }
+
+    /** Stores the creation time on the current partition, narrowed to an int. */
+    @Override
+    public void setCreateTime(long time) {
+      partitionList.get(index).setCreateTime((int)time);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < partitionList.size();
+    }
+
+    @Override
+    public Partition next() {
+      return partitionList.get(index++);
+    }
+
+    /** Removes the partition at the current index from the underlying list. */
+    @Override
+    public void remove() {
+      partitionList.remove(index);
+    }
+  } // class Iterator;
+
+} // class PartitionListComposingSpecProxy;

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java
new file mode 100644
index 0000000..1866446
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.partition.spec;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Polymorphic proxy class, equivalent to org.apache.hadoop.hive.metastore.api.PartitionSpec.
+ */
+public abstract class PartitionSpecProxy {
+
+  /**
+   * Reports how many Partition instances the PartitionSpec represents.
+   * @return Number of partitions.
+   */
+  public abstract int size();
+
+  /**
+   * Sets the catalog name.
+   * @param catName catalog name.
+   */
+  public abstract void setCatName(String catName);
+
+  /**
+   * Sets the name of the DB.
+   * @param dbName The name of the DB.
+   */
+  public abstract void setDbName(String dbName);
+
+  /**
+   * Sets the name of the table.
+   * @param tableName The name of the table.
+   */
+  public abstract void setTableName(String tableName);
+
+  /**
+   * Gets the catalog name.
+   * @return catalog name.
+   */
+  public abstract String getCatName();
+
+  /**
+   * Gets the name of the DB.
+   * @return The name of the DB.
+   */
+  public abstract String getDbName();
+
+  /**
+   * Gets the name of the table.
+   * @return The name of the table.
+   */
+  public abstract String getTableName();
+
+  /**
+   * Creates an iterator over the (virtual) sequence of Partitions represented by the
+   * PartitionSpec.
+   * @return A PartitionIterator positioned at the beginning of the Partition sequence.
+   */
+  public abstract PartitionIterator getPartitionIterator();
+
+  /**
+   * Converts to a sequence of org.apache.hadoop.hive.metastore.api.PartitionSpec.
+   * @return A list of org.apache.hadoop.hive.metastore.api.PartitionSpec instances.
+   */
+  public abstract List<PartitionSpec> toPartitionSpec();
+
+  /**
+   * Sets the common root-location for all partitions in the PartitionSpec.
+   * @param rootLocation The new common root-location.
+   * @throws MetaException if the implementation cannot apply the new root-location.
+   */
+  public abstract void setRootLocation(String rootLocation) throws MetaException;
+
+  /**
+   * Factory to construct PartitionSpecProxy instances, from PartitionSpecs.
+   */
+  public static class Factory {
+
+    /**
+     * Factory method. Picks the proxy type matching the variant set on the raw PartitionSpec.
+     * @param partSpec Raw PartitionSpec from the Thrift API.
+     * @return PartitionSpecProxy instance; null if {@code partSpec} is null, or if neither
+     *         supported variant is set and assertions are disabled.
+     * @throws MetaException if the selected proxy's constructor rejects the spec.
+     */
+    public static PartitionSpecProxy get(PartitionSpec partSpec) throws MetaException {
+      if (partSpec == null) {
+        return null;
+      }
+
+      if (partSpec.isSetPartitionList()) {
+        return new PartitionListComposingSpecProxy(partSpec);
+      }
+
+      if (partSpec.isSetSharedSDPartitionSpec()) {
+        return new PartitionSpecWithSharedSDProxy(partSpec);
+      }
+
+      // Neither variant is set: trips the assertion when enabled, otherwise yields null.
+      assert false : "Unsupported type of PartitionSpec!";
+      return null;
+    }
+
+    /**
+     * Factory method to construct CompositePartitionSpecProxy.
+     * @param partitionSpecs List of raw PartitionSpecs.
+     * @return A CompositePartitionSpecProxy instance.
+     * @throws MetaException if the composite's constructor rejects the specs.
+     */
+    public static PartitionSpecProxy get(List<PartitionSpec> partitionSpecs)
+        throws MetaException {
+      return new CompositePartitionSpecProxy(partitionSpecs);
+    }
+
+  } // class Factory;
+
+  /**
+   * Iterator to iterate over Partitions corresponding to a PartitionSpec.
+   */
+  public interface PartitionIterator extends java.util.Iterator<Partition> {
+
+    /**
+     * Gets the Partition "pointed to" by the iterator.
+     * Like next(), but without advancing the iterator.
+     * @return The "current" partition object.
+     */
+    Partition getCurrent();
+
+    /**
+     * Gets the catalog name.
+     * @return catalog name.
+     */
+    String getCatName();
+
+    /**
+     * Gets the name of the DB.
+     * @return Name of the DB.
+     */
+    String getDbName();
+
+    /**
+     * Gets the name of the table.
+     * @return Name of the table.
+     */
+    String getTableName();
+
+    /**
+     * Gets the Partition parameters.
+     * @return Key-value map for Partition-level parameters.
+     */
+    Map<String, String> getParameters();
+
+    /**
+     * Sets the Partition parameters.
+     * @param parameters Key-value map for Partition-level parameters.
+     */
+    void setParameters(Map<String, String> parameters);
+
+    /**
+     * Inserts an individual parameter into a Partition's parameter-set.
+     * @param key parameter name.
+     * @param value parameter value.
+     */
+    void putToParameters(String key, String value);
+
+    /**
+     * Gets the Partition-location.
+     * @return Partition's location.
+     */
+    String getLocation();
+
+    /**
+     * Sets the creation-time of a Partition.
+     * @param time Timestamp indicating the time of creation of the Partition.
+     */
+    void setCreateTime(long time);
+
+  } // class PartitionIterator;
+
+  /**
+   * Simple wrapper class for pre-constructed Partitions, to expose a PartitionIterator
+   * interface, where the iterator-sequence consists of just one Partition. That Partition is
+   * only reachable through getCurrent(); the iteration methods never advance.
+   */
+  public static class SimplePartitionWrapperIterator implements PartitionIterator {
+    private Partition partition;
+
+    public SimplePartitionWrapperIterator(Partition partition) {
+      this.partition = partition;
+    }
+
+    @Override
+    public Partition getCurrent() {
+      return partition;
+    }
+
+    @Override
+    public String getCatName() {
+      return partition.getCatName();
+    }
+
+    @Override
+    public String getDbName() {
+      return partition.getDbName();
+    }
+
+    @Override
+    public String getTableName() {
+      return partition.getTableName();
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+      return partition.getParameters();
+    }
+
+    @Override
+    public void setParameters(Map<String, String> parameters) {
+      partition.setParameters(parameters);
+    }
+
+    @Override
+    public void putToParameters(String key, String value) {
+      partition.putToParameters(key, value);
+    }
+
+    @Override
+    public String getLocation() {
+      return partition.getSd().getLocation();
+    }
+
+    /** Stores the creation time on the wrapped partition, narrowed to an int. */
+    @Override
+    public void setCreateTime(long time) {
+      partition.setCreateTime((int)time);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return false; // No next partition.
+    }
+
+    @Override
+    public Partition next() {
+      return null; // No next partition.
+    }
+
+    @Override
+    public void remove() {
+      // Do nothing.
+    }
+  } // class SimplePartitionWrapperIterator;
+
+} // class PartitionSpecProxy;

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java
new file mode 100644
index 0000000..5b46206
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.partition.spec;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
+import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Subclass of PartitionSpecProxy that pulls out commonality of
+ * StorageDescriptor properties within a Partition-list into a common
+ * StorageDescriptor instance.
+ */
+public class PartitionSpecWithSharedSDProxy extends PartitionSpecProxy {
+
+  private PartitionSpec partitionSpec;
+
+  public PartitionSpecWithSharedSDProxy(PartitionSpec partitionSpec) throws 
MetaException {
+    assert partitionSpec.isSetSharedSDPartitionSpec();
+    if (partitionSpec.getSharedSDPartitionSpec().getSd() == null) {
+      throw new MetaException("The shared storage descriptor must be set.");
+    }
+    this.partitionSpec = partitionSpec;
+  }
+
+  @Override
+  public int size() {
+    return partitionSpec.getSharedSDPartitionSpec().getPartitionsSize();
+  }
+
+  @Override
+  public void setCatName(String catName) {
+    partitionSpec.setCatName(catName);
+  }
+
+  @Override
+  public void setDbName(String dbName) {
+    partitionSpec.setDbName(dbName);
+  }
+
+  @Override
+  public void setTableName(String tableName) {
+    partitionSpec.setTableName(tableName);
+  }
+
+  @Override
+  public String getCatName() {
+    return partitionSpec.getCatName();
+  }
+
+  @Override
+  public String getDbName() {
+    return partitionSpec.getDbName();
+  }
+
+  @Override
+  public String getTableName() {
+    return partitionSpec.getTableName();
+  }
+
+  public PartitionIterator getPartitionIterator() {
+    return new Iterator(this);
+  }
+
+  @Override
+  public List<PartitionSpec> toPartitionSpec() {
+    return Arrays.asList(partitionSpec);
+  }
+
+  @Override
+  public void setRootLocation(String rootLocation) throws MetaException {
+    partitionSpec.setRootPath(rootLocation);
+    partitionSpec.getSharedSDPartitionSpec().getSd().setLocation(rootLocation);
+  }
+
+  /**
+   * Iterator implementation to iterate over all Partitions within the 
PartitionSpecWithSharedSDProxy.
+   */
+  public static class Iterator implements PartitionIterator {
+
+    private PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy;
+    private PartitionSpecWithSharedSD pSpec;
+    private int index;
+
+    Iterator(PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy) {
+      this.partitionSpecWithSharedSDProxy = partitionSpecWithSharedSDProxy;
+      this.pSpec = 
this.partitionSpecWithSharedSDProxy.partitionSpec.getSharedSDPartitionSpec();
+      this.index = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < pSpec.getPartitions().size();
+    }
+
+    @Override
+    public Partition next() {
+      Partition partition = getCurrent();
+      ++index;
+      return partition;
+    }
+
+    @Override
+    public void remove() {
+      pSpec.getPartitions().remove(index);
+    }
+
+    @Override
+    public Partition getCurrent() {
+      PartitionWithoutSD partWithoutSD = pSpec.getPartitions().get(index);
+      StorageDescriptor partSD = new StorageDescriptor(pSpec.getSd());
+      partSD.setLocation(partSD.getLocation() + 
partWithoutSD.getRelativePath());
+
+      Partition p = new Partition(
+          partWithoutSD.getValues(),
+          partitionSpecWithSharedSDProxy.partitionSpec.getDbName(),
+          partitionSpecWithSharedSDProxy.partitionSpec.getTableName(),
+          partWithoutSD.getCreateTime(),
+          partWithoutSD.getLastAccessTime(),
+          partSD,
+          partWithoutSD.getParameters()
+      );
+      p.setCatName(partitionSpecWithSharedSDProxy.partitionSpec.getCatName());
+      return p;
+    }
+
+    @Override
+    public String getCatName() {
+      return partitionSpecWithSharedSDProxy.partitionSpec.getCatName();
+    }
+
+    @Override
+    public String getDbName() {
+      return partitionSpecWithSharedSDProxy.partitionSpec.getDbName();
+    }
+
+    @Override
+    public String getTableName() {
+      return partitionSpecWithSharedSDProxy.partitionSpec.getTableName();
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+      return pSpec.getPartitions().get(index).getParameters();
+    }
+
+    @Override
+    public void setParameters(Map<String, String> parameters) {
+      pSpec.getPartitions().get(index).setParameters(parameters);
+    }
+
+    @Override
+    public String getLocation() {
+      return pSpec.getSd().getLocation() + 
pSpec.getPartitions().get(index).getRelativePath();
+    }
+
+    @Override
+    public void putToParameters(String key, String value) {
+      pSpec.getPartitions().get(index).putToParameters(key, value);
+    }
+
+    @Override
+    public void setCreateTime(long time) {
+      pSpec.getPartitions().get(index).setCreateTime((int)time);
+    }
+
+  } // static class Iterator;
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
new file mode 100644
index 0000000..ba6c7e3
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import org.apache.hadoop.io.Text;
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+
+/**
+ * Hive's flavour of a Hadoop delegation token identifier. Every identifier of
+ * this class reports {@link #HIVE_DELEGATION_KIND} as its token kind.
+ */
+public class DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+
+  /** Token kind returned by {@link #getKind()}. */
+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
+
+  /**
+   * Builds a blank identifier, meant to be filled in afterwards by reading
+   * serialized fields into it.
+   */
+  public DelegationTokenIdentifier() {
+    super();
+  }
+
+  /**
+   * Builds an identifier from its three user names.
+   *
+   * @param owner the effective username of the token owner
+   * @param renewer the username of the renewer
+   * @param realUser the real username of the token owner
+   */
+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
+    super(owner, renewer, realUser);
+  }
+
+  /**
+   * @return {@link #HIVE_DELEGATION_KIND}
+   */
+  @Override
+  public Text getKind() {
+    return HIVE_DELEGATION_KIND;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
new file mode 100644
index 0000000..af88107
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * Hive's delegation token secret manager.
+ * It produces the password of every token it issues and checks the password
+ * of every token presented to it.
+ */
+public class DelegationTokenSecretManager
+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  /**
+   * Builds a secret manager; all four arguments are forwarded unchanged to the
+   * Hadoop base class.
+   * NOTE(review): the earlier comment described the key update interval as a
+   * number of seconds; confirm the expected unit against
+   * AbstractDelegationTokenSecretManager.
+   *
+   * @param delegationKeyUpdateInterval interval for rolling new secret keys
+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+   *        tokens
+   * @param delegationTokenRenewInterval how often the tokens must be renewed
+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+   *        for expired tokens
+   */
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+                                      long delegationTokenMaxLifetime,
+                                      long delegationTokenRenewInterval,
+                                      long delegationTokenRemoverScanInterval) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  }
+
+  /**
+   * @return a new, empty {@link DelegationTokenIdentifier}
+   */
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  /**
+   * Decodes a token from its URL-safe string form.
+   */
+  private static Token<DelegationTokenIdentifier> decodeToken(String tokenStrForm)
+      throws IOException {
+    Token<DelegationTokenIdentifier> decoded = new Token<>();
+    decoded.decodeFromUrlString(tokenStrForm);
+    return decoded;
+  }
+
+  /**
+   * Decodes the given token string, verifies its password with
+   * {@code verifyToken} and reports who the token belongs to.
+   *
+   * @param tokenStrForm the token in URL-safe string form
+   * @return short user name taken from the token's identifier
+   * @throws IOException if the token cannot be decoded or verified
+   */
+  public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> token = decodeToken(tokenStrForm);
+    DelegationTokenIdentifier identifier = getTokenIdentifier(token);
+    verifyToken(identifier, token.getPassword());
+    return identifier.getUser().getShortUserName();
+  }
+
+  /**
+   * Rebuilds the identifier object from the identifier bytes carried by a token.
+   *
+   * @param token the token whose identifier bytes are read
+   * @return a freshly created identifier populated from those bytes
+   * @throws IOException if the bytes cannot be read into an identifier
+   */
+  protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    // turn bytes back into identifier for cache lookup
+    DataInputStream identifierBytes =
+        new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
+    DelegationTokenIdentifier identifier = createIdentifier();
+    identifier.readFields(identifierBytes);
+    return identifier;
+  }
+
+  /**
+   * Cancels the given token on behalf of the current user (full user name).
+   *
+   * @param tokenStrForm the token in URL-safe string form
+   * @throws IOException if the token cannot be decoded or cancelled
+   */
+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> token = decodeToken(tokenStrForm);
+    String canceller = UserGroupInformation.getCurrentUser().getUserName();
+    cancelToken(token, canceller);
+  }
+
+  /**
+   * Renews the given token on behalf of the current user (short user name).
+   *
+   * @param tokenStrForm the token in URL-safe string form
+   * @return the value returned by {@code renewToken}
+   * @throws IOException if the token cannot be decoded or renewed
+   */
+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+    Token<DelegationTokenIdentifier> token = decodeToken(tokenStrForm);
+    //when a token is created the renewer of the token is stored
+    //as shortName in AbstractDelegationTokenIdentifier.setRenewer()
+    //this seems like an inconsistency because while cancelling the token
+    //it uses the shortname to compare the renewer while it does not use
+    //shortname during token renewal. Use getShortUserName() until its fixed
+    //in HADOOP-15068
+    String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
+    return renewToken(token, renewer);
+  }
+
+  /**
+   * Issues a new delegation token and returns it in URL-safe string form.
+   * The current user's name, when available, is recorded as the real user.
+   *
+   * @param ownerStr owner of the new token; must not be null
+   * @param renewer user allowed to renew the token
+   * @return the encoded token
+   * @throws IOException if the current user cannot be determined or the token
+   *         cannot be encoded
+   * @throws RuntimeException if {@code ownerStr} is null
+   */
+  public synchronized String getDelegationToken(final String ownerStr, final String renewer)
+      throws IOException {
+    if (ownerStr == null) {
+      throw new RuntimeException("Delegation token owner is null");
+    }
+    Text owner = new Text(ownerStr);
+    String currentUserName = UserGroupInformation.getCurrentUser().getUserName();
+    Text realUser = (currentUserName == null) ? null : new Text(currentUserName);
+    DelegationTokenIdentifier identifier =
+        new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+    Token<DelegationTokenIdentifier> issued = new Token<>(identifier, this);
+    return issued.encodeToUrlString();
+  }
+
+  /**
+   * Extracts the short user name from an encoded token without verifying the
+   * token's password.
+   *
+   * @param tokenStr the token in URL-safe string form
+   * @return short user name taken from the token's identifier
+   * @throws IOException if the token or its identifier cannot be decoded
+   */
+  public String getUserFromToken(String tokenStr) throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = decodeToken(tokenStr);
+    DataInputStream identifierBytes =
+        new DataInputStream(new ByteArrayInputStream(delegationToken.getIdentifier()));
+    DelegationTokenIdentifier identifier = createIdentifier();
+    identifier.readFields(identifierBytes);
+    return identifier.getUser().getShortUserName();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
new file mode 100644
index 0000000..51b21fa
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive: it configures the
+ * Hadoop base selector with
+ * {@link DelegationTokenIdentifier#HIVE_DELEGATION_KIND}.
+ */
+
+public class DelegationTokenSelector
+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+
+  /** Creates a selector for the Hive delegation token kind. */
+  public DelegationTokenSelector() {
+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
new file mode 100644
index 0000000..b21b072
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Locale;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ * HIVE-11378: This class is not directly used anymore.  It now exists only as
+ * a shell to be extended by HadoopThriftAuthBridge23 in 0.23 shims.  I have
+ * made it abstract to avoid maintenance errors.
+ */
+public abstract class HadoopThriftAuthBridge {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HadoopThriftAuthBridge.class);
+
+  // We want to have only one auth bridge.  In the past this was handled by 
ShimLoader, but since
+  // we're no longer using that we'll do it here.
+  private static HadoopThriftAuthBridge self = null;
+
+  public static HadoopThriftAuthBridge getBridge() {
+    if (self == null) {
+      synchronized (HadoopThriftAuthBridge.class) {
+        if (self == null) self = new HadoopThriftAuthBridge23();
+      }
+    }
+    return self;
+  }
+
+  /**
+   * Creates a new {@link Client}.
+   *
+   * @return a fresh client instance
+   */
+  public Client createClient() {
+    return new Client();
+  }
+
+  /**
+   * Creates a {@link Client}. When the login user is not already using
+   * {@code authMethod}, a new {@link Configuration} carrying that value under
+   * {@code HADOOP_SECURITY_AUTHENTICATION} is first handed to
+   * {@link UserGroupInformation#setConfiguration}.
+   *
+   * @param authMethod name of the authentication method to use
+   * @return a new client
+   * @throws IllegalStateException if the login user cannot be determined
+   * @throws IllegalArgumentException if {@code authMethod} names no known authentication method
+   */
+  public Client createClientWithConf(String authMethod) {
+    final UserGroupInformation loginUser;
+    try {
+      loginUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to get current login user: " + e, e);
+    }
+    if (!loginUserHasCurrentAuthMethod(loginUser, authMethod)) {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+    } else {
+      LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+    }
+    return new Client();
+  }
+
+  /**
+   * Creates a {@link Server} from a keytab and principal settings by delegating to the
+   * three-argument {@code Server} constructor.
+   *
+   * @param keytabFile keytab to log in from
+   * @param principalConf server-side principal setting
+   * @param clientConf client-facing principal setting
+   * @return the new server
+   * @throws TTransportException if the {@code Server} constructor fails
+   */
+  public Server createServer(String keytabFile, String principalConf, String 
clientConf) throws TTransportException {
+    return new Server(keytabFile, principalConf, clientConf);
+  }
+
+
+  /**
+   * Resolves {@code principalConfig} for {@code host} through
+   * {@link SecurityUtil#getServerPrincipal} and checks that the result splits into three
+   * Kerberos name components.
+   *
+   * @param principalConfig the configured principal
+   * @param host the host to resolve the principal for
+   * @return the resolved server principal
+   * @throws IOException if the resolved principal does not have exactly three components, or if
+   *         resolution itself fails
+   */
+  public String getServerPrincipal(String principalConfig, String host)
+      throws IOException {
+    final String resolved = SecurityUtil.getServerPrincipal(principalConfig, host);
+    final String[] parts = SaslRpcServer.splitKerberosName(resolved);
+    if (parts.length == 3) {
+      return resolved;
+    }
+    throw new IOException(
+        "Kerberos principal name does NOT have the expected hostname part: " + resolved);
+  }
+
+  /**
+   * Method to get canonical-ized hostname, given a hostname (possibly a 
CNAME).
+   * This should allow for service-principals to use simplified CNAMEs.
+   * @param hostName The hostname to be canonical-ized.
+   * @return Given a CNAME, the canonical-ized hostname is returned. If not 
found, the original hostname is returned.
+   */
+  public String getCanonicalHostName(String hostName) {
+    try {
+      return InetAddress.getByName(hostName).getCanonicalHostName();
+    }
+    catch(UnknownHostException exception) {
+      LOG.warn("Could not retrieve canonical hostname for " + hostName, 
exception);
+      return hostName;
+    }
+  }
+
+  /**
+   * Returns the current user. When that user is not already using {@code authMethod}, a new
+   * {@link Configuration} carrying that value under {@code HADOOP_SECURITY_AUTHENTICATION} is
+   * handed to {@link UserGroupInformation#setConfiguration} and the current user is looked up
+   * again.
+   *
+   * @param authMethod name of the authentication method to use
+   * @return the current user's UGI
+   * @throws IllegalStateException if the first lookup of the current user fails
+   * @throws IllegalArgumentException if {@code authMethod} names no known authentication method
+   * @throws IOException if the lookup after changing the configuration fails
+   */
+  public UserGroupInformation getCurrentUGIWithConf(String authMethod)
+      throws IOException {
+    final UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to get current user: " + e, e);
+    }
+    if (!loginUserHasCurrentAuthMethod(currentUser, authMethod)) {
+      LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current.");
+      Configuration conf = new Configuration();
+      conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod);
+      UserGroupInformation.setConfiguration(conf);
+      return UserGroupInformation.getCurrentUser();
+    }
+    LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current.");
+    return currentUser;
+  }
+
+  /**
+   * Return true if the current login user is already using the given 
authMethod.
+   *
+   * Used above to ensure we do not create a new Configuration object and as 
such
+   * lose other settings such as the cluster to which the JVM is connected. 
Required
+   * for oozie since it does not have a core-site.xml see HIVE-7682
+   */
+  private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, 
String sAuthMethod) {
+    AuthenticationMethod authMethod;
+    try {
+      // based on SecurityUtil.getAuthenticationMethod()
+      authMethod = Enum.valueOf(AuthenticationMethod.class, 
sAuthMethod.toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException iae) {
+      throw new IllegalArgumentException("Invalid attribute value for " +
+          HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae);
+    }
+    LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod());
+    return ugi.getAuthenticationMethod().equals(authMethod);
+  }
+
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection".
+   *
+   * @param conf the configuration to read the SASL settings from
+   * @return Hadoop SASL configuration as a map of property name to value
+   */
+
+  public abstract Map<String, String> getHadoopSaslProperties(Configuration 
conf);
+
+  public static class Client {
+    /**
+     * Create a client-side SASL transport that wraps an underlying transport.
+     *
+     * @param principalConfig The Kerberos principal of the target server; only used for KERBEROS.
+     * @param host The server host used to resolve {@code principalConfig}; only used for
+     *               KERBEROS.
+     * @param methodStr The authentication method to use; must be the name of an
+     *               {@link AuthMethod} constant. DIGEST and KERBEROS are supported.
+     * @param tokenStrForm The delegation token in URL-safe string form; only used for DIGEST.
+     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+     * @param saslProps the sasl properties to create the client with
+     * @return the SASL transport, wrapped in a TUGIAssumingTransport for the current user
+     * @throws IOException if the method is unsupported, the Kerberos principal lacks a hostname
+     *               part, or the SASL transport cannot be created
+     */
+    public TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, String tokenStrForm, final TTransport underlyingTransport,
+        final Map<String, String> saslProps) throws IOException {
+      final AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+      switch (method) {
+      case DIGEST: {
+        Token<DelegationTokenIdentifier> t = new Token<>();
+        t.decodeFromUrlString(tokenStrForm);
+        TTransport saslTransport = new TSaslClientTransport(
+            method.getMechanismName(),
+            null,
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            saslProps, new SaslClientCallbackHandler(t),
+            underlyingTransport);
+        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+      }
+
+      case KERBEROS: {
+        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+        final String[] names = SaslRpcServer.splitKerberosName(serverPrincipal);
+        if (names.length != 3) {
+          throw new IOException(
+              "Kerberos principal name does NOT have the expected hostname part: "
+                  + serverPrincipal);
+        }
+        try {
+          return UserGroupInformation.getCurrentUser().doAs(
+              new PrivilegedExceptionAction<TUGIAssumingTransport>() {
+                @Override
+                public TUGIAssumingTransport run() throws IOException {
+                  TTransport saslTransport = new TSaslClientTransport(
+                    method.getMechanismName(),
+                    null,
+                    names[0], names[1],
+                    saslProps, null,
+                    underlyingTransport);
+                  return new TUGIAssumingTransport(saslTransport,
+                      UserGroupInformation.getCurrentUser());
+                }
+              });
+        } catch (InterruptedException ie) {
+          // Keep the interrupt visible to callers further up the stack.
+          Thread.currentThread().interrupt();
+          throw new IOException("Could not instantiate SASL transport", ie);
+        } catch (SaslException se) {
+          throw new IOException("Could not instantiate SASL transport", se);
+        }
+      }
+
+      default:
+        throw new IOException("Unsupported authentication method: " + method);
+      }
+    }
+
+    /**
+     * Client-side SASL callback handler that answers name and password callbacks with the
+     * Base64-encoded identifier and password of a token.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+      private final String userName;
+      private final char[] userPassword;
+
+      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.userName = encodeIdentifier(token.getIdentifier());
+        this.userPassword = encodePassword(token.getPassword());
+      }
+
+      /**
+       * Fills in the name, password and realm callbacks; realm-choice callbacks are ignored.
+       *
+       * @throws UnsupportedCallbackException for any other callback type
+       */
+      @Override
+      public void handle(Callback[] callbacks)
+          throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting username: " + userName);
+          }
+          nc.setName(userName);
+        }
+        if (pc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting userPassword");
+          }
+          pc.setPassword(userPassword);
+        }
+        if (rc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting realm: "
+                + rc.getDefaultText());
+          }
+          rc.setText(rc.getDefaultText());
+        }
+      }
+
+      /** Base64-encodes the identifier bytes; the charset is explicit so the result does not
+       *  depend on the platform default. */
+      static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier), StandardCharsets.UTF_8);
+      }
+
+      /** Base64-encodes the password bytes; the charset is explicit so the result does not
+       *  depend on the platform default. */
+      static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password), StandardCharsets.UTF_8).toCharArray();
+      }
+    }
+  }
+
+  public static class Server {
+    /**
+     * Server modes.
+     * NOTE(review): not referenced in the part of this class reviewed here; confirm usage
+     * against callers.
+     */
+    public enum ServerMode {
+      HIVESERVER2, METASTORE
+    };
+
+    // UGI handed to the transport factory built by wrapTransportFactory().
+    protected final UserGroupInformation realUgi;
+    // UGI handed to the transport factory built by createTransportFactory(); its user name is
+    // also the Kerberos principal that createSaslServerTransportFactory() splits into parts.
+    protected final UserGroupInformation clientValidationUGI;
+    // Set through setSecretManager(); passed to the DIGEST callback handler and to the
+    // processors created by wrapProcessor()/wrapNonAssumingProcessor().
+    protected DelegationTokenSecretManager secretManager;
+
+    /**
+     * Creates a server whose {@code realUgi} and {@code clientValidationUGI} are both taken
+     * from {@link UserGroupInformation#getCurrentUser()} (one call per field).
+     *
+     * @throws TTransportException wrapping the IOException raised if the current user cannot
+     *         be determined
+     */
+    public Server() throws TTransportException {
+      try {
+        realUgi = UserGroupInformation.getCurrentUser();
+        clientValidationUGI = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+    /**
+     * Create a server with a kerberos keytab/principal.
+     */
+    protected Server(String keytabFile, String principalConf, String 
clientConf)
+        throws TTransportException {
+      if (keytabFile == null || keytabFile.isEmpty()) {
+        throw new TTransportException("No keytab specified");
+      }
+      if (principalConf == null || principalConf.isEmpty()) {
+        throw new TTransportException("No principal specified");
+      }
+      if (clientConf == null || clientConf.isEmpty()) {
+        // Don't bust existing setups.
+        LOG.warn("Client-facing principal not set. Using server-side setting: 
" + principalConf);
+        clientConf = principalConf;
+      }
+
+      // Login from the keytab
+      String kerberosName;
+      try {
+        LOG.info("Logging in via CLIENT based principal ");
+        kerberosName =
+            SecurityUtil.getServerPrincipal(clientConf, "0.0.0.0");
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        clientValidationUGI = UserGroupInformation.getLoginUser();
+        assert clientValidationUGI.isFromKeytab();
+
+        LOG.info("Logging in via SERVER based principal ");
+        kerberosName =
+            SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        realUgi = UserGroupInformation.getLoginUser();
+        assert realUgi.isFromKeytab();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+
+    /**
+     * Sets the secret manager that is handed to the DIGEST callback handler and to the
+     * processors created by this server.
+     *
+     * @param secretManager the delegation token secret manager to use
+     */
+    public void setSecretManager(DelegationTokenSecretManager secretManager) {
+      this.secretManager = secretManager;
+    }
+
+    /**
+     * Create a TTransportFactory that, upon connection of a client socket,
+     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+     * can be passed as both the input and output transport factory when
+     * instantiating a TThreadPoolServer, for example.
+     *
+     * The SASL factory comes from {@link #createSaslServerTransportFactory} and is wrapped in a
+     * TUGIAssumingTransportFactory bound to {@code clientValidationUGI}.
+     *
+     * @param saslProps Map of SASL properties
+     * @return the wrapped transport factory
+     * @throws TTransportException if the SASL factory cannot be created
+     */
+    public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+        throws TTransportException {
+      return new TUGIAssumingTransportFactory(
+          createSaslServerTransportFactory(saslProps), clientValidationUGI);
+    }
+
+    /**
+     * Create a TSaslServerTransport.Factory that, upon connection of a client
+     * socket, negotiates a Kerberized SASL transport.
+     *
+     * Two server definitions are registered: one for the KERBEROS mechanism, using the first two
+     * parts of {@code clientValidationUGI}'s principal, and one for the DIGEST mechanism, backed
+     * by the secret manager.
+     *
+     * @param saslProps Map of SASL properties
+     * @return the configured factory
+     * @throws TTransportException if the principal does not split into three parts
+     */
+    public TSaslServerTransport.Factory createSaslServerTransportFactory(
+        Map<String, String> saslProps) throws TTransportException {
+      // Parse out the kerberos principal, host, realm.
+      final String principal = clientValidationUGI.getUserName();
+      final String[] parts = SaslRpcServer.splitKerberosName(principal);
+      if (parts.length != 3) {
+        throw new TTransportException("Kerberos principal should have 3 parts: " + principal);
+      }
+      // two parts of kerberos principal
+      final String protocol = parts[0];
+      final String serverName = parts[1];
+
+      final TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
+      factory.addServerDefinition(
+          AuthMethod.KERBEROS.getMechanismName(),
+          protocol, serverName,
+          saslProps,
+          new SaslRpcServer.SaslGssCallbackHandler());
+      factory.addServerDefinition(
+          AuthMethod.DIGEST.getMechanismName(),
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          saslProps,
+          new SaslDigestCallbackHandler(secretManager));
+      return factory;
+    }
+
+    /**
+     * Wrap a TTransportFactory in a TUGIAssumingTransportFactory bound to this server's
+     * {@code realUgi}.
+     *
+     * @param transFactory the transport factory to wrap
+     * @return the wrapping transport factory
+     */
+    public TTransportFactory wrapTransportFactory(TTransportFactory 
transFactory) {
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Wrap a TProcessor in a TUGIAssumingProcessor with {@code useProxy} switched on, so that,
+     * before processing any RPC, it assumes the UserGroupInformation of the user authenticated
+     * by the SASL transport.
+     *
+     * @param processor the processor to wrap
+     * @return the wrapping processor
+     */
+    public TProcessor wrapProcessor(TProcessor processor) {
+      final boolean useProxy = true;
+      return new TUGIAssumingProcessor(processor, secretManager, useProxy);
+    }
+
+    /**
+     * Wrap a TProcessor in a TUGIAssumingProcessor with {@code useProxy} switched off, to
+     * capture the client information like connecting userid, ip etc.
+     *
+     * @param processor the processor to wrap
+     * @return the wrapping processor
+     */
+    public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+      final boolean useProxy = false;
+      return new TUGIAssumingProcessor(processor, secretManager, useProxy);
+    }
+
+    // Per-thread remote address; a thread that has not stored a value reads null.
+    static final ThreadLocal<InetAddress> remoteAddress = new ThreadLocal<InetAddress>();
+
+    /**
+     * @return the remote address stored for the calling thread, or null if none was stored
+     */
+    public InetAddress getRemoteAddress() {
+      return remoteAddress.get();
+    }
+
+    // Per-thread authentication method; a thread that has not stored a value reads
+    // AuthenticationMethod.TOKEN.
+    final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+        new ThreadLocal<AuthenticationMethod>() {
+
+      @Override
+      protected AuthenticationMethod initialValue() {
+        return AuthenticationMethod.TOKEN;
+      }
+    };
+
+    // Per-thread remote user name; a thread that has not stored a value reads null.
+    private static ThreadLocal<String> remoteUser = new ThreadLocal<String>();
+
+    /**
+     * @return the remote user stored for the calling thread, or null if none was stored
+     */
+    public String getRemoteUser() {
+      return remoteUser.get();
+    }
+
+    // Per-thread SASL mechanism name; a thread that has not stored a value reads the KERBEROS
+    // mechanism name.
+    private final static ThreadLocal<String> userAuthMechanism =
+        new ThreadLocal<String>() {
+
+      @Override
+      protected String initialValue() {
+        return AuthMethod.KERBEROS.getMechanismName();
+      }
+    };
+
+    /**
+     * @return the mechanism name stored for the calling thread, or the KERBEROS mechanism name
+     *         if none was stored
+     */
+    public String getUserAuthMechanism() {
+      return userAuthMechanism.get();
+    }
+    /**
+     * CallbackHandler for SASL DIGEST-MD5 mechanism.
+     *
+     * This code is pretty much completely based on Hadoop's
+     * SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+     * use that Hadoop class as-is was because it needs a Server.Connection object
+     * which is relevant in hadoop rpc but not here in the metastore - so the
+     * code below does not deal with the Server.Connection object.
+     */
+    static class SaslDigestCallbackHandler implements CallbackHandler {
+      /** Secret manager used to resolve token identifiers and their passwords. */
+      private final DelegationTokenSecretManager secretManager;
+
+      /**
+       * @param secretManager secret manager consulted for every password lookup
+       */
+      public SaslDigestCallbackHandler(
+          DelegationTokenSecretManager secretManager) {
+        this.secretManager = secretManager;
+      }
+
+      /**
+       * Looks up the password of the given token and encodes it for SASL.
+       *
+       * @param tokenid identifier of the delegation token
+       * @return the Base64-encoded password as characters
+       * @throws InvalidToken propagated from the secret manager's password lookup
+       */
+      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+        return encodePassword(secretManager.retrievePassword(tokenid));
+      }
+
+      /**
+       * Base64-encodes the raw password bytes and returns them as characters.
+       *
+       * @param password raw password bytes
+       * @return Base64 text of the password
+       */
+      private char[] encodePassword(byte[] password) {
+        // Decode with an explicit charset instead of the platform default so the
+        // result does not depend on the JVM's file.encoding setting.
+        return new String(Base64.encodeBase64(password),
+            java.nio.charset.StandardCharsets.UTF_8).toCharArray();
+      }
+
+      /**
+       * Handles the callbacks issued during a DIGEST-MD5 negotiation.
+       *
+       * A PasswordCallback is answered with the encoded password of the token
+       * named by the NameCallback. An AuthorizeCallback is authorized only when
+       * the authentication ID equals the authorization ID. RealmCallbacks are
+       * ignored.
+       *
+       * @param callbacks callbacks to handle
+       * @throws InvalidToken propagated from the token identifier or password lookup
+       * @throws UnsupportedCallbackException if a callback of any other type is passed
+       */
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken,
+      UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL DIGEST-MD5 Callback");
+          }
+        }
+        if (pc != null) {
+          // NOTE(review): assumes a NameCallback always accompanies the
+          // PasswordCallback; nc is dereferenced without a null check.
+          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+              getIdentifier(nc.getDefaultName(), secretManager);
+          char[] password = getPassword(tokenIdentifier);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+                + "for client: " + tokenIdentifier.getUser());
+          }
+          pc.setPassword(password);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          ac.setAuthorized(authid.equals(authzid));
+          if (ac.isAuthorized()) {
+            if (LOG.isDebugEnabled()) {
+              // The identifier is decoded only for the log message.
+              String username =
+                  SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+              LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                  + "canonicalized client ID: " + username);
+            }
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
+    }
+
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
+     */
+    protected class TUGIAssumingProcessor implements TProcessor {
+      /** Processor that performs the actual work once the caller is identified. */
+      final TProcessor wrapped;
+      /** Used to decode the token identifier when the TOKEN mechanism is in use. */
+      DelegationTokenSecretManager secretManager;
+      /** When true the call runs inside a proxy-user doAs on top of the login user. */
+      boolean useProxy;
+      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+          boolean useProxy) {
+        this.wrapped = wrapped;
+        this.secretManager = secretManager;
+        this.useProxy = useProxy;
+      }
+
+
+      /**
+       * Records the caller's address, SASL mechanism, authentication method and
+       * user name, then delegates to the wrapped processor.
+       *
+       * @param inProt protocol to read the request from; its transport must be a
+       *               TSaslServerTransport
+       * @param outProt protocol to write the response to
+       * @return the result of the wrapped processor
+       * @throws TException if the transport is not a SASL transport, the token
+       *                    identifier is invalid, or the wrapped processor fails
+       */
+      @Override
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        SaslServer saslServer = saslTrans.getSaslServer();
+        String authId = saslServer.getAuthorizationID();
+        LOG.debug("AUTH ID ======>" + authId);
+        String endUser = authId;
+
+        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+        remoteAddress.set(socket.getInetAddress());
+
+        String mechanismName = saslServer.getMechanismName();
+        userAuthMechanism.set(mechanismName);
+        if (AuthMethod.PLAIN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          // PLAIN: the authorization ID is used as the remote user as-is.
+          remoteUser.set(endUser);
+          return wrapped.process(inProt, outProt);
+        }
+
+        authenticationMethod.set(AuthenticationMethod.KERBEROS);
+        if (AuthMethod.TOKEN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          try {
+            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                secretManager);
+            endUser = tokenId.getUser().getUserName();
+            authenticationMethod.set(AuthenticationMethod.TOKEN);
+          } catch (InvalidToken e) {
+            // Keep the original exception as the cause so its stack trace survives.
+            throw new TException(e.getMessage(), e);
+          }
+        }
+
+        UserGroupInformation clientUgi = null;
+        try {
+          if (useProxy) {
+            clientUgi = UserGroupInformation.createProxyUser(
+                endUser, UserGroupInformation.getLoginUser());
+            remoteUser.set(clientUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get());
+            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+              @Override
+              public Boolean run() {
+                try {
+                  return wrapped.process(inProt, outProt);
+                } catch (TException te) {
+                  // Tunnel the checked exception out of doAs; unwrapped below.
+                  throw new RuntimeException(te);
+                }
+              }
+            });
+          } else {
+            // use the short user name for the request
+            UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+            remoteUser.set(endUserUgi.getShortUserName());
+            LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+            return wrapped.process(inProt, outProt);
+          }
+        } catch (RuntimeException rte) {
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          // Restore the interrupt status so callers further up can observe it.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        }
+        finally {
+          if (clientUgi != null) {
+            try { FileSystem.closeAllForUGI(clientUgi); }
+            catch(IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+      /** UGI under which the wrapped factory is invoked. */
+      private final UserGroupInformation ugi;
+      /** Factory that actually produces the transport. */
+      private final TTransportFactory wrapped;
+
+      /**
+       * @param wrapped factory to delegate to; asserted non-null
+       * @param ugi identity to assume while delegating; asserted non-null
+       */
+      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+        assert wrapped != null;
+        assert ugi != null;
+        this.ugi = ugi;
+        this.wrapped = wrapped;
+      }
+
+
+      /**
+       * Obtains the transport from the wrapped factory inside {@code ugi.doAs}.
+       *
+       * @param trans underlying transport handed to the wrapped factory
+       * @return the transport returned by the wrapped factory
+       */
+      @Override
+      public TTransport getTransport(final TTransport trans) {
+        PrivilegedAction<TTransport> delegateCall = new PrivilegedAction<TTransport>() {
+          @Override
+          public TTransport run() {
+            return wrapped.getTransport(trans);
+          }
+        };
+        return ugi.doAs(delegateCall);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
new file mode 100644
index 0000000..dc76535
--- /dev/null
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's SASL callback
+ * handlers and authentication classes.
+ *
+ * This is a 0.23/2.x specific implementation
+ */
+public class HadoopThriftAuthBridge23 extends HadoopThriftAuthBridge {
+
+  private static Field SASL_PROPS_FIELD;
+  private static Class<?> SASL_PROPERTIES_RESOLVER_CLASS;
+  private static Method RES_GET_INSTANCE_METHOD;
+  private static Method GET_DEFAULT_PROP_METHOD;
+  static {
+    SASL_PROPERTIES_RESOLVER_CLASS = null;
+    SASL_PROPS_FIELD = null;
+    final String SASL_PROP_RES_CLASSNAME = 
"org.apache.hadoop.security.SaslPropertiesResolver";
+    try {
+      SASL_PROPERTIES_RESOLVER_CLASS = Class.forName(SASL_PROP_RES_CLASSNAME);
+
+    } catch (ClassNotFoundException e) {
+    }
+
+    if (SASL_PROPERTIES_RESOLVER_CLASS != null) {
+      // found the class, so this would be hadoop version 2.4 or newer (See
+      // HADOOP-10221, HADOOP-10451)
+      try {
+        RES_GET_INSTANCE_METHOD = 
SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getInstance",
+            Configuration.class);
+        GET_DEFAULT_PROP_METHOD = 
SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getDefaultProperties");
+      } catch (Exception e) {
+        // this must be hadoop 2.4 , where getDefaultProperties was protected
+      }
+    }
+
+    if (SASL_PROPERTIES_RESOLVER_CLASS == null || GET_DEFAULT_PROP_METHOD == 
null) {
+      // this must be a hadoop 2.4 version or earlier.
+      // Resorting to the earlier method of getting the properties, which uses 
SASL_PROPS field
+      try {
+        SASL_PROPS_FIELD = SaslRpcServer.class.getField("SASL_PROPS");
+      } catch (NoSuchFieldException e) {
+        // Older version of hadoop should have had this field
+        throw new IllegalStateException("Error finding hadoop SASL_PROPS field 
in "
+            + SaslRpcServer.class.getSimpleName(), e);
+      }
+    }
+  }
+
+  // TODO RIVEN switch this back to package level when we can move 
TestHadoopAuthBridge23 into
+  // riven.
+  // Package permission so that HadoopThriftAuthBridge can construct it but 
others cannot.
+  protected HadoopThriftAuthBridge23() {
+
+  }
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection"
+   *
+   * @param conf
+   * @return Hadoop SASL configuration
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+    if (SASL_PROPS_FIELD != null) {
+      // hadoop 2.4 and earlier way of finding the sasl property settings
+      // Initialize the SaslRpcServer to ensure QOP parameters are read from
+      // conf
+      SaslRpcServer.init(conf);
+      try {
+        return (Map<String, String>) SASL_PROPS_FIELD.get(null);
+      } catch (Exception e) {
+        throw new IllegalStateException("Error finding hadoop SASL 
properties", e);
+      }
+    }
+    // 2.5 and later way of finding sasl property
+    try {
+      Configurable saslPropertiesResolver = (Configurable) 
RES_GET_INSTANCE_METHOD.invoke(null,
+          conf);
+      saslPropertiesResolver.setConf(conf);
+      return (Map<String, String>) 
GET_DEFAULT_PROP_METHOD.invoke(saslPropertiesResolver);
+    } catch (Exception e) {
+      throw new IllegalStateException("Error finding hadoop SASL properties", 
e);
+    }
+  }
+
+}

Reply via email to