Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ColumnMappingFactory.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.hive.accumulo.columns;

import java.util.Map.Entry;

import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
 * Parses the textual specification of how a Hive column maps onto Accumulo and builds the
 * matching {@link ColumnMapping}: the row ID ({@link HiveAccumuloRowIdColumnMapping}), a column
 * family plus qualifier prefix collected into a Hive map ({@link HiveAccumuloMapColumnMapping}),
 * or a single column family and qualifier pair ({@link HiveAccumuloColumnMapping}).
 */
public class ColumnMappingFactory {
  private static final Logger log = Logger.getLogger(ColumnMappingFactory.class);

  /**
   * Generate the proper instance of a ColumnMapping.
   *
   * <p>
   * A qualifier ending in an unescaped asterisk denotes a prefix and yields a map mapping whose
   * qualifier prefix excludes that trailing asterisk. Escaped asterisks in the qualifier are
   * unescaped. Both rules apply whether or not the specification carries an inline encoding.
   *
   * @param columnSpec
   *          Specification for mapping this column to Accumulo, optionally carrying an inline
   *          encoding (as recognized by {@link ColumnEncoding#hasColumnEncoding(String)})
   * @param defaultEncoding
   *          The default encoding in which values should be encoded to Accumulo, used when
   *          columnSpec carries no inline encoding
   * @param columnName
   *          Name of the Hive column
   * @param columnType
   *          Hive type of the column
   * @return the ColumnMapping describing this column
   * @throws InvalidColumnMappingException
   *           if a non-row-ID specification is not a colon-separated family and qualifier pair
   * @throws IllegalArgumentException
   *           if a prefix specification carries an inline encoding that is not a map encoding
   */
  public static ColumnMapping get(String columnSpec, ColumnEncoding defaultEncoding,
      String columnName, TypeInfo columnType) {
    Preconditions.checkNotNull(columnSpec);
    Preconditions.checkNotNull(columnName);
    Preconditions.checkNotNull(columnType);

    // Check for column encoding specification
    if (ColumnEncoding.hasColumnEncoding(columnSpec)) {
      String columnEncodingStr = ColumnEncoding.getColumnEncoding(columnSpec);
      columnSpec = ColumnEncoding.stripCode(columnSpec);

      if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
        return new HiveAccumuloRowIdColumnMapping(columnSpec,
            ColumnEncoding.get(columnEncodingStr), columnName, columnType.getTypeName());
      }

      Entry<String,String> pair = parseMapping(columnSpec);

      if (isPrefix(pair.getValue())) {
        // Sanity check that, for a map, we got 2 encodings
        if (!ColumnEncoding.isMapEncoding(columnEncodingStr)) {
          throw new IllegalArgumentException("Expected map encoding for a map specification, "
              + columnSpec + " with encoding " + columnEncodingStr);
        }

        Entry<ColumnEncoding,ColumnEncoding> encodings = ColumnEncoding
            .getMapEncoding(columnEncodingStr);

        // Same prefix handling as the default-encoding branch below: the marker '*' is dropped
        return new HiveAccumuloMapColumnMapping(pair.getKey(), toQualifierPrefix(pair.getValue()),
            encodings.getKey(), encodings.getValue(), columnName, columnType.getTypeName());
      }

      return new HiveAccumuloColumnMapping(pair.getKey(), unescapeAsterisks(pair.getValue()),
          ColumnEncoding.getFromMapping(columnEncodingStr), columnName, columnType.getTypeName());
    }

    if (AccumuloHiveConstants.ROWID.equalsIgnoreCase(columnSpec)) {
      return new HiveAccumuloRowIdColumnMapping(columnSpec, defaultEncoding, columnName,
          columnType.getTypeName());
    }

    Entry<String,String> pair = parseMapping(columnSpec);

    if (isPrefix(pair.getValue())) {
      return new HiveAccumuloMapColumnMapping(pair.getKey(), toQualifierPrefix(pair.getValue()),
          defaultEncoding, defaultEncoding, columnName, columnType.getTypeName());
    }

    return new HiveAccumuloColumnMapping(pair.getKey(), unescapeAsterisks(pair.getValue()),
        defaultEncoding, columnName, columnType.getTypeName());
  }

  /**
   * Builds a map mapping directly from a family:qualifier specification with explicit key and
   * value encodings. Unlike {@link #get}, the parsed qualifier is used as-is for the prefix.
   *
   * @throws InvalidColumnMappingException
   *           if columnSpec is not a colon-separated family and qualifier pair
   */
  public static ColumnMapping getMap(String columnSpec, ColumnEncoding keyEncoding,
      ColumnEncoding valueEncoding, String columnName, TypeInfo columnType) {
    Entry<String,String> pair = parseMapping(columnSpec);
    return new HiveAccumuloMapColumnMapping(pair.getKey(), pair.getValue(), keyEncoding,
        valueEncoding, columnName, columnType.getTypeName());
  }

  /**
   * Determines whether a column qualifier denotes a prefix, i.e. ends in an unescaped asterisk.
   *
   * @param maybePrefix
   *          the column qualifier to test; may be empty
   * @return true if the last character is an asterisk not preceded by the escape character
   */
  public static boolean isPrefix(String maybePrefix) {
    Preconditions.checkNotNull(maybePrefix);

    int length = maybePrefix.length();

    // An empty qualifier (e.g. from "cf:") has no trailing asterisk
    if (0 == length || AccumuloHiveConstants.ASTERISK != maybePrefix.charAt(length - 1)) {
      return false;
    }

    // A lone asterisk is a prefix; otherwise it must not be escaped
    return 1 == length || AccumuloHiveConstants.ESCAPE != maybePrefix.charAt(length - 2);
  }

  /**
   * Consumes the column mapping specification and breaks it into column family and column
   * qualifier, splitting on the first colon that is not preceded by the escape character. Escaped
   * colons in either half are unescaped.
   *
   * @param columnSpec
   *          the family:qualifier specification
   * @return an immutable entry of (column family, column qualifier)
   * @throws InvalidColumnMappingException
   *           if the specification contains no unescaped colon
   */
  public static Entry<String,String> parseMapping(String columnSpec)
      throws InvalidColumnMappingException {
    int index = 0;
    while (true) {
      if (index >= columnSpec.length()) {
        log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
        throw new InvalidColumnMappingException(
            "Columns must be provided as colon-separated family and qualifier pairs");
      }

      index = columnSpec.indexOf(AccumuloHiveConstants.COLON, index);

      if (-1 == index) {
        log.error("Cannot parse '" + columnSpec + "' as colon-separated column configuration");
        throw new InvalidColumnMappingException(
            "Columns must be provided as colon-separated family and qualifier pairs");
      }

      // Check for an escape character before the colon. This must include the case where the
      // escape character is the very first character of the specification (index == 1).
      if (index > 0 && AccumuloHiveConstants.ESCAPE == columnSpec.charAt(index - 1)) {
        // this colon is escaped, search again after it
        index++;
        continue;
      }

      // Not escaped, so this colon is the separator
      break;
    }

    String cf = unescapeColons(columnSpec.substring(0, index));
    String cq = unescapeColons(columnSpec.substring(index + 1));

    return Maps.immutableEntry(cf, cq);
  }

  /**
   * Converts the qualifier of a prefix specification into the bare prefix: escaped asterisks are
   * unescaped and the trailing asterisk that marked the prefix is removed.
   */
  private static String toQualifierPrefix(String cq) {
    String unescaped = unescapeAsterisks(cq);
    return unescaped.substring(0, unescaped.length() - 1);
  }

  /**
   * Replaces every escaped asterisk in the given qualifier with a plain asterisk.
   */
  private static String unescapeAsterisks(String cq) {
    // Check for the escaped asterisk before doing the expensive regex replace
    if (-1 != cq.indexOf(AccumuloHiveConstants.ESCAPED_ASTERISK)) {
      return cq.replaceAll(AccumuloHiveConstants.ESCAPED_ASERTISK_REGEX,
          Character.toString(AccumuloHiveConstants.ASTERISK));
    }
    return cq;
  }

  /**
   * Replaces every escaped colon in the given string with a plain colon.
   */
  private static String unescapeColons(String str) {
    // Check for the escaped colon before doing the expensive regex replace
    if (-1 != str.indexOf(AccumuloHiveConstants.ESCAPED_COLON)) {
      return str.replaceAll(AccumuloHiveConstants.ESCAPED_COLON_REGEX,
          Character.toString(AccumuloHiveConstants.COLON));
    }
    return str;
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloColumnMapping.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.hive.accumulo.columns;

import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;
import org.apache.log4j.Logger;

import com.google.common.base.Charsets;

/**
 * {@link ColumnMapping} from one Hive column to exactly one Accumulo column, identified by its
 * column family and column qualifier.
 */
public class HiveAccumuloColumnMapping extends ColumnMapping {
  @SuppressWarnings("unused")
  private static final Logger log = Logger.getLogger(HiveAccumuloColumnMapping.class);

  // The Accumulo column this Hive column corresponds to
  protected String columnFamily, columnQualifier;

  // UTF-8 encodings of the family and qualifier, computed on first request
  protected byte[] columnFamilyBytes, columnQualifierBytes;

  /**
   * @param cf
   *          Accumulo column family
   * @param cq
   *          Accumulo column qualifier
   * @param encoding
   *          the ColumnEncoding for this column
   * @param columnName
   *          name of the Hive column
   * @param columnType
   *          Hive type name of the column
   */
  public HiveAccumuloColumnMapping(String cf, String cq, ColumnEncoding encoding,
      String columnName, String columnType) {
    // The base class receives the mapping in "family:qualifier" form
    super(cf + AccumuloHiveConstants.COLON + cq, encoding, columnName, columnType);

    this.columnFamily = cf;
    this.columnQualifier = cq;
  }

  /**
   * @return the Accumulo column family
   */
  public String getColumnFamily() {
    return columnFamily;
  }

  /**
   * Returns the column family as UTF-8 bytes, encoding it on the first call and handing back the
   * same array afterwards. The array is the copy cached by this mapping, so callers should not
   * modify it.
   *
   * @return UTF8 formatted bytes
   */
  public byte[] getColumnFamilyBytes() {
    byte[] cached = columnFamilyBytes;
    if (cached == null) {
      cached = columnFamily.getBytes(Charsets.UTF_8);
      columnFamilyBytes = cached;
    }
    return cached;
  }

  /**
   * @return the Accumulo column qualifier
   */
  public String getColumnQualifier() {
    return columnQualifier;
  }

  /**
   * Returns the column qualifier as UTF-8 bytes, encoding it on the first call and handing back
   * the same array afterwards. The array is the copy cached by this mapping, so callers should not
   * modify it.
   *
   * @return UTF8 formatted bytes
   */
  public byte[] getColumnQualifierBytes() {
    byte[] cached = columnQualifierBytes;
    if (cached == null) {
      cached = columnQualifier.getBytes(Charsets.UTF_8);
      columnQualifierBytes = cached;
    }
    return cached;
  }

  /**
   * Renders this mapping as "family:qualifier"; a null qualifier is rendered as empty.
   */
  public String serialize() {
    String qualifier = (columnQualifier == null) ? "" : columnQualifier;
    return new StringBuilder(16).append(columnFamily).append(AccumuloHiveConstants.COLON)
        .append(qualifier).toString();
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("[");
    sb.append(getClass().getSimpleName()).append(": ");
    sb.append(columnFamily).append(":").append(columnQualifier);
    sb.append(", encoding ").append(encoding).append("]");
    return sb.toString();
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloMapColumnMapping.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo.columns;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;

import com.google.common.base.Preconditions;

/**
 * ColumnMapping for combining Accumulo columns into a single Hive Map. Expects ColumnEncoding
 * values for both the Key and Value of the Map.
 */
public class HiveAccumuloMapColumnMapping extends ColumnMapping {

  protected final String columnFamily, columnQualifierPrefix;
  protected final ColumnEncoding keyEncoding, valueEncoding;

  /**
   * @param columnFamily
   *          The column family that all qualifiers within should be placed into the same Hive map
   * @param columnQualifierPrefix
   *          The column qualifier prefix to include in the map, null is treated as an empty prefix
   * @param keyEncoding
   *          The encoding scheme for keys in this column family
   * @param valueEncoding
   *          The encoding scheme for the Accumulo values
   * @param columnName
   *          The name of the Hive column
   * @param columnType
   *          The Hive type name of the column
   * @throws NullPointerException
   *           if columnFamily is null
   */
  public HiveAccumuloMapColumnMapping(String columnFamily, String columnQualifierPrefix,
      ColumnEncoding keyEncoding, ColumnEncoding valueEncoding, String columnName,
      String columnType) {
    // Try to make something reasonable to pass up to the base class
    super((null == columnFamily ? "" : columnFamily) + AccumuloHiveConstants.COLON, valueEncoding,
        columnName, columnType);

    Preconditions.checkNotNull(columnFamily, "Must provide a column family");

    this.columnFamily = columnFamily;
    this.columnQualifierPrefix = (null == columnQualifierPrefix) ? "" : columnQualifierPrefix;
    this.keyEncoding = keyEncoding;
    this.valueEncoding = valueEncoding;
  }

  public String getColumnFamily() {
    return columnFamily;
  }

  /**
   * @return the qualifier prefix; never null (a null constructor argument becomes "")
   */
  public String getColumnQualifierPrefix() {
    return columnQualifierPrefix;
  }

  public ColumnEncoding getKeyEncoding() {
    return keyEncoding;
  }

  public ColumnEncoding getValueEncoding() {
    return valueEncoding;
  }

  /**
   * Two map mappings are equal when they share the column family, qualifier prefix, key encoding
   * and value encoding. The Hive column name and type are not compared. The encodings are not
   * validated by the constructor, so they are compared null-safely.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof HiveAccumuloMapColumnMapping)) {
      return false;
    }

    HiveAccumuloMapColumnMapping other = (HiveAccumuloMapColumnMapping) o;
    // columnFamily is checked and columnQualifierPrefix defaulted in the constructor: non-null
    return columnFamily.equals(other.columnFamily)
        && columnQualifierPrefix.equals(other.columnQualifierPrefix)
        && nullSafeEquals(keyEncoding, other.keyEncoding)
        && nullSafeEquals(valueEncoding, other.valueEncoding);
  }

  /** Returns true when both are null or a.equals(b). */
  private static boolean nullSafeEquals(Object a, Object b) {
    return null == a ? null == b : a.equals(b);
  }

  @Override
  public int hashCode() {
    HashCodeBuilder hcb = new HashCodeBuilder(23, 31);
    hcb.append(columnFamily).append(columnQualifierPrefix).append(keyEncoding)
        .append(valueEncoding);
    return hcb.toHashCode();
  }

  @Override
  public String toString() {
    return "[" + this.getClass().getSimpleName() + ": " + columnFamily + ":"
        + columnQualifierPrefix + "* encoding: " + keyEncoding + ":" + valueEncoding + "]";
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveAccumuloRowIdColumnMapping.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo.columns;

import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.hive.accumulo.AccumuloHiveConstants;

import com.google.common.base.Preconditions;

/**
 * {@link ColumnMapping} for the Hive column whose value is used as the row ID of a
 * {@link Mutation}.
 */
public class HiveAccumuloRowIdColumnMapping extends ColumnMapping {

  /**
   * @param columnSpec
   *          the mapping specification; must equal {@link AccumuloHiveConstants#ROWID}, ignoring
   *          case
   * @param encoding
   *          the ColumnEncoding for this column
   * @param columnName
   *          name of the Hive column
   * @param columnType
   *          Hive type name of the column
   * @throws IllegalArgumentException
   *           if columnSpec is not the row ID identifier
   */
  public HiveAccumuloRowIdColumnMapping(String columnSpec, ColumnEncoding encoding,
      String columnName, String columnType) {
    super(columnSpec, encoding, columnName, columnType);

    // Only the row ID identifier is an acceptable specification for this mapping
    final boolean isRowIdSpec = columnSpec.equalsIgnoreCase(AccumuloHiveConstants.ROWID);
    Preconditions.checkArgument(isRowIdSpec);
  }

  @Override
  public String toString() {
    return new StringBuilder("[").append(getClass().getSimpleName()).append(", ")
        .append(mappingSpec).append(", encoding ").append(encoding).append("]").toString();
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java?rev=1619005&view=auto ============================================================================== ---
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/HiveColumn.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.hive.accumulo.columns;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import com.google.common.base.Preconditions;

/**
 * A column of a Hive table: its name together with its Hive type.
 */
public class HiveColumn {

  /** The name of this column in the Hive schema. */
  protected final String columnName;

  /** The Hive type of this column. */
  protected final TypeInfo columnType;

  /**
   * @param columnName
   *          the name of the column in the Hive schema
   * @param columnType
   *          the Hive type of the column
   * @throws NullPointerException
   *           if either argument is null
   */
  public HiveColumn(String columnName, TypeInfo columnType) {
    this.columnName = Preconditions.checkNotNull(columnName);
    this.columnType = Preconditions.checkNotNull(columnType);
  }

  /**
   * @return the name of the Hive column
   */
  public String getColumnName() {
    return columnName;
  }

  /**
   * @return the Hive type of this column
   */
  public TypeInfo getColumnType() {
    return columnType;
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/InvalidColumnMappingException.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo.columns;

/**
 * Unchecked exception signalling an unusable Hive-to-Accumulo column mapping specification, for
 * example one that cannot be split into a column family and column qualifier.
 */
public class InvalidColumnMappingException extends IllegalArgumentException {

  private static final long serialVersionUID = 1L;

  /** Creates an exception with neither a detail message nor a cause. */
  public InvalidColumnMappingException() {
    super();
  }

  /** Creates an exception that wraps the given cause. */
  public InvalidColumnMappingException(Throwable cause) {
    super(cause);
  }

  /** Creates an exception with the given detail message. */
  public InvalidColumnMappingException(String msg) {
    super(msg);
  }

  /** Creates an exception with the given detail message and cause. */
  public InvalidColumnMappingException(String message, Throwable cause) {
    super(message, cause);
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloRecordReader.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.accumulo.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.hive.accumulo.AccumuloHiveRow;
import org.apache.hadoop.hive.accumulo.predicate.PrimitiveComparisonFilter;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;

import com.google.common.collect.Lists;

/**
 * Translate the {@link Key} {@link Value} pairs from {@link AccumuloInputFormat} to a
 * {@link Writable} for consumption by the {@link AccumuloSerDe}.
 *
 * <p>
 * Each call to {@link #next(Text, AccumuloHiveRow)} consumes one row from the wrapped reader. When
 * iterators were configured, each one adds a level of encoding to the row. Those levels are undone
 * with {@link PrimitiveComparisonFilter#decodeRow} before the columns are copied into the
 * {@link AccumuloHiveRow}.
 */
public class HiveAccumuloRecordReader implements RecordReader<Text,AccumuloHiveRow> {
  // Yields a row ID and an iterator over that row's Key/Value pairs on each next() call
  private final RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader;
  // Number of encoding levels to undo on every row (one per configured iterator)
  private final int iteratorCount;

  /**
   * @param recordReader
   *          reader producing a row ID and an iterator over that row's Key/Value pairs
   * @param iteratorCount
   *          number of encoding levels to unwrap on every row; 0 if none
   */
  public HiveAccumuloRecordReader(
      RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recordReader, int iteratorCount) {
    this.recordReader = recordReader;
    this.iteratorCount = iteratorCount;
  }

  @Override
  public void close() throws IOException {
    recordReader.close();
  }

  @Override
  public Text createKey() {
    return new Text();
  }

  @Override
  public AccumuloHiveRow createValue() {
    return new AccumuloHiveRow();
  }

  /**
   * Position is not tracked; always returns 0.
   */
  @Override
  public long getPos() throws IOException {
    return 0;
  }

  @Override
  public float getProgress() throws IOException {
    return recordReader.getProgress();
  }

  /**
   * Reads the next row from the wrapped reader into {@code row}.
   *
   * @param rowKey
   *          receives the row ID of the row read (skipped if null)
   * @param row
   *          cleared, then populated with the row ID and every (decoded) column of the row
   * @return true if a row was read, false once the wrapped reader is exhausted
   */
  @Override
  public boolean next(Text rowKey, AccumuloHiveRow row) throws IOException {
    Text key = recordReader.createKey();
    PeekingIterator<Map.Entry<Key,Value>> iter = recordReader.createValue();
    if (!recordReader.next(key, iter)) {
      return false;
    }

    // Honor the RecordReader contract: the caller's key receives the row ID
    if (null != rowKey) {
      rowKey.set(key);
    }

    row.clear();
    row.setRowId(key.toString());

    List<Key> keys = Lists.newArrayList();
    List<Value> values = Lists.newArrayList();
    while (iter.hasNext()) { // collect key/values for this row.
      Map.Entry<Key,Value> kv = iter.next();
      keys.add(kv.getKey());
      values.add(kv.getValue());
    }

    // each iterator creates a level of encoding; unwrap one level per iterator. Stop if nothing
    // is left to decode rather than failing on get(0) of an empty list.
    for (int i = 0; i < iteratorCount && !keys.isEmpty(); i++) {
      SortedMap<Key,Value> decoded = PrimitiveComparisonFilter.decodeRow(keys.get(0),
          values.get(0));
      keys = Lists.newArrayList(decoded.keySet());
      values = Lists.newArrayList(decoded.values());
    }

    pushToValue(keys, values, row);
    return true;
  }

  // flatten key/value pairs into row object for use in Serde.
  private void pushToValue(List<Key> keys, List<Value> values, AccumuloHiveRow row)
      throws IOException {
    Iterator<Key> kIter = keys.iterator();
    Iterator<Value> vIter = values.iterator();
    while (kIter.hasNext()) {
      Key k = kIter.next();
      Value v = vIter.next();
      row.add(k.getColumnFamily().toString(), k.getColumnQualifier().toString(), v.get());
    }
  }
}
Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloSplit.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.hadoop.hive.accumulo.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.accumulo.core.client.mapred.RangeInputSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;

/**
 * Presents a {@link RangeInputSplit} as a {@link FileSplit} so Hadoop won't complain when it tries
 * to make its own Path.
 *
 * <p>
 * Handing the {@link RangeInputSplit} over directly would hit a branch of code in
 * {@link HiveInputSplit} that generates an invalid Path; wrapping it here avoids that error.
 * Length and locations are delegated to the wrapped split, which is serialized after the
 * FileSplit's own fields.
 */
public class HiveAccumuloSplit extends FileSplit implements InputSplit {
  private static final Logger log = Logger.getLogger(HiveAccumuloSplit.class);

  // The wrapped Accumulo split; readFields populates it in place
  private final RangeInputSplit split;

  /**
   * Creates a split with no path around an empty {@link RangeInputSplit}, to be populated by
   * {@link #readFields(DataInput)}.
   */
  public HiveAccumuloSplit() {
    this(new RangeInputSplit(), null);
  }

  /**
   * @param split
   *          the Accumulo split to wrap
   * @param dummyPath
   *          placeholder Path handed to the FileSplit superclass
   */
  public HiveAccumuloSplit(RangeInputSplit split, Path dummyPath) {
    super(dummyPath, 0, 0, (String[]) null);
    this.split = split;
  }

  /**
   * @return the wrapped Accumulo split
   */
  public RangeInputSplit getSplit() {
    return split;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // FileSplit fields first, then the wrapped split; readFields mirrors this order
    super.write(out);
    split.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    split.readFields(in);
  }

  /**
   * Length of the wrapped split, or 0 if it cannot be determined (the failure is logged, since
   * this override cannot declare IOException).
   */
  @Override
  public long getLength() {
    try {
      return split.getLength();
    } catch (IOException e) {
      log.error("Error getting length for split: " + StringUtils.stringifyException(e));
      return 0;
    }
  }

  @Override
  public String[] getLocations() throws IOException {
    return split.getLocations();
  }

  @Override
  public String toString() {
    return "HiveAccumuloSplit: " + split;
  }
}
Added:
hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,485 @@ +package org.apache.hadoop.hive.accumulo.mr; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat; +import org.apache.accumulo.core.client.mapred.RangeInputSplit; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.AccumuloHiveRow; +import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; +import org.apache.hadoop.hive.accumulo.columns.ColumnMapping; +import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; +import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloMapColumnMapping; +import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wraps older InputFormat for use with Hive. + * + * Configure input scan with proper ranges, iterators, and columns based on serde properties for + * Hive table.
+ */ +public class HiveAccumuloTableInputFormat implements + org.apache.hadoop.mapred.InputFormat<Text,AccumuloHiveRow> { + private static final Logger log = LoggerFactory.getLogger(HiveAccumuloTableInputFormat.class); + + // Visible for testing + protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat(); + protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); + + /** + * Compute the splits for the Hive query: configure the AccumuloRowInputFormat from the JobConf + * (ranges and iterators derived from the pushed-down predicate, fetched columns derived from + * the column mappings) and wrap each Accumulo RangeInputSplit in a HiveAccumuloSplit. An empty, + * non-null set of ranges yields no splits at all. + */ + @Override + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters(jobConf); + final Instance instance = accumuloParams.getInstance(); + final ColumnMapper columnMapper; + try { + columnMapper = getColumnMapper(jobConf); + } catch (TooManyAccumuloColumnsException e) { + throw new IOException(e); + } + + JobContext context = ShimLoader.getHadoopShims().newJobContext(Job.getInstance(jobConf)); + Path[] tablePaths = FileInputFormat.getInputPaths(context); + + try { + final Connector connector = accumuloParams.getConnector(instance); + final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings(); + final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper); + final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper); + + // Setting an empty collection of ranges will, unexpectedly, scan all data + // We don't want that.
+ if (null != ranges && ranges.isEmpty()) { + return new InputSplit[0]; + } + + // Set the relevant information in the Configuration for the AccumuloInputFormat + configure(jobConf, instance, connector, accumuloParams, columnMapper, iterators, ranges); + + int numColumns = columnMappings.size(); + + List<Integer> readColIds = ColumnProjectionUtils.getReadColumnIDs(jobConf); + + // Sanity check + if (numColumns < readColIds.size()) + throw new IOException("Number of column mappings (" + numColumns + ")" + + " numbers less than the hive table columns. (" + readColIds.size() + ")"); + + // get splits from Accumulo + InputSplit[] splits = accumuloInputFormat.getSplits(jobConf, numSplits); + + // Every HiveAccumuloSplit is tagged with the table's input path; fail with a clear message + // instead of an ArrayIndexOutOfBoundsException when Hive did not provide one. + if (splits.length > 0 && tablePaths.length == 0) { + throw new IOException("No input paths set for the Hive table; cannot create splits"); + } + + HiveAccumuloSplit[] hiveSplits = new HiveAccumuloSplit[splits.length]; + for (int i = 0; i < splits.length; i++) { + RangeInputSplit ris = (RangeInputSplit) splits[i]; + hiveSplits[i] = new HiveAccumuloSplit(ris, tablePaths[0]); + } + + return hiveSplits; + } catch (AccumuloException e) { + log.error("Could not configure AccumuloInputFormat", e); + // Keep the original exception as the cause so the full chain reaches the caller + throw new IOException(StringUtils.stringifyException(e), e); + } catch (AccumuloSecurityException e) { + log.error("Could not configure AccumuloInputFormat", e); + throw new IOException(StringUtils.stringifyException(e), e); + } catch (SerDeException e) { + log.error("Could not configure AccumuloInputFormat", e); + throw new IOException(StringUtils.stringifyException(e), e); + } + } + + /** + * Setup accumulo input format from conf properties. Delegates to final RecordReader from mapred + * package.
+ * + * @param inputSplit + * Expected to be a HiveAccumuloSplit (it is cast unconditionally) + * @param jobConf + * Job configuration, used to build the ColumnMapper and the pushdown iterators + * @param reporter + * Passed through to the underlying Accumulo RecordReader + * @return RecordReader + * @throws IOException + * If the ColumnMapper or iterators cannot be built, or the table name cannot be read + * from or set on the split + */ + @Override + public RecordReader<Text,AccumuloHiveRow> getRecordReader(InputSplit inputSplit, + final JobConf jobConf, final Reporter reporter) throws IOException { + final ColumnMapper columnMapper; + try { + columnMapper = getColumnMapper(jobConf); + } catch (TooManyAccumuloColumnsException e) { + throw new IOException(e); + } + + try { + final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper); + + HiveAccumuloSplit hiveSplit = (HiveAccumuloSplit) inputSplit; + RangeInputSplit rangeSplit = hiveSplit.getSplit(); + + log.info("Split: " + rangeSplit); + + // The RangeInputSplit *should* have all of the necessary information contained in it + // which alleviates us from re-parsing our configuration from the AccumuloStorageHandler + // and re-setting it into the Configuration (like we did in getSplits(...)). Thus, it should + // be unnecessary to re-invoke configure(...) + + // ACCUMULO-2962 Iterators weren't getting serialized into the InputSplit, but we can + // compensate because we still have that info. + // Should be fixed in Accumulo 1.5.2 and 1.6.1 + if (null == rangeSplit.getIterators() + || (rangeSplit.getIterators().isEmpty() && !iterators.isEmpty())) { + log.debug("Re-setting iterators on InputSplit due to Accumulo bug."); + rangeSplit.setIterators(iterators); + } + + // ACCUMULO-3015 Like the above, RangeInputSplit should have the table name + // but might not, so just re-set it if it's null.
+ if (null == getTableName(rangeSplit)) { + final AccumuloConnectionParameters accumuloParams = new AccumuloConnectionParameters( + jobConf); + log.debug("Re-setting table name on InputSplit due to Accumulo bug."); + setTableName(rangeSplit, accumuloParams.getAccumuloTableName()); + } + + final RecordReader<Text,PeekingIterator<Map.Entry<Key,Value>>> recordReader = accumuloInputFormat + .getRecordReader(rangeSplit, jobConf, reporter); + + return new HiveAccumuloRecordReader(recordReader, iterators.size()); + } catch (SerDeException e) { + // Keep the original exception as the cause so the full chain reaches the caller + throw new IOException(StringUtils.stringifyException(e), e); + } + } + + /** + * Build the ColumnMapper for the table from the Hive column names, Hive column types, column + * mapping and default storage type stored in the Configuration. + * + * @throws IOException + * If the Hive column names or column types are missing from the Configuration + */ + protected ColumnMapper getColumnMapper(Configuration conf) throws IOException, + TooManyAccumuloColumnsException { + final String defaultStorageType = conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE); + + String[] columnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS); + if (null == columnNamesArr) { + throw new IOException( + "Hive column names must be provided to InputFormat in the Configuration"); + } + List<String> columnNames = Arrays.asList(columnNamesArr); + + String serializedTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES); + if (null == serializedTypes) { + throw new IOException( + "Hive column types must be provided to InputFormat in the Configuration"); + } + ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(serializedTypes); + + return new ColumnMapper(conf.get(AccumuloSerDeParameters.COLUMN_MAPPINGS), defaultStorageType, + columnNames, columnTypes); + } + + /** + * Configure the underlying AccumuloInputFormat + * + * @param conf + * Job configuration + * @param instance + * Accumulo instance + * 
@param connector + * Accumulo connector + * @param accumuloParams + * Connection information to the Accumulo instance + * @param columnMapper + * Configuration of Hive to Accumulo columns + * @param iterators + * Any iterators to be configured server-side + * @param ranges + * Accumulo ranges for the query; null leaves the ranges unset (full table scan) + *
@throws AccumuloSecurityException + * @throws AccumuloException + * @throws SerDeException + */ + protected void configure(JobConf conf, Instance instance, Connector connector, + AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper, + List<IteratorSetting> iterators, Collection<Range> ranges) throws AccumuloSecurityException, + AccumuloException, SerDeException { + + // Handle implementation of Instance and invoke appropriate InputFormat method + if (instance instanceof MockInstance) { + setMockInstance(conf, instance.getInstanceName()); + } else { + setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers()); + } + + // Set the username/passwd for the Accumulo connection + setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), + new PasswordToken(accumuloParams.getAccumuloPassword())); + + // Read from the given Accumulo table + setInputTableName(conf, accumuloParams.getAccumuloTableName()); + + // Check Configuration for any user-provided Authorization definition + Authorizations auths = AccumuloSerDeParameters.getAuthorizationsFromConf(conf); + + if (null == auths) { + // Default to all of user's authorizations when no configuration is provided + auths = connector.securityOperations().getUserAuthorizations( + accumuloParams.getAccumuloUserName()); + } + + // Implicitly handles users providing invalid authorizations + setScanAuthorizations(conf, auths); + + // restrict with any filters found from WHERE predicates. + addIterators(conf, iterators); + + // restrict with any ranges found from WHERE predicates.
+ // not setting ranges scans the entire table + if (null != ranges) { + log.info("Setting ranges: " + ranges); + setRanges(conf, ranges); + } + + // Restrict the set of columns that we want to read from the Accumulo table + HashSet<Pair<Text,Text>> pairs = getPairCollection(columnMapper.getColumnMappings()); + if (null != pairs && !pairs.isEmpty()) { + fetchColumns(conf, pairs); + } + } + + // Wrap the static AccumuloInputFormat methods with methods that we can + // verify were correctly called via Mockito + + protected void setMockInstance(JobConf conf, String instanceName) { + try { + AccumuloInputFormat.setMockInstance(conf, instanceName); + } catch (IllegalStateException e) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting mock instance of " + instanceName, e); + } + } + + @SuppressWarnings("deprecation") + protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) { + // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which + // takes a ClientConfiguration class that only exists in 1.6 + try { + AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); + } catch (IllegalStateException ise) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care. + log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " + + zkHosts, ise); + } + } + + protected void setConnectorInfo(JobConf conf, String user, AuthenticationToken token) + throws AccumuloSecurityException { + try { + AccumuloInputFormat.setConnectorInfo(conf, user, token); + } catch (IllegalStateException e) { + // AccumuloInputFormat complains if you re-set an already set value. We just don't care.
+ log.debug("Ignoring exception setting Accumulo Connector instance for user " + user, e); + } + } + + protected void setInputTableName(JobConf conf, String tableName) { + AccumuloInputFormat.setInputTableName(conf, tableName); + } + + protected void setScanAuthorizations(JobConf conf, Authorizations auths) { + AccumuloInputFormat.setScanAuthorizations(conf, auths); + } + + protected void addIterators(JobConf conf, List<IteratorSetting> iterators) { + for (IteratorSetting is : iterators) { + AccumuloInputFormat.addIterator(conf, is); + } + } + + protected void setRanges(JobConf conf, Collection<Range> ranges) { + AccumuloInputFormat.setRanges(conf, ranges); + } + + protected void fetchColumns(JobConf conf, Set<Pair<Text,Text>> cfCqPairs) { + AccumuloInputFormat.fetchColumns(conf, cfCqPairs); + } + + /** + * Create col fam/qual pairs, for use with fetchColumns, from the given ColumnMappings. Only + * HiveAccumuloColumnMapping and HiveAccumuloMapColumnMapping contribute pairs; any other + * mapping (e.g. the rowID) is ignored. A map mapping contributes its column family with a null + * qualifier. + * + * @param columnMappings + * The list of ColumnMappings for the given query + * @return a Set of Pairs of colfams and colquals + */ + protected HashSet<Pair<Text,Text>> getPairCollection(List<ColumnMapping> columnMappings) { + final HashSet<Pair<Text,Text>> pairs = new HashSet<Pair<Text,Text>>(); + + for (ColumnMapping columnMapping : columnMappings) { + if (columnMapping instanceof HiveAccumuloColumnMapping) { + HiveAccumuloColumnMapping accumuloColumnMapping = (HiveAccumuloColumnMapping) columnMapping; + + Text cf = new Text(accumuloColumnMapping.getColumnFamily()); + Text cq = null; + + // A null cq implies an empty column qualifier + if (null != accumuloColumnMapping.getColumnQualifier()) { + cq = new Text(accumuloColumnMapping.getColumnQualifier()); + } + + pairs.add(new Pair<Text,Text>(cf, cq)); + } else if (columnMapping instanceof HiveAccumuloMapColumnMapping) { + HiveAccumuloMapColumnMapping mapMapping = 
(HiveAccumuloMapColumnMapping) columnMapping; + + // Can't fetch prefix on colqual, must pull the entire qualifier + // TODO use an iterator to do
the filter, server-side. + pairs.add(new Pair<Text,Text>(new Text(mapMapping.getColumnFamily()), null)); + } + } + + log.info("Computed columns to fetch (" + pairs + ") from " + columnMappings); + + return pairs; + } + + /** + * Reflection to work around Accumulo 1.5 and 1.6 incompatibilities: tries + * RangeInputSplit#getTableName() first and falls back to RangeInputSplit#getTable(). Throws an + * {@link IOException} for any reflection related exceptions from the fallback + * + * @param split + * A RangeInputSplit + * @return The name of the table from the split + * @throws IOException + */ + protected String getTableName(RangeInputSplit split) throws IOException { + // ACCUMULO-3017 shenanigans with method names changing without deprecation + Method getTableName = null; + try { + getTableName = RangeInputSplit.class.getMethod("getTableName"); + } catch (SecurityException e) { + log.debug("Could not get getTableName method from RangeInputSplit", e); + } catch (NoSuchMethodException e) { + log.debug("Could not get getTableName method from RangeInputSplit", e); + } + + if (null != getTableName) { + try { + return (String) getTableName.invoke(split); + } catch (IllegalArgumentException e) { + log.debug("Could not invoke getTableName method from RangeInputSplit", e); + } catch (IllegalAccessException e) { + log.debug("Could not invoke getTableName method from RangeInputSplit", e); + } catch (InvocationTargetException e) { + log.debug("Could not invoke getTableName method from RangeInputSplit", e); + } + } + + Method getTable; + try { + getTable = RangeInputSplit.class.getMethod("getTable"); + } catch (SecurityException e) { + throw new IOException("Could not get table name from RangeInputSplit", e); + } catch (NoSuchMethodException e) { + throw new IOException("Could not get table name from RangeInputSplit", e); + } + + try { + return (String) getTable.invoke(split); + } catch (IllegalArgumentException e) { + throw new IOException("Could not get table name from RangeInputSplit", e); + } catch (IllegalAccessException 
e) { + throw new IOException("Could not get table name from
RangeInputSplit", e); + } catch (InvocationTargetException e) { + throw new IOException("Could not get table name from RangeInputSplit", e); + } + } + + /** + * Sets the table name on a RangeInputSplit, accounting for change in method name: tries + * RangeInputSplit#setTableName(String) first and falls back to RangeInputSplit#setTable(String). + * Any reflection related exception from the fallback is wrapped in an {@link IOException} + * + * @param split + * The RangeInputSplit to operate on + * @param tableName + * The name of the table to set + * @throws IOException + */ + protected void setTableName(RangeInputSplit split, String tableName) throws IOException { + // ACCUMULO-3017 shenanigans with method names changing without deprecation + Method setTableName = null; + try { + setTableName = RangeInputSplit.class.getMethod("setTableName", String.class); + } catch (SecurityException e) { + log.debug("Could not get setTableName method from RangeInputSplit", e); + } catch (NoSuchMethodException e) { + log.debug("Could not get setTableName method from RangeInputSplit", e); + } + + if (null != setTableName) { + try { + setTableName.invoke(split, tableName); + return; + } catch (IllegalArgumentException e) { + log.debug("Could not invoke setTableName method from RangeInputSplit", e); + } catch (IllegalAccessException e) { + log.debug("Could not invoke setTableName method from RangeInputSplit", e); + } catch (InvocationTargetException e) { + log.debug("Could not invoke setTableName method from RangeInputSplit", e); + } + } + + Method setTable; + try { + setTable = RangeInputSplit.class.getMethod("setTable", String.class); + } catch (SecurityException e) { + throw new IOException("Could not set table name from RangeInputSplit", e); + } catch (NoSuchMethodException e) { + throw new IOException("Could not set table name from RangeInputSplit", e); + } + + try { + setTable.invoke(split, tableName); + } catch (IllegalArgumentException e) { + throw new IOException("Could not set table name from RangeInputSplit", e); + } catch 
(IllegalAccessException e) { + throw new IOException("Could not set
table name from RangeInputSplit", e); + } catch (InvocationTargetException e) { + throw new IOException("Could not set table name from RangeInputSplit", e); + } + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.accumulo.mr; + +import java.io.IOException; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.mapred.JobConf; + +import com.google.common.base.Preconditions; + +/** + * + */ +public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + configureAccumuloOutputFormat(job); + + super.checkOutputSpecs(ignored, job); + } + + protected void configureAccumuloOutputFormat(JobConf job) throws IOException { + AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(job); + + final String tableName = job.get(AccumuloSerDeParameters.TABLE_NAME); + + // Make sure we actually go the table name + Preconditions.checkNotNull(tableName, + "Expected Accumulo table name to be provided in job configuration"); + + // Set the necessary Accumulo information + try { + // Username/passwd for Accumulo + setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(), + new PasswordToken(cnxnParams.getAccumuloPassword())); + + if (cnxnParams.useMockInstance()) { + setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName()); + } else { + // Accumulo instance name with ZK quorum + setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(), + cnxnParams.getZooKeepers()); + } + + // Set the table where we're writing this data + setDefaultAccumuloTableName(job, tableName); + } catch (AccumuloSecurityException e) { + log.error("Could not connect to Accumulo with provided 
credentials", e); + throw new IOException(e); + } + } + + // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing + + protected void setAccumuloConnectorInfo(JobConf conf, String username, AuthenticationToken token) + throws AccumuloSecurityException { + AccumuloOutputFormat.setConnectorInfo(conf, username, token); + } + + @SuppressWarnings("deprecation") + protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) { + AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + } + + protected void setAccumuloMockInstance(JobConf conf, String instanceName) { + AccumuloOutputFormat.setMockInstance(conf, instanceName); + } + + protected void setDefaultAccumuloTableName(JobConf conf, String tableName) { + AccumuloOutputFormat.setDefaultTableName(conf, tableName); + } +} Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/package-info.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,4 @@ +/** + * Serde and InputFormat support for connecting Hive to Accumulo tables. 
+ */ +package org.apache.hadoop.hive.accumulo; \ No newline at end of file Added: hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java?rev=1619005&view=auto ============================================================================== --- hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java (added) +++ hive/trunk/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java Tue Aug 19 22:41:10 2014 @@ -0,0 +1,408 @@ +package org.apache.hadoop.hive.accumulo.predicate; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Range; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; +import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; +import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; +import org.apache.hadoop.hive.accumulo.predicate.compare.DoubleCompare; +import org.apache.hadoop.hive.accumulo.predicate.compare.Equal; +import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThan; +import org.apache.hadoop.hive.accumulo.predicate.compare.GreaterThanOrEqual; +import org.apache.hadoop.hive.accumulo.predicate.compare.IntCompare; +import org.apache.hadoop.hive.accumulo.predicate.compare.LessThan; +import org.apache.hadoop.hive.accumulo.predicate.compare.LessThanOrEqual; +import org.apache.hadoop.hive.accumulo.predicate.compare.Like; +import org.apache.hadoop.hive.accumulo.predicate.compare.LongCompare; +import 
org.apache.hadoop.hive.accumulo.predicate.compare.NotEqual; +import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison; +import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.udf.UDFLike; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * + * Supporting operations dealing with Hive Predicate pushdown to iterators and ranges. 
+ * + * See {@link PrimitiveComparisonFilter} + * + */ +public class AccumuloPredicateHandler { + private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range()); + + private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler(); + private static Map<String,Class<? extends CompareOp>> compareOps = Maps.newHashMap(); + private static Map<String,Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap(); + + // Want to start sufficiently "high" enough in the iterator stack + private static int iteratorCount = 50; + + private static final Logger log = Logger.getLogger(AccumuloPredicateHandler.class); + static { + compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class); + compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class); + compareOps.put(GenericUDFOPGreaterThan.class.getName(), GreaterThan.class); + compareOps.put(GenericUDFOPEqualOrGreaterThan.class.getName(), GreaterThanOrEqual.class); + compareOps.put(GenericUDFOPEqualOrLessThan.class.getName(), LessThanOrEqual.class); + compareOps.put(GenericUDFOPLessThan.class.getName(), LessThan.class); + compareOps.put(UDFLike.class.getName(), Like.class); + + pComparisons.put("bigint", LongCompare.class); + pComparisons.put("int", IntCompare.class); + pComparisons.put("double", DoubleCompare.class); + pComparisons.put("string", StringCompare.class); + } + + public static AccumuloPredicateHandler getInstance() { + return handler; + } + + /** + * + * @return set of all UDF class names with matching CompareOpt implementations. + */ + public Set<String> cOpKeyset() { + return compareOps.keySet(); + } + + /** + * + * @return set of all hive data types with matching PrimitiveCompare implementations. + */ + public Set<String> pComparisonKeyset() { + return pComparisons.keySet(); + } + + /** + * + * @param udfType + * GenericUDF classname to lookup matching CompareOpt + * @return Class<? extends CompareOpt/> + */ + public Class<? 
extends CompareOp> getCompareOpClass(String udfType) + throws NoSuchCompareOpException { + if (!compareOps.containsKey(udfType)) + throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType); + return compareOps.get(udfType); + } + + public CompareOp getCompareOp(String udfType, IndexSearchCondition sc) + throws NoSuchCompareOpException, SerDeException { + Class<? extends CompareOp> clz = getCompareOpClass(udfType); + + try { + return clz.newInstance(); + } catch (ClassCastException e) { + throw new SerDeException("Column type mismatch in WHERE clause " + + sc.getComparisonExpr().getExprString() + " found type " + + sc.getConstantDesc().getTypeString() + " instead of " + + sc.getColumnDesc().getTypeString()); + } catch (IllegalAccessException e) { + throw new SerDeException("Could not instantiate class for WHERE clause", e); + } catch (InstantiationException e) { + throw new SerDeException("Could not instantiate class for WHERE clause", e); + } + } + + /** + * + * @param type + * String hive column lookup matching PrimitiveCompare + * @return Class<? extends ></?> + */ + public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type) + throws NoSuchPrimitiveComparisonException { + if (!pComparisons.containsKey(type)) + throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: " + + type); + return pComparisons.get(type); + } + + public PrimitiveComparison getPrimitiveComparison(String type, IndexSearchCondition sc) + throws NoSuchPrimitiveComparisonException, SerDeException { + Class<? 
extends PrimitiveComparison> clz = getPrimitiveComparisonClass(type); + + try { + return clz.newInstance(); + } catch (ClassCastException e) { + throw new SerDeException("Column type mismatch in WHERE clause " + + sc.getComparisonExpr().getExprString() + " found type " + + sc.getConstantDesc().getTypeString() + " instead of " + + sc.getColumnDesc().getTypeString()); + } catch (IllegalAccessException e) { + throw new SerDeException("Could not instantiate class for WHERE clause", e); + } catch (InstantiationException e) { + throw new SerDeException("Could not instantiate class for WHERE clause", e); + } + } + + private AccumuloPredicateHandler() {} + + /** + * Loop through search conditions and build ranges for predicates involving rowID column, if any. + */ + public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException { + if (!columnMapper.hasRowIdMapping()) { + return TOTAL_RANGE; + } + + int rowIdOffset = columnMapper.getRowIdOffset(); + String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS); + + if (null == hiveColumnNamesArr) { + throw new IllegalArgumentException("Could not find Hive columns in configuration"); + } + + // Already verified that we should have the rowId mapping + String hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset]; + + ExprNodeDesc root = this.getExpression(conf); + + // No expression, therefore scan the whole table + if (null == root) { + return TOTAL_RANGE; + } + + Object result = generateRanges(columnMapper, hiveRowIdColumnName, root); + + if (null == result) { + log.info("Calculated null set of ranges, scanning full table"); + return TOTAL_RANGE; + } else if (result instanceof Range) { + log.info("Computed a single Range for the query: " + result); + return Collections.singletonList((Range) result); + } else if (result instanceof List) { + log.info("Computed a collection of Ranges for the query: " + result); + @SuppressWarnings("unchecked") + List<Range> ranges = (List<Range>) 
result; + return ranges; + } else { + throw new IllegalArgumentException("Unhandled return from Range generation: " + result); + } + } + + /** + * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo + * Ranges using expressions involving the Accumulo rowid-mapped Hive column + * + * @param columnMapper + * Mapping of Hive to Accumulo columns for the query + * @param hiveRowIdColumnName + * Name of the hive column mapped to the Accumulo rowid + * @param root + * Root of some ExprNodeDesc tree to traverse, the WHERE clause + * @return An object representing the result from the ExprNodeDesc tree traversal using the + * AccumuloRangeGenerator + */ + protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) { + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, + columnMapper.getRowIdMapping(), hiveRowIdColumnName); + Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, + Collections.<Rule,NodeProcessor> emptyMap(), null); + GraphWalker ogw = new DefaultGraphWalker(disp); + ArrayList<Node> roots = new ArrayList<Node>(); + roots.add(root); + HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>(); + + try { + ogw.startWalking(roots, nodeOutput); + } catch (SemanticException ex) { + throw new RuntimeException(ex); + } + + return nodeOutput.get(root); + } + + /** + * Loop through search conditions and build iterator settings for predicates involving columns + * other than rowID, if any. 
+ * + * @param conf + * Configuration + * @throws SerDeException + */ + public List<IteratorSetting> getIterators(Configuration conf, ColumnMapper columnMapper) + throws SerDeException { + List<IteratorSetting> itrs = Lists.newArrayList(); + boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, + AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT); + if (!shouldPushdown) { + log.info("Iterator pushdown is disabled for this table"); + return itrs; + } + + int rowIdOffset = columnMapper.getRowIdOffset(); + String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS); + + if (null == hiveColumnNamesArr) { + throw new IllegalArgumentException("Could not find Hive columns in configuration"); + } + + String hiveRowIdColumnName = null; + + if (rowIdOffset >= 0 && rowIdOffset < hiveColumnNamesArr.length) { + hiveRowIdColumnName = hiveColumnNamesArr[rowIdOffset]; + } + + List<String> hiveColumnNames = Arrays.asList(hiveColumnNamesArr); + + for (IndexSearchCondition sc : getSearchConditions(conf)) { + String col = sc.getColumnDesc().getColumn(); + if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) { + HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper + .getColumnMappingForHiveColumn(hiveColumnNames, col); + itrs.add(toSetting(mapping, sc)); + } + } + if (log.isInfoEnabled()) + log.info("num iterators = " + itrs.size()); + return itrs; + } + + /** + * Create an IteratorSetting for the right qualifier, constant, CompareOpt, and PrimitiveCompare + * type. 
+ * + * @param accumuloColumnMapping + * ColumnMapping to filter + * @param sc + * IndexSearchCondition + * @return IteratorSetting + * @throws SerDeException + */ + public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping, + IndexSearchCondition sc) throws SerDeException { + iteratorCount++; + final IteratorSetting is = new IteratorSetting(iteratorCount, + PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class); + final String type = sc.getColumnDesc().getTypeString(); + final String comparisonOpStr = sc.getComparisonOp(); + + PushdownTuple tuple; + try { + tuple = new PushdownTuple(sc, getPrimitiveComparison(type, sc), getCompareOp(comparisonOpStr, + sc)); + } catch (NoSuchPrimitiveComparisonException e) { + throw new SerDeException("No configured PrimitiveComparison class for " + type, e); + } catch (NoSuchCompareOpException e) { + throw new SerDeException("No configured CompareOp class for " + comparisonOpStr, e); + } + + is.addOption(PrimitiveComparisonFilter.P_COMPARE_CLASS, tuple.getpCompare().getClass() + .getName()); + is.addOption(PrimitiveComparisonFilter.COMPARE_OPT_CLASS, tuple.getcOpt().getClass().getName()); + is.addOption(PrimitiveComparisonFilter.CONST_VAL, + new String(Base64.encodeBase64(tuple.getConstVal()))); + is.addOption(PrimitiveComparisonFilter.COLUMN, accumuloColumnMapping.serialize()); + + return is; + } + + public ExprNodeDesc getExpression(Configuration conf) { + String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filteredExprSerialized == null) + return null; + + return Utilities.deserializeExpression(filteredExprSerialized); + } + + /** + * + * @param conf + * Configuration + * @return list of IndexSearchConditions from the filter expression. 
+ */ + public List<IndexSearchCondition> getSearchConditions(Configuration conf) { + final List<IndexSearchCondition> sConditions = Lists.newArrayList(); + ExprNodeDesc filterExpr = getExpression(conf); + if (null == filterExpr) { + return sConditions; + } + IndexPredicateAnalyzer analyzer = newAnalyzer(conf); + ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions); + if (residual != null) + throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString()); + return sConditions; + } + + /** + * + * @param conf + * Configuration + * @param desc + * predicate expression node. + * @return DecomposedPredicate containing translated search conditions the analyzer can support. + */ + public DecomposedPredicate decompose(Configuration conf, ExprNodeDesc desc) { + IndexPredicateAnalyzer analyzer = newAnalyzer(conf); + List<IndexSearchCondition> sConditions = new ArrayList<IndexSearchCondition>(); + ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions); + + if (sConditions.size() == 0) { + if (log.isInfoEnabled()) + log.info("nothing to decompose. Returning"); + return null; + } + + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(sConditions); + decomposedPredicate.residualPredicate = (ExprNodeGenericFuncDesc) residualPredicate; + return decomposedPredicate; + } + + /** + * Build an analyzer that allows comparison opts from compareOpts map, and all columns from table + * definition. + */ + private IndexPredicateAnalyzer newAnalyzer(Configuration conf) { + IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); + analyzer.clearAllowedColumnNames(); + for (String op : cOpKeyset()) { + analyzer.addComparisonOp(op); + } + + String[] hiveColumnNames = conf.getStrings(serdeConstants.LIST_COLUMNS); + for (String col : hiveColumnNames) { + analyzer.allowColumnName(col); + } + + return analyzer; + } +}