http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java new file mode 100644 index 0000000..75f16cb --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/TimeValidator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.conf; + +import java.util.concurrent.TimeUnit; + +public class TimeValidator implements Validator { + + private final TimeUnit unit; + private final Long min; + private final boolean minInclusive; + + private final Long max; + private final boolean maxInclusive; + + public TimeValidator(TimeUnit unit) { + this(unit, null, false, null, false); + } + + public TimeValidator(TimeUnit unit, Long min, boolean minInclusive, Long max, + boolean maxInclusive) { + this.unit = unit; + this.min = min; + this.minInclusive = minInclusive; + this.max = max; + this.maxInclusive = maxInclusive; + } + + @Override + public void validate(String value) { + // First just check that this translates + TimeUnit defaultUnit = unit; + long time = MetastoreConf.convertTimeStr(value, defaultUnit, defaultUnit); + if (min != null) { + if (minInclusive ? time < min : time <= min) { + throw new IllegalArgumentException(value + " is smaller than minimum " + min + + MetastoreConf.timeAbbreviationFor(defaultUnit)); + } + } + + if (max != null) { + if (maxInclusive ? time > max : time >= max) { + throw new IllegalArgumentException(value + " is larger than maximum " + max + + MetastoreConf.timeAbbreviationFor(defaultUnit)); + } + } + } + + private String timeString(long time, TimeUnit timeUnit) { + return time + " " + MetastoreConf.timeAbbreviationFor(timeUnit); + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java new file mode 100644 index 0000000..91d790a --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/CompositePartitionSpecProxy.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; + +/** + * Implementation of PartitionSpecProxy that composes a list of PartitionSpecProxy. + */ +public class CompositePartitionSpecProxy extends PartitionSpecProxy { + + private String catName; + private String dbName; + private String tableName; + private List<PartitionSpec> partitionSpecs; + private List<PartitionSpecProxy> partitionSpecProxies; + private int size = 0; + + protected CompositePartitionSpecProxy(List<PartitionSpec> partitionSpecs) throws MetaException { + this.partitionSpecs = partitionSpecs; + if (partitionSpecs.isEmpty()) { + catName = null; + dbName = null; + tableName = null; + } + else { + catName = partitionSpecs.get(0).getCatName(); + dbName = partitionSpecs.get(0).getDbName(); + tableName = partitionSpecs.get(0).getTableName(); + this.partitionSpecProxies = new ArrayList<>(partitionSpecs.size()); + for (PartitionSpec partitionSpec : partitionSpecs) { + PartitionSpecProxy partitionSpecProxy = Factory.get(partitionSpec); + this.partitionSpecProxies.add(partitionSpecProxy); + size += partitionSpecProxy.size(); + } + } + // Assert class-invariant. 
+ assert isValid() : "Invalid CompositePartitionSpecProxy!"; + } + + @Deprecated + protected CompositePartitionSpecProxy(String dbName, String tableName, List<PartitionSpec> partitionSpecs) throws MetaException { + this(DEFAULT_CATALOG_NAME, dbName, tableName, partitionSpecs); + + } + + protected CompositePartitionSpecProxy(String catName, String dbName, String tableName, + List<PartitionSpec> partitionSpecs) throws MetaException { + this.catName = catName; + this.dbName = dbName; + this.tableName = tableName; + this.partitionSpecs = partitionSpecs; + this.partitionSpecProxies = new ArrayList<>(partitionSpecs.size()); + for (PartitionSpec partitionSpec : partitionSpecs) { + this.partitionSpecProxies.add(PartitionSpecProxy.Factory.get(partitionSpec)); + } + // Assert class-invariant. + assert isValid() : "Invalid CompositePartitionSpecProxy!"; + } + + private boolean isValid() { + for (PartitionSpecProxy partitionSpecProxy : partitionSpecProxies) { + if (partitionSpecProxy instanceof CompositePartitionSpecProxy) { + return false; + } + } + + return true; + } + + @Override + public int size() { + return size; + } + + /** + * Iterator to iterate over all Partitions, across all PartitionSpecProxy instances within the Composite. + */ + public static class Iterator implements PartitionIterator { + + private CompositePartitionSpecProxy composite; + private List<PartitionSpecProxy> partitionSpecProxies; + private int index = -1; // Index into partitionSpecs. 
+ private PartitionIterator iterator = null; + + public Iterator(CompositePartitionSpecProxy composite) { + this.composite = composite; + this.partitionSpecProxies = composite.partitionSpecProxies; + + if (this.partitionSpecProxies != null && !this.partitionSpecProxies.isEmpty()) { + this.index = 0; + this.iterator = this.partitionSpecProxies.get(this.index).getPartitionIterator(); + } + } + + @Override + public boolean hasNext() { + + if (iterator == null) { + return false; + } + + if (iterator.hasNext()) { + return true; + } + + while ( ++index < partitionSpecProxies.size() + && !(iterator = partitionSpecProxies.get(index).getPartitionIterator()).hasNext()); + + return index < partitionSpecProxies.size() && iterator.hasNext(); + + } + + @Override + public Partition next() { + + if (iterator.hasNext()) + return iterator.next(); + + while (++index < partitionSpecProxies.size() + && !(iterator = partitionSpecProxies.get(index).getPartitionIterator()).hasNext()); + + return index == partitionSpecProxies.size()? 
null : iterator.next(); + + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public Partition getCurrent() { + return iterator.getCurrent(); + } + + @Override + public String getCatName() { + return composite.getCatName(); + } + + @Override + public String getDbName() { + return composite.dbName; + } + + @Override + public String getTableName() { + return composite.tableName; + } + + @Override + public Map<String, String> getParameters() { + return iterator.getParameters(); + } + + @Override + public void setParameters(Map<String, String> parameters) { + iterator.setParameters(parameters); + } + + @Override + public String getLocation() { + return iterator.getLocation(); + } + + @Override + public void putToParameters(String key, String value) { + iterator.putToParameters(key, value); + } + + @Override + public void setCreateTime(long time) { + iterator.setCreateTime(time); + } + } + + @Override + public void setCatName(String catName) { + this.catName = catName; + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setCatName(catName); + } + + } + + @Override + public void setDbName(String dbName) { + this.dbName = dbName; + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setDbName(dbName); + } + } + + @Override + public void setTableName(String tableName) { + this.tableName = tableName; + for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setTableName(tableName); + } + } + + @Override + public String getCatName() { + return catName; + } + + @Override + public String getDbName() { + return dbName; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List<PartitionSpec> toPartitionSpec() { + return partitionSpecs; + } + + @Override + public void setRootLocation(String rootLocation) throws MetaException { 
+ for (PartitionSpecProxy partSpecProxy : partitionSpecProxies) { + partSpecProxy.setRootLocation(rootLocation); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java new file mode 100644 index 0000000..585b8fd --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionListComposingSpecProxy.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * PartitionSpecProxy implementation that composes a List of Partitions. + */ +public class PartitionListComposingSpecProxy extends PartitionSpecProxy { + + private PartitionSpec partitionSpec; + + protected PartitionListComposingSpecProxy(PartitionSpec partitionSpec) throws MetaException { + assert partitionSpec.isSetPartitionList() + : "Partition-list should have been set."; + PartitionListComposingSpec partitionList = partitionSpec.getPartitionList(); + if (partitionList == null || partitionList.getPartitions() == null) { + throw new MetaException("The partition list cannot be null."); + } + for (Partition partition : partitionList.getPartitions()) { + if (partition == null) { + throw new MetaException("Partition cannot be null."); + } + if (partition.getValues() == null || partition.getValues().isEmpty()) { + throw new MetaException("The partition value list cannot be null or empty."); + } + if (partition.getValues().contains(null)) { + throw new MetaException("Partition value cannot be null."); + } + } + this.partitionSpec = partitionSpec; + } + + @Override + public String getCatName() { + return partitionSpec.getCatName(); + } + + @Override + public String getDbName() { + return partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpec.getTableName(); + } + + @Override + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List<PartitionSpec> toPartitionSpec() { + return Arrays.asList(partitionSpec); + } + + @Override + public int size() { + return 
partitionSpec.getPartitionList().getPartitionsSize(); + } + + @Override + public void setCatName(String catName) { + partitionSpec.setCatName(catName); + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + partition.setCatName(catName); + } + } + + @Override + public void setDbName(String dbName) { + partitionSpec.setDbName(dbName); + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + partition.setDbName(dbName); + } + } + + @Override + public void setTableName(String tableName) { + partitionSpec.setTableName(tableName); + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + partition.setTableName(tableName); + } + } + + @Override + public void setRootLocation(String newRootPath) throws MetaException { + + String oldRootPath = partitionSpec.getRootPath(); + + if (oldRootPath == null) { + throw new MetaException("No common root-path. Can't replace root-path!"); + } + + if (newRootPath == null) { + throw new MetaException("Root path cannot be null."); + } + + for (Partition partition : partitionSpec.getPartitionList().getPartitions()) { + String location = partition.getSd().getLocation(); + if (location.startsWith(oldRootPath)) { + partition.getSd().setLocation(location.replace(oldRootPath, newRootPath)); + } + else { + throw new MetaException("Common root-path not found. 
Can't replace root-path!"); + } + } + } + + public static class Iterator implements PartitionIterator { + + PartitionListComposingSpecProxy partitionSpecProxy; + List<Partition> partitionList; + int index; + + public Iterator(PartitionListComposingSpecProxy partitionSpecProxy) { + this.partitionSpecProxy = partitionSpecProxy; + this.partitionList = partitionSpecProxy.partitionSpec.getPartitionList().getPartitions(); + this.index = 0; + } + + @Override + public Partition getCurrent() { + return partitionList.get(index); + } + + @Override + public String getCatName() { + return partitionSpecProxy.getCatName(); + } + + @Override + public String getDbName() { + return partitionSpecProxy.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpecProxy.getTableName(); + } + + @Override + public Map<String, String> getParameters() { + return partitionList.get(index).getParameters(); + } + + @Override + public void setParameters(Map<String, String> parameters) { + partitionList.get(index).setParameters(parameters); + } + + @Override + public String getLocation() { + return partitionList.get(index).getSd().getLocation(); + } + + @Override + public void putToParameters(String key, String value) { + partitionList.get(index).putToParameters(key, value); + } + + @Override + public void setCreateTime(long time) { + partitionList.get(index).setCreateTime((int)time); + } + + @Override + public boolean hasNext() { + return index < partitionList.size(); + } + + @Override + public Partition next() { + return partitionList.get(index++); + } + + @Override + public void remove() { + partitionList.remove(index); + } + } // class Iterator; + +} // class PartitionListComposingSpecProxy; http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java ---------------------------------------------------------------------- diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java new file mode 100644 index 0000000..1866446 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecProxy.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; + +import java.util.List; +import java.util.Map; + +/** + * Polymorphic proxy class, equivalent to org.apache.hadoop.hive.metastore.api.PartitionSpec. + */ +public abstract class PartitionSpecProxy { + + /** + * The number of Partition instances represented by the PartitionSpec. + * @return Number of partitions. + */ + public abstract int size(); + + /** + * Set catalog name. + * @param catName catalog name. 
+ */ + public abstract void setCatName(String catName); + + /** + * Setter for name of the DB. + * @param dbName The name of the DB. + */ + public abstract void setDbName(String dbName); + + /** + * Setter for name of the table. + * @param tableName The name of the table. + */ + public abstract void setTableName(String tableName); + + /** + * Get catalog name. + * @return catalog name. + */ + public abstract String getCatName(); + + /** + * Getter for name of the DB. + * @return The name of the DB. + */ + public abstract String getDbName(); + + /** + * Getter for name of the table. + * @return The name of the table. + */ + public abstract String getTableName(); + + /** + * Iterator to the (virtual) sequence of Partitions represented by the PartitionSpec. + * @return A PartitionIterator to the beginning of the Partition sequence. + */ + public abstract PartitionIterator getPartitionIterator(); + + /** + * Conversion to a org.apache.hadoop.hive.metastore.api.PartitionSpec sequence. + * @return A list of org.apache.hadoop.hive.metastore.api.PartitionSpec instances. + */ + public abstract List<PartitionSpec> toPartitionSpec(); + + /** + * Setter for the common root-location for all partitions in the PartitionSet. + * @param rootLocation The new common root-location. + * @throws MetaException + */ + public abstract void setRootLocation(String rootLocation) throws MetaException; + + /** + * Factory to construct PartitionSetProxy instances, from PartitionSets. + */ + public static class Factory { + + /** + * Factory method. Construct PartitionSpecProxy from raw PartitionSpec. + * @param partSpec Raw PartitionSpec from the Thrift API. + * @return PartitionSpecProxy instance. 
+ * @throws MetaException + */ + public static PartitionSpecProxy get(PartitionSpec partSpec) throws MetaException { + + if (partSpec == null) { + return null; + } + else + if (partSpec.isSetPartitionList()) { + return new PartitionListComposingSpecProxy(partSpec); + } + else + if (partSpec.isSetSharedSDPartitionSpec()) { + return new PartitionSpecWithSharedSDProxy(partSpec); + } + + assert false : "Unsupported type of PartitionSpec!"; + return null; + } + + /** + * Factory method to construct CompositePartitionSpecProxy. + * @param partitionSpecs List of raw PartitionSpecs. + * @return A CompositePartitionSpecProxy instance. + * @throws MetaException + */ + public static PartitionSpecProxy get(List<PartitionSpec> partitionSpecs) throws MetaException { + return new CompositePartitionSpecProxy(partitionSpecs); + } + + } // class Factory; + + /** + * Iterator to iterate over Partitions corresponding to a PartitionSpec. + */ + public interface PartitionIterator extends java.util.Iterator<Partition> { + + /** + * Getter for the Partition "pointed to" by the iterator. + * Like next(), but without advancing the iterator. + * @return The "current" partition object. + */ + Partition getCurrent(); + + /** + * Get the catalog name. + * @return catalog name. + */ + String getCatName(); + + /** + * Getter for the name of the DB. + * @return Name of the DB. + */ + String getDbName(); + + /** + * Getter for the name of the table. + * @return Name of the table. + */ + String getTableName(); + + /** + * Getter for the Partition parameters. + * @return Key-value map for Partition-level parameters. + */ + Map<String, String> getParameters(); + + /** + * Setter for Partition parameters. + * @param parameters Key-value map fo Partition-level parameters. + */ + void setParameters(Map<String, String> parameters); + + /** + * Insert an individual parameter to a Partition's parameter-set. 
+ * @param key + * @param value + */ + void putToParameters(String key, String value); + + /** + * Getter for Partition-location. + * @return Partition's location. + */ + String getLocation(); + + /** + * Setter for creation-time of a Partition. + * @param time Timestamp indicating the time of creation of the Partition. + */ + void setCreateTime(long time); + + } // class PartitionIterator; + + /** + * Simple wrapper class for pre-constructed Partitions, to expose a PartitionIterator interface, + * where the iterator-sequence consists of just one Partition. + */ + public static class SimplePartitionWrapperIterator implements PartitionIterator { + private Partition partition; + public SimplePartitionWrapperIterator(Partition partition) {this.partition = partition;} + + @Override public Partition getCurrent() { return partition; } + @Override public String getCatName() { return partition.getCatName(); } + @Override public String getDbName() { return partition.getDbName(); } + @Override public String getTableName() { return partition.getTableName(); } + @Override public Map<String, String> getParameters() { return partition.getParameters(); } + @Override public void setParameters(Map<String, String> parameters) { partition.setParameters(parameters); } + @Override public void putToParameters(String key, String value) { partition.putToParameters(key, value);} + @Override public String getLocation() { return partition.getSd().getLocation(); } + @Override public void setCreateTime(long time) { partition.setCreateTime((int)time);} + @Override public boolean hasNext() { return false; } // No next partition. + @Override public Partition next() { return null; } // No next partition. + @Override public void remove() {} // Do nothing. 
+ } // P + +} // class PartitionSpecProxy; http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java new file mode 100644 index 0000000..5b46206 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/partition/spec/PartitionSpecWithSharedSDProxy.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.partition.spec; + +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionSpec; +import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD; +import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Subclass of PartitionSpecProxy that pulls out commonality of + * StorageDescriptor properties within a Partition-list into a common + * StorageDescriptor instance. + */ +public class PartitionSpecWithSharedSDProxy extends PartitionSpecProxy { + + private PartitionSpec partitionSpec; + + public PartitionSpecWithSharedSDProxy(PartitionSpec partitionSpec) throws MetaException { + assert partitionSpec.isSetSharedSDPartitionSpec(); + if (partitionSpec.getSharedSDPartitionSpec().getSd() == null) { + throw new MetaException("The shared storage descriptor must be set."); + } + this.partitionSpec = partitionSpec; + } + + @Override + public int size() { + return partitionSpec.getSharedSDPartitionSpec().getPartitionsSize(); + } + + @Override + public void setCatName(String catName) { + partitionSpec.setCatName(catName); + } + + @Override + public void setDbName(String dbName) { + partitionSpec.setDbName(dbName); + } + + @Override + public void setTableName(String tableName) { + partitionSpec.setTableName(tableName); + } + + @Override + public String getCatName() { + return partitionSpec.getCatName(); + } + + @Override + public String getDbName() { + return partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpec.getTableName(); + } + + public PartitionIterator getPartitionIterator() { + return new Iterator(this); + } + + @Override + public List<PartitionSpec> toPartitionSpec() { + return 
Arrays.asList(partitionSpec); + } + + @Override + public void setRootLocation(String rootLocation) throws MetaException { + partitionSpec.setRootPath(rootLocation); + partitionSpec.getSharedSDPartitionSpec().getSd().setLocation(rootLocation); + } + + /** + * Iterator implementation to iterate over all Partitions within the PartitionSpecWithSharedSDProxy. + */ + public static class Iterator implements PartitionIterator { + + private PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy; + private PartitionSpecWithSharedSD pSpec; + private int index; + + Iterator(PartitionSpecWithSharedSDProxy partitionSpecWithSharedSDProxy) { + this.partitionSpecWithSharedSDProxy = partitionSpecWithSharedSDProxy; + this.pSpec = this.partitionSpecWithSharedSDProxy.partitionSpec.getSharedSDPartitionSpec(); + this.index = 0; + } + + @Override + public boolean hasNext() { + return index < pSpec.getPartitions().size(); + } + + @Override + public Partition next() { + Partition partition = getCurrent(); + ++index; + return partition; + } + + @Override + public void remove() { + pSpec.getPartitions().remove(index); + } + + @Override + public Partition getCurrent() { + PartitionWithoutSD partWithoutSD = pSpec.getPartitions().get(index); + StorageDescriptor partSD = new StorageDescriptor(pSpec.getSd()); + partSD.setLocation(partSD.getLocation() + partWithoutSD.getRelativePath()); + + Partition p = new Partition( + partWithoutSD.getValues(), + partitionSpecWithSharedSDProxy.partitionSpec.getDbName(), + partitionSpecWithSharedSDProxy.partitionSpec.getTableName(), + partWithoutSD.getCreateTime(), + partWithoutSD.getLastAccessTime(), + partSD, + partWithoutSD.getParameters() + ); + p.setCatName(partitionSpecWithSharedSDProxy.partitionSpec.getCatName()); + return p; + } + + @Override + public String getCatName() { + return partitionSpecWithSharedSDProxy.partitionSpec.getCatName(); + } + + @Override + public String getDbName() { + return 
partitionSpecWithSharedSDProxy.partitionSpec.getDbName(); + } + + @Override + public String getTableName() { + return partitionSpecWithSharedSDProxy.partitionSpec.getTableName(); + } + + @Override + public Map<String, String> getParameters() { + return pSpec.getPartitions().get(index).getParameters(); + } + + @Override + public void setParameters(Map<String, String> parameters) { + pSpec.getPartitions().get(index).setParameters(parameters); + } + + @Override + public String getLocation() { + return pSpec.getSd().getLocation() + pSpec.getPartitions().get(index).getRelativePath(); + } + + @Override + public void putToParameters(String key, String value) { + pSpec.getPartitions().get(index).putToParameters(key, value); + } + + @Override + public void setCreateTime(long time) { + pSpec.getPartitions().get(index).setCreateTime((int)time); + } + + } // static class Iterator; + +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java new file mode 100644 index 0000000..ba6c7e3 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenIdentifier.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.security; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; + +/** + * A delegation token identifier that is specific to Hive. + */ +public class DelegationTokenIdentifier + extends AbstractDelegationTokenIdentifier { + public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN"); + + /** + * Create an empty delegation token identifier for reading into. 
+ */ + public DelegationTokenIdentifier() { + } + + /** + * Create a new delegation token identifier + * @param owner the effective username of the token owner + * @param renewer the username of the renewer + * @param realUser the real username of the token owner + */ + public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) { + super(owner, renewer, realUser); + } + + @Override + public Text getKind() { + return HIVE_DELEGATION_KIND; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java new file mode 100644 index 0000000..af88107 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSecretManager.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; + +/** + * A Hive specific delegation token secret manager. + * The secret manager is responsible for generating and accepting the password + * for each token. + */ +public class DelegationTokenSecretManager + extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> { + + /** + * Create a secret manager + * @param delegationKeyUpdateInterval the number of seconds for rolling new + * secret keys. + * @param delegationTokenMaxLifetime the maximum lifetime of the delegation + * tokens + * @param delegationTokenRenewInterval how often the tokens must be renewed + * @param delegationTokenRemoverScanInterval how often the tokens are scanned + * for expired tokens + */ + public DelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, + long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + } + + @Override + public DelegationTokenIdentifier createIdentifier() { + return new DelegationTokenIdentifier(); + } + + /** + * Verify token string + * @param tokenStrForm + * @return user name + * @throws IOException + */ + public synchronized String verifyDelegationToken(String tokenStrForm) throws IOException { + Token<DelegationTokenIdentifier> t = new Token<>(); + t.decodeFromUrlString(tokenStrForm); + + DelegationTokenIdentifier id = getTokenIdentifier(t); + verifyToken(id, 
t.getPassword()); + return id.getUser().getShortUserName(); + } + + protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token) + throws IOException { + // turn bytes back into identifier for cache lookup + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id; + } + + public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException { + Token<DelegationTokenIdentifier> t= new Token<>(); + t.decodeFromUrlString(tokenStrForm); + String user = UserGroupInformation.getCurrentUser().getUserName(); + cancelToken(t, user); + } + + public synchronized long renewDelegationToken(String tokenStrForm) throws IOException { + Token<DelegationTokenIdentifier> t= new Token<>(); + t.decodeFromUrlString(tokenStrForm); + //when a token is created the renewer of the token is stored + //as shortName in AbstractDelegationTokenIdentifier.setRenewer() + //this seems like an inconsistency because while cancelling the token + //it uses the shortname to compare the renewer while it does not use + //shortname during token renewal. 
Use getShortUserName() until its fixed + //in HADOOP-15068 + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + return renewToken(t, user); + } + + public synchronized String getDelegationToken(final String ownerStr, final String renewer) throws IOException { + if (ownerStr == null) { + throw new RuntimeException("Delegation token owner is null"); + } + Text owner = new Text(ownerStr); + Text realUser = null; + UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); + if (currentUgi.getUserName() != null) { + realUser = new Text(currentUgi.getUserName()); + } + DelegationTokenIdentifier ident = + new DelegationTokenIdentifier(owner, new Text(renewer), realUser); + Token<DelegationTokenIdentifier> t = new Token<>( + ident, this); + return t.encodeToUrlString(); + } + + public String getUserFromToken(String tokenStr) throws IOException { + Token<DelegationTokenIdentifier> delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + + ByteArrayInputStream buf = new ByteArrayInputStream(delegationToken.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + return id.getUser().getShortUserName(); + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java new file mode 100644 index 0000000..51b21fa --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenSelector.java @@ -0,0 +1,33 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.security; + +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; + +/** + * A delegation token that is specialized for Hive + */ + +public class DelegationTokenSelector + extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{ + + public DelegationTokenSelector() { + super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java new file mode 100644 index 0000000..b21b072 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge.java @@ -0,0 +1,700 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.security; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.Locale; +import java.util.Map; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.apache.commons.codec.binary.Base64; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SaslRpcServer.AuthMethod; +import 
org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +/** + * Functions that bridge Thrift's SASL transports to Hadoop's + * SASL callback handlers and authentication classes. + * HIVE-11378 This class is not directly used anymore. It now exists only as a shell to be + * extended by HadoopThriftAuthBridge23 in 0.23 shims. I have made it abstract + * to avoid maintenance errors. + */ +public abstract class HadoopThriftAuthBridge { + private static final Logger LOG = LoggerFactory.getLogger(HadoopThriftAuthBridge.class); + + // We want to have only one auth bridge. In the past this was handled by ShimLoader, but since + // we're no longer using that we'll do it here. 
+ private static HadoopThriftAuthBridge self = null; + + public static HadoopThriftAuthBridge getBridge() { + if (self == null) { + synchronized (HadoopThriftAuthBridge.class) { + if (self == null) self = new HadoopThriftAuthBridge23(); + } + } + return self; + } + + public Client createClient() { + return new Client(); + } + + public Client createClientWithConf(String authMethod) { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getLoginUser(); + } catch(IOException e) { + throw new IllegalStateException("Unable to get current login user: " + e, e); + } + if (loginUserHasCurrentAuthMethod(ugi, authMethod)) { + LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current."); + return new Client(); + } else { + LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current."); + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod); + UserGroupInformation.setConfiguration(conf); + return new Client(); + } + } + + public Server createServer(String keytabFile, String principalConf, String clientConf) throws TTransportException { + return new Server(keytabFile, principalConf, clientConf); + } + + + public String getServerPrincipal(String principalConfig, String host) + throws IOException { + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + return serverPrincipal; + } + + /** + * Method to get canonical-ized hostname, given a hostname (possibly a CNAME). + * This should allow for service-principals to use simplified CNAMEs. + * @param hostName The hostname to be canonical-ized. + * @return Given a CNAME, the canonical-ized hostname is returned. If not found, the original hostname is returned. 
+ */ + public String getCanonicalHostName(String hostName) { + try { + return InetAddress.getByName(hostName).getCanonicalHostName(); + } + catch(UnknownHostException exception) { + LOG.warn("Could not retrieve canonical hostname for " + hostName, exception); + return hostName; + } + } + + public UserGroupInformation getCurrentUGIWithConf(String authMethod) + throws IOException { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch(IOException e) { + throw new IllegalStateException("Unable to get current user: " + e, e); + } + if (loginUserHasCurrentAuthMethod(ugi, authMethod)) { + LOG.debug("Not setting UGI conf as passed-in authMethod of " + authMethod + " = current."); + return ugi; + } else { + LOG.debug("Setting UGI conf as passed-in authMethod of " + authMethod + " != current."); + Configuration conf = new Configuration(); + conf.set(HADOOP_SECURITY_AUTHENTICATION, authMethod); + UserGroupInformation.setConfiguration(conf); + return UserGroupInformation.getCurrentUser(); + } + } + + /** + * Return true if the current login user is already using the given authMethod. + * + * Used above to ensure we do not create a new Configuration object and as such + * lose other settings such as the cluster to which the JVM is connected. 
Required + * for oozie since it does not have a core-site.xml see HIVE-7682 + */ + private boolean loginUserHasCurrentAuthMethod(UserGroupInformation ugi, String sAuthMethod) { + AuthenticationMethod authMethod; + try { + // based on SecurityUtil.getAuthenticationMethod() + authMethod = Enum.valueOf(AuthenticationMethod.class, sAuthMethod.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException iae) { + throw new IllegalArgumentException("Invalid attribute value for " + + HADOOP_SECURITY_AUTHENTICATION + " of " + sAuthMethod, iae); + } + LOG.debug("Current authMethod = " + ugi.getAuthenticationMethod()); + return ugi.getAuthenticationMethod().equals(authMethod); + } + + + /** + * Read and return Hadoop SASL configuration which can be configured using + * "hadoop.rpc.protection" + * @param conf + * @return Hadoop SASL configuration + */ + + public abstract Map<String, String> getHadoopSaslProperties(Configuration conf); + + public static class Client { + /** + * Create a client-side SASL transport that wraps an underlying transport. + * + * @param methodStr The authentication method to use. Currently only KERBEROS is + * supported. + * @param principalConfig The Kerberos principal of the target server. + * @param underlyingTransport The underlying transport mechanism, usually a TSocket. 
+ * @param saslProps the sasl properties to create the client with + */ + + + public TTransport createClientTransport( + String principalConfig, String host, + String methodStr, String tokenStrForm, final TTransport underlyingTransport, + final Map<String, String> saslProps) throws IOException { + final AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr); + + TTransport saslTransport = null; + switch (method) { + case DIGEST: + Token<DelegationTokenIdentifier> t= new Token<>(); + t.decodeFromUrlString(tokenStrForm); + saslTransport = new TSaslClientTransport( + method.getMechanismName(), + null, + null, SaslRpcServer.SASL_DEFAULT_REALM, + saslProps, new SaslClientCallbackHandler(t), + underlyingTransport); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + + case KERBEROS: + String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host); + final String names[] = SaslRpcServer.splitKerberosName(serverPrincipal); + if (names.length != 3) { + throw new IOException( + "Kerberos principal name does NOT have the expected hostname part: " + + serverPrincipal); + } + try { + return UserGroupInformation.getCurrentUser().doAs( + new PrivilegedExceptionAction<TUGIAssumingTransport>() { + @Override + public TUGIAssumingTransport run() throws IOException { + TTransport saslTransport = new TSaslClientTransport( + method.getMechanismName(), + null, + names[0], names[1], + saslProps, null, + underlyingTransport); + return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser()); + } + }); + } catch (InterruptedException | SaslException se) { + throw new IOException("Could not instantiate SASL transport", se); + } + + default: + throw new IOException("Unsupported authentication method: " + method); + } + } + private static class SaslClientCallbackHandler implements CallbackHandler { + private final String userName; + private final char[] userPassword; + + public 
SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) { + this.userName = encodeIdentifier(token.getIdentifier()); + this.userPassword = encodePassword(token.getPassword()); + } + + + @Override + public void handle(Callback[] callbacks) + throws UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL client callback"); + } + } + if (nc != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client callback: setting username: " + userName); + } + nc.setName(userName); + } + if (pc != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client callback: setting userPassword"); + } + pc.setPassword(userPassword); + } + if (rc != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client callback: setting realm: " + + rc.getDefaultText()); + } + rc.setText(rc.getDefaultText()); + } + } + + static String encodeIdentifier(byte[] identifier) { + return new String(Base64.encodeBase64(identifier)); + } + + static char[] encodePassword(byte[] password) { + return new String(Base64.encodeBase64(password)).toCharArray(); + } + } + } + + public static class Server { + public enum ServerMode { + HIVESERVER2, METASTORE + }; + + protected final UserGroupInformation realUgi; + protected final UserGroupInformation clientValidationUGI; + protected DelegationTokenSecretManager secretManager; + + public Server() throws TTransportException { + try { + realUgi = UserGroupInformation.getCurrentUser(); + clientValidationUGI = UserGroupInformation.getCurrentUser(); + } catch 
(IOException ioe) { + throw new TTransportException(ioe); + } + } + /** + * Create a server with a kerberos keytab/principal. + */ + protected Server(String keytabFile, String principalConf, String clientConf) + throws TTransportException { + if (keytabFile == null || keytabFile.isEmpty()) { + throw new TTransportException("No keytab specified"); + } + if (principalConf == null || principalConf.isEmpty()) { + throw new TTransportException("No principal specified"); + } + if (clientConf == null || clientConf.isEmpty()) { + // Don't bust existing setups. + LOG.warn("Client-facing principal not set. Using server-side setting: " + principalConf); + clientConf = principalConf; + } + + // Login from the keytab + String kerberosName; + try { + LOG.info("Logging in via CLIENT based principal "); + kerberosName = + SecurityUtil.getServerPrincipal(clientConf, "0.0.0.0"); + UserGroupInformation.loginUserFromKeytab( + kerberosName, keytabFile); + clientValidationUGI = UserGroupInformation.getLoginUser(); + assert clientValidationUGI.isFromKeytab(); + + LOG.info("Logging in via SERVER based principal "); + kerberosName = + SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0"); + UserGroupInformation.loginUserFromKeytab( + kerberosName, keytabFile); + realUgi = UserGroupInformation.getLoginUser(); + assert realUgi.isFromKeytab(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + + public void setSecretManager(DelegationTokenSecretManager secretManager) { + this.secretManager = secretManager; + } + + /** + * Create a TTransportFactory that, upon connection of a client socket, + * negotiates a Kerberized SASL transport. The resulting TTransportFactory + * can be passed as both the input and output transport factory when + * instantiating a TThreadPoolServer, for example. 
+ * + * @param saslProps Map of SASL properties + */ + + public TTransportFactory createTransportFactory(Map<String, String> saslProps) + throws TTransportException { + + TSaslServerTransport.Factory transFactory = createSaslServerTransportFactory(saslProps); + + return new TUGIAssumingTransportFactory(transFactory, clientValidationUGI); + } + + /** + * Create a TSaslServerTransport.Factory that, upon connection of a client + * socket, negotiates a Kerberized SASL transport. + * + * @param saslProps Map of SASL properties + */ + public TSaslServerTransport.Factory createSaslServerTransportFactory( + Map<String, String> saslProps) throws TTransportException { + // Parse out the kerberos principal, host, realm. + String kerberosName = clientValidationUGI.getUserName(); + final String names[] = SaslRpcServer.splitKerberosName(kerberosName); + if (names.length != 3) { + throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName); + } + + TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory(); + transFactory.addServerDefinition( + AuthMethod.KERBEROS.getMechanismName(), + names[0], names[1], // two parts of kerberos principal + saslProps, + new SaslRpcServer.SaslGssCallbackHandler()); + transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(), + null, SaslRpcServer.SASL_DEFAULT_REALM, + saslProps, new SaslDigestCallbackHandler(secretManager)); + + return transFactory; + } + + /** + * Wrap a TTransportFactory in such a way that, before processing any RPC, it + * assumes the UserGroupInformation of the user authenticated by + * the SASL transport. + */ + public TTransportFactory wrapTransportFactory(TTransportFactory transFactory) { + return new TUGIAssumingTransportFactory(transFactory, realUgi); + } + + /** + * Wrap a TProcessor in such a way that, before processing any RPC, it + * assumes the UserGroupInformation of the user authenticated by + * the SASL transport. 
+ */ + + public TProcessor wrapProcessor(TProcessor processor) { + return new TUGIAssumingProcessor(processor, secretManager, true); + } + + /** + * Wrap a TProcessor to capture the client information like connecting userid, ip etc + */ + + public TProcessor wrapNonAssumingProcessor(TProcessor processor) { + return new TUGIAssumingProcessor(processor, secretManager, false); + } + + final static ThreadLocal<InetAddress> remoteAddress = + new ThreadLocal<InetAddress>() { + + @Override + protected InetAddress initialValue() { + return null; + } + }; + + public InetAddress getRemoteAddress() { + return remoteAddress.get(); + } + + final static ThreadLocal<AuthenticationMethod> authenticationMethod = + new ThreadLocal<AuthenticationMethod>() { + + @Override + protected AuthenticationMethod initialValue() { + return AuthenticationMethod.TOKEN; + } + }; + + private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () { + + @Override + protected String initialValue() { + return null; + } + }; + + + public String getRemoteUser() { + return remoteUser.get(); + } + + private final static ThreadLocal<String> userAuthMechanism = + new ThreadLocal<String>() { + + @Override + protected String initialValue() { + return AuthMethod.KERBEROS.getMechanismName(); + } + }; + + public String getUserAuthMechanism() { + return userAuthMechanism.get(); + } + /** CallbackHandler for SASL DIGEST-MD5 mechanism */ + // This code is pretty much completely based on Hadoop's + // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not + // use that Hadoop class as-is was because it needs a Server.Connection object + // which is relevant in hadoop rpc but not here in the metastore - so the + // code below does not deal with the Connection Server.object. 
+ static class SaslDigestCallbackHandler implements CallbackHandler { + private final DelegationTokenSecretManager secretManager; + + public SaslDigestCallbackHandler( + DelegationTokenSecretManager secretManager) { + this.secretManager = secretManager; + } + + private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken { + return encodePassword(secretManager.retrievePassword(tokenid)); + } + + private char[] encodePassword(byte[] password) { + return new String(Base64.encodeBase64(password)).toCharArray(); + } + /** {@inheritDoc} */ + + @Override + public void handle(Callback[] callbacks) throws InvalidToken, + UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + continue; // realm is ignored + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL DIGEST-MD5 Callback"); + } + } + if (pc != null) { + DelegationTokenIdentifier tokenIdentifier = SaslRpcServer. 
+ getIdentifier(nc.getDefaultName(), secretManager); + char[] password = getPassword(tokenIdentifier); + + if (LOG.isDebugEnabled()) { + LOG.debug("SASL server DIGEST-MD5 callback: setting password " + + "for client: " + tokenIdentifier.getUser()); + } + pc.setPassword(password); + } + if (ac != null) { + String authid = ac.getAuthenticationID(); + String authzid = ac.getAuthorizationID(); + if (authid.equals(authzid)) { + ac.setAuthorized(true); + } else { + ac.setAuthorized(false); + } + if (ac.isAuthorized()) { + if (LOG.isDebugEnabled()) { + String username = + SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName(); + LOG.debug("SASL server DIGEST-MD5 callback: setting " + + "canonicalized client ID: " + username); + } + ac.setAuthorizedID(authzid); + } + } + } + } + + /** + * Processor that pulls the SaslServer object out of the transport, and + * assumes the remote user's UGI before calling through to the original + * processor. + * + * This is used on the server side to set the UGI for each specific call. 
+     *
+     * <p>Besides delegating, each call records the peer address, the SASL mechanism name,
+     * the remote user and (for non-PLAIN mechanisms) the authentication method in the
+     * enclosing scope's {@code remoteAddress}, {@code userAuthMechanism}, {@code remoteUser}
+     * and {@code authenticationMethod} holders.
+     */
+    protected class TUGIAssumingProcessor implements TProcessor {
+      /** Processor that handles the call once the caller's identity has been established. */
+      final TProcessor wrapped;
+      /** Passed to {@code SaslRpcServer.getIdentifier} when the TOKEN mechanism is in use. */
+      DelegationTokenSecretManager secretManager;
+      /**
+       * When true the wrapped processor runs inside {@code doAs} of a proxy user created for
+       * the end user on top of the login user; when false it is invoked directly.
+       */
+      boolean useProxy;
+
+      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+          boolean useProxy) {
+        this.wrapped = wrapped;
+        this.secretManager = secretManager;
+        this.useProxy = useProxy;
+      }
+
+      /**
+       * Determines the calling user from the SASL transport underneath {@code inProt} and then
+       * hands the call to the wrapped processor.
+       *
+       * <ul>
+       *   <li>PLAIN mechanism: the SASL authorization id becomes the remote user and the
+       *       wrapped processor is called directly.</li>
+       *   <li>TOKEN mechanism: the authorization id is resolved to a token identifier and the
+       *       identifier's user name becomes the end user.</li>
+       *   <li>Any other mechanism: the authorization id is the end user and the authentication
+       *       method is recorded as KERBEROS.</li>
+       * </ul>
+       *
+       * @param inProt protocol to read the request from; its transport must be a
+       *        {@code TSaslServerTransport}
+       * @param outProt protocol to write the response to
+       * @return the value returned by the wrapped processor
+       * @throws TException if the transport is not a SASL server transport, if the delegation
+       *         token cannot be resolved (the {@code InvalidToken} is attached as the cause), or
+       *         if the wrapped processor throws a {@code TException}
+       * @throws RuntimeException wrapping an {@code InterruptedException} or
+       *         {@code IOException} raised while setting up or running as the client UGI
+       */
+      @Override
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        SaslServer saslServer = saslTrans.getSaslServer();
+        String authId = saslServer.getAuthorizationID();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("AUTH ID ======>" + authId);
+        }
+        String endUser = authId;
+
+        // NOTE(review): assumes the SASL transport always sits directly on a TSocket; any other
+        // underlying transport would fail this cast - confirm against the server setup.
+        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+        remoteAddress.set(socket.getInetAddress());
+
+        String mechanismName = saslServer.getMechanismName();
+        userAuthMechanism.set(mechanismName);
+        if (AuthMethod.PLAIN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          // PLAIN: the authorization id is used as-is and no UGI is assumed.
+          remoteUser.set(endUser);
+          return wrapped.process(inProt, outProt);
+        }
+
+        // Every remaining mechanism is recorded as KERBEROS unless the TOKEN branch overrides it.
+        authenticationMethod.set(AuthenticationMethod.KERBEROS);
+        if (AuthMethod.TOKEN.getMechanismName().equalsIgnoreCase(mechanismName)) {
+          try {
+            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                secretManager);
+            endUser = tokenId.getUser().getUserName();
+            authenticationMethod.set(AuthenticationMethod.TOKEN);
+          } catch (InvalidToken e) {
+            // Keep the InvalidToken as the cause so its stack trace is not lost.
+            throw new TException(e.getMessage(), e);
+          }
+        }
+
+        UserGroupInformation clientUgi = null;
+        try {
+          if (useProxy) {
+            clientUgi = UserGroupInformation.createProxyUser(
+                endUser, UserGroupInformation.getLoginUser());
+            remoteUser.set(clientUgi.getShortUserName());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Set remoteUser :" + remoteUser.get());
+            }
+            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+
+              @Override
+              public Boolean run() {
+                try {
+                  return wrapped.process(inProt, outProt);
+                } catch (TException te) {
+                  // Tunnel the checked exception out of doAs; it is unwrapped again below.
+                  throw new RuntimeException(te);
+                }
+              }
+            });
+          } else {
+            // use the short user name for the request
+            UserGroupInformation endUserUgi = UserGroupInformation.createRemoteUser(endUser);
+            remoteUser.set(endUserUgi.getShortUserName());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Set remoteUser :" + remoteUser.get() + ", from endUser :" + endUser);
+            }
+            return wrapped.process(inProt, outProt);
+          }
+        } catch (RuntimeException rte) {
+          // Undo the tunnelling done inside run(): surface the original TException.
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          // Restore the interrupt flag so code further up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        } finally {
+          // Only the proxy path creates a UGI here; release the file systems opened for it.
+          if (clientUgi != null) {
+            try {
+              FileSystem.closeAllForUGI(clientUgi);
+            } catch (IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+      /** Factory that actually produces the transports. */
+      private final TTransportFactory wrapped;
+      /** Identity under which {@code wrapped} is invoked. */
+      private final UserGroupInformation ugi;
+
+      /**
+       * @param wrapped factory to delegate to; guarded by an {@code assert} against null
+       * @param ugi identity to assume around each delegation; guarded by an {@code assert}
+       *        against null
+       */
+      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+        assert wrapped != null;
+        assert ugi != null;
+        this.ugi = ugi;
+        this.wrapped = wrapped;
+      }
+
+      /**
+       * Asks the wrapped factory for a transport while running as the configured UGI.
+       *
+       * @param trans transport handed through unchanged to the wrapped factory
+       * @return the transport produced by the wrapped factory
+       */
+      @Override
+      public TTransport getTransport(final TTransport trans) {
+        final PrivilegedAction<TTransport> delegateCall = new PrivilegedAction<TTransport>() {
+          @Override
+          public TTransport run() {
+            return wrapped.getTransport(trans);
+          }
+        };
+        return ugi.doAs(delegateCall);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
new file mode 100644
index 0000000..dc76535
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/HadoopThriftAuthBridge23.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.security;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+
+/**
+ * Bridges Thrift's SASL transports to Hadoop's SASL callback handlers and
+ * authentication classes.
+ *
+ * This is the Hadoop 0.23/2.x specific implementation. How the SASL properties are
+ * obtained is decided once, reflectively, when the class is loaded.
+ */
+public class HadoopThriftAuthBridge23 extends HadoopThriftAuthBridge {
+
+  // Reflection handles resolved by the static initializer. A non-null SASL_PROPS_FIELD selects
+  // the field-based lookup; otherwise the two resolver methods are used.
+  private static Field SASL_PROPS_FIELD;
+  private static Class<?> SASL_PROPERTIES_RESOLVER_CLASS;
+  private static Method RES_GET_INSTANCE_METHOD;
+  private static Method GET_DEFAULT_PROP_METHOD;
+
+  static {
+    SASL_PROPERTIES_RESOLVER_CLASS = loadSaslPropertiesResolverClass();
+    if (SASL_PROPERTIES_RESOLVER_CLASS != null) {
+      // found the class, so this would be hadoop version 2.4 or newer (See
+      // HADOOP-10221, HADOOP-10451)
+      resolveResolverMethods();
+    }
+
+    final boolean resolverUsable =
+        SASL_PROPERTIES_RESOLVER_CLASS != null && GET_DEFAULT_PROP_METHOD != null;
+    if (!resolverUsable) {
+      // this must be a hadoop 2.4 version or earlier.
+      // Fall back to the earlier way of getting the properties, via the SASL_PROPS field
+      SASL_PROPS_FIELD = lookupLegacySaslPropsField();
+    }
+  }
+
+  /** Returns the SaslPropertiesResolver class, or null when it is not on the classpath. */
+  private static Class<?> loadSaslPropertiesResolverClass() {
+    final String resolverClassName = "org.apache.hadoop.security.SaslPropertiesResolver";
+    try {
+      return Class.forName(resolverClassName);
+    } catch (ClassNotFoundException absent) {
+      // Resolver not available; the caller falls back to the SASL_PROPS field.
+      return null;
+    }
+  }
+
+  /** Binds the two resolver methods; on any failure whatever was not yet bound stays null. */
+  private static void resolveResolverMethods() {
+    try {
+      RES_GET_INSTANCE_METHOD =
+          SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getInstance", Configuration.class);
+      GET_DEFAULT_PROP_METHOD = SASL_PROPERTIES_RESOLVER_CLASS.getMethod("getDefaultProperties");
+    } catch (Exception lookupFailure) {
+      // this must be hadoop 2.4, where getDefaultProperties was protected
+    }
+  }
+
+  /** Looks up the public SASL_PROPS field of SaslRpcServer, failing hard when it is missing. */
+  private static Field lookupLegacySaslPropsField() {
+    try {
+      return SaslRpcServer.class.getField("SASL_PROPS");
+    } catch (NoSuchFieldException e) {
+      // Older version of hadoop should have had this field
+      throw new IllegalStateException("Error finding hadoop SASL_PROPS field in "
+          + SaslRpcServer.class.getSimpleName(), e);
+    }
+  }
+
+  // TODO RIVEN switch this back to package level when we can move TestHadoopAuthBridge23 into
+  // riven.
+  // Intended to be package level so that HadoopThriftAuthBridge can construct it but others
+  // cannot; protected for now because of the TODO above.
+  protected HadoopThriftAuthBridge23() {
+  }
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection"
+   *
+   * @param conf configuration handed to SaslRpcServer.init or to the properties resolver,
+   *        depending on which lookup was selected at class-load time
+   * @return Hadoop SASL configuration
+   * @throws IllegalStateException if the reflective field read or method invocation fails
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+    if (SASL_PROPS_FIELD == null) {
+      // 2.5 and later way of finding sasl property
+      try {
+        Object resolver = RES_GET_INSTANCE_METHOD.invoke(null, conf);
+        Configurable configurableResolver = (Configurable) resolver;
+        configurableResolver.setConf(conf);
+        Object defaultProps = GET_DEFAULT_PROP_METHOD.invoke(configurableResolver);
+        return (Map<String, String>) defaultProps;
+      } catch (Exception e) {
+        throw new IllegalStateException("Error finding hadoop SASL properties", e);
+      }
+    }
+
+    // hadoop 2.4 and earlier way of finding the sasl property settings
+    // Initialize the SaslRpcServer to ensure QOP parameters are read from
+    // conf
+    SaslRpcServer.init(conf);
+    try {
+      return (Map<String, String>) SASL_PROPS_FIELD.get(null);
+    } catch (Exception e) {
+      throw new IllegalStateException("Error finding hadoop SASL properties", e);
+    }
+  }
+
+}
