http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java new file mode 100644 index 0000000..51531d6 --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexDefinition.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.mr; + +import java.util.HashMap; +import java.util.Map; + +/** + * Index table definition. 
/**
 * Index table definition: records which columns of a base table are indexed into an index
 * table, together with the type name of each indexed column.
 *
 * <p>Indexed columns are keyed by their column family and column qualifier.
 */
public class AccumuloIndexDefinition {
  private final String baseTable;
  private final String indexTable;
  // Maps the encoded "family:qualifier" key of each indexed column to its type name.
  private final Map<String, String> colMap;

  /**
   * Creates a definition with no indexed columns.
   *
   * @param baseTable name of the table whose columns are indexed
   * @param indexTable name of the table that receives the index entries
   */
  public AccumuloIndexDefinition(String baseTable, String indexTable) {
    this.colMap = new HashMap<String, String>();
    this.baseTable = baseTable;
    this.indexTable = indexTable;
  }

  /** Returns the name of the indexed (base) table. */
  public String getBaseTable() {
    return baseTable;
  }

  /** Returns the name of the index table. */
  public String getIndexTable() {
    return indexTable;
  }

  /**
   * Registers a column as indexed, replacing any type previously registered for it.
   *
   * @param cf column family
   * @param cq column qualifier
   * @param colType type name of the column's values
   */
  public void addIndexCol(String cf, String cq, String colType) {
    colMap.put(encode(cf, cq), colType);
  }

  /**
   * Returns the live backing map from encoded {@code family:qualifier} keys to type names.
   */
  public Map<String, String> getColumnMap() {
    return colMap;
  }

  /**
   * Parses a comma-separated list of {@code family:qualifier:type} tuples and registers each
   * one via {@link #addIndexCol}. A {@code null}, blank or {@code "*"} value registers nothing,
   * and blank entries inside the list are skipped. Parts beyond the third are ignored.
   *
   * @param columns the column tuple specification, may be {@code null}
   * @throws IllegalArgumentException if a non-blank entry has fewer than three
   *     colon-separated parts
   */
  public void setColumnTuples(String columns) {
    if (columns == null) {
      return;
    }
    String cols = columns.trim();
    if (cols.isEmpty() || "*".equals(cols)) {
      return;
    }
    for (String col : cols.split(",")) {
      String tuple = col.trim();
      if (tuple.isEmpty()) {
        continue;
      }
      String[] cfcqtp = tuple.split(":");
      if (cfcqtp.length < 3) {
        throw new IllegalArgumentException(
            "Invalid index column '" + tuple + "', expected family:qualifier:type");
      }
      addIndexCol(cfcqtp[0], cfcqtp[1], cfcqtp[2]);
    }
  }

  /** Returns true if the given family/qualifier pair is indexed. */
  public boolean contains(String cf, String cq) {
    return colMap.containsKey(encode(cf, cq));
  }

  /** Returns the type name registered for the family/qualifier pair, or {@code null}. */
  public String getColType(String cf, String cq) {
    return colMap.get(encode(cf, cq));
  }

  // Builds the map key for a column; both the family and the qualifier must take part so that
  // equal qualifiers in different families are not conflated.
  private String encode(String cf, String cq) {
    return cf + ":" + cq;
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java new file mode 100644 index 0000000..a055233 --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/AccumuloIndexedOutputFormat.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.accumulo.mr; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Extension of AccumuloOutputFormat to support indexing. 
+ */ +public class AccumuloIndexedOutputFormat extends AccumuloOutputFormat { + private static final Logger LOG = Logger.getLogger(AccumuloIndexedOutputFormat.class); + private static final Class<?> CLASS = AccumuloOutputFormat.class; + private static final byte[] EMPTY_BYTES = new byte[0]; + + public static void setIndexTableName(JobConf job, String tableName) { + IndexOutputConfigurator.setIndexTableName(CLASS, job, tableName); + } + + protected static String getIndexTableName(JobConf job) { + return IndexOutputConfigurator.getIndexTableName(CLASS, job); + } + + public static void setIndexColumns(JobConf job, String fields) { + IndexOutputConfigurator.setIndexColumns(CLASS, job, fields); + } + + protected static String getIndexColumns(JobConf job) { + return IndexOutputConfigurator.getIndexColumns(CLASS, job); + } + + public static void setStringEncoding(JobConf job, Boolean isStringEncoding) { + IndexOutputConfigurator.setRecordEncoding(CLASS, job, isStringEncoding); + } + + protected static Boolean getStringEncoding(JobConf job) { + return IndexOutputConfigurator.getRecordEncoding(CLASS, job); + } + + public RecordWriter<Text, Mutation> getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { + try { + return new AccumuloIndexedOutputFormat.AccumuloRecordWriter(job); + } catch (Exception e) { + throw new IOException(e); + } + } + + protected static class AccumuloRecordWriter implements RecordWriter<Text, Mutation> { + private MultiTableBatchWriter mtbw = null; + private Map<Text, BatchWriter> bws = null; + private Text defaultTableName = null; + private Text indexTableName = null; + private boolean simulate = false; + private boolean createTables = false; + private boolean isStringEncoded = true; + private long mutCount = 0L; + private long valCount = 0L; + private Connector conn; + private AccumuloIndexDefinition indexDef = null; + + protected AccumuloRecordWriter(JobConf job) + throws AccumuloException, 
AccumuloSecurityException, IOException { + Level l = AccumuloIndexedOutputFormat.getLogLevel(job); + if (l != null) { + LOG.setLevel(AccumuloIndexedOutputFormat.getLogLevel(job)); + } + this.isStringEncoded = AccumuloIndexedOutputFormat.getStringEncoding(job).booleanValue(); + this.simulate = AccumuloIndexedOutputFormat.getSimulationMode(job).booleanValue(); + this.createTables = AccumuloIndexedOutputFormat.canCreateTables(job).booleanValue(); + if (this.simulate) { + LOG.info("Simulating output only. No writes to tables will occur"); + } + + this.bws = new HashMap(); + String tname = AccumuloIndexedOutputFormat.getDefaultTableName(job); + this.defaultTableName = tname == null ? null : new Text(tname); + + String iname = AccumuloIndexedOutputFormat.getIndexTableName(job); + if (iname != null) { + LOG.info("Index Table = " + iname); + this.indexTableName = new Text(iname); + this.indexDef = createIndexDefinition(job, tname, iname); + } + if (!this.simulate) { + this.conn = AccumuloIndexedOutputFormat.getInstance(job) + .getConnector(AccumuloIndexedOutputFormat.getPrincipal(job), + AccumuloIndexedOutputFormat.getAuthenticationToken(job)); + this.mtbw = this.conn.createMultiTableBatchWriter( + AccumuloIndexedOutputFormat.getBatchWriterOptions(job)); + } + } + + AccumuloIndexDefinition createIndexDefinition(JobConf job, String tname, String iname) { + AccumuloIndexDefinition def = new AccumuloIndexDefinition(tname, iname); + String cols = AccumuloIndexedOutputFormat.getIndexColumns(job); + LOG.info("Index Cols = " + cols); + def.setColumnTuples(cols); + return def; + } + + public void write(Text table, Mutation mutation) throws IOException { + if(table == null || table.toString().isEmpty()) { + table = this.defaultTableName; + } + + if(!this.simulate && table == null) { + throw new IOException("No table or default table specified. 
Try simulation mode next time"); + } else { + ++this.mutCount; + this.valCount += (long)mutation.size(); + this.printMutation(table, mutation); + if(!this.simulate) { + if(!this.bws.containsKey(table)) { + try { + this.addTable(table); + } catch (Exception var5) { + LOG.error(var5); + throw new IOException(var5); + } + } + if(indexTableName != null && !this.bws.containsKey(indexTableName)) { + try { + this.addTable(indexTableName); + } catch (Exception var6) { + LOG.error(var6); + throw new IOException(var6); + } + } + + try { + ((BatchWriter)this.bws.get(table)).addMutation(mutation); + } catch (MutationsRejectedException var4) { + throw new IOException(var4); + } + + // if this table has an associated index table then attempt to build + // index mutations + if (indexTableName != null) { + List<Mutation> idxMuts = getIndexMutations(mutation); + if (!idxMuts.isEmpty()) { + try { + BatchWriter writer = this.bws.get(indexTableName); + for (Mutation m : idxMuts) { + writer.addMutation(m); + } + } catch (MutationsRejectedException var4) { + throw new IOException(var4); + } + } + } + } + } + } + + public void addTable(Text tableName) throws AccumuloException, AccumuloSecurityException { + if(this.simulate) { + LOG.info("Simulating adding table: " + tableName); + } else { + LOG.debug("Adding table: " + tableName); + BatchWriter bw = null; + String table = tableName.toString(); + if(this.createTables && !this.conn.tableOperations().exists(table)) { + try { + this.conn.tableOperations().create(table); + } catch (AccumuloSecurityException var8) { + LOG.error("Accumulo security violation creating " + table, var8); + throw var8; + } catch (TableExistsException var9) { + LOG.warn("Table Exists " + table, var9); + } + } + + try { + bw = this.mtbw.getBatchWriter(table); + } catch (TableNotFoundException var5) { + LOG.error("Accumulo table " + table + " doesn't exist and cannot be created.", var5); + throw new AccumuloException(var5); + } + + if(bw != null) { + 
this.bws.put(tableName, bw); + } + + } + } + + private int printMutation(Text table, Mutation m) { + if(LOG.isTraceEnabled()) { + LOG.trace(String.format("Table %s row key: %s", + new Object[]{table, this.hexDump(m.getRow())})); + Iterator itr = m.getUpdates().iterator(); + + while(itr.hasNext()) { + ColumnUpdate cu = (ColumnUpdate)itr.next(); + LOG.trace(String.format("Table %s column: %s:%s", + new Object[]{table, this.hexDump(cu.getColumnFamily()), + this.hexDump(cu.getColumnQualifier())})); + LOG.trace(String.format("Table %s security: %s", + new Object[]{table, (new ColumnVisibility(cu.getColumnVisibility())).toString()})); + LOG.trace(String.format("Table %s value: %s", + new Object[]{table, this.hexDump(cu.getValue())})); + } + } + + return m.getUpdates().size(); + } + + private List<Mutation> getIndexMutations(Mutation baseMut) { + List indexMuts = new ArrayList<Mutation>(); + + // nothing to do if there is not a index definition for this table + if (null != indexDef) { + + byte[] rowId = baseMut.getRow(); + + + for (ColumnUpdate cu : baseMut.getUpdates()) { + String cf = new String(cu.getColumnFamily()); + String cq = new String(cu.getColumnQualifier()); + + // if this columnFamily/columnQualifier pair is defined in the index build a new mutation + // so key=value, cf=columnFamily_columnQualifer, cq=rowKey, cv=columnVisibility value=[] + String colType = indexDef.getColType(cf, cq); + if (colType != null) { + LOG.trace(String.format("Building index for column %s:%s", new Object[]{cf, cq})); + Mutation m = new Mutation(AccumuloIndexLexicoder.encodeValue(cu.getValue(), colType, + isStringEncoded)); + String colFam = cf + "_" + cq; + m.put(colFam.getBytes(), rowId, new ColumnVisibility(cu.getColumnVisibility()), + EMPTY_BYTES); + indexMuts.add(m); + } + } + } + return indexMuts; + } + + private String hexDump(byte[] ba) { + StringBuilder sb = new StringBuilder(); + byte[] arr = ba; + int len = ba.length; + + for(int i = 0; i < len; ++i) { + byte b = arr[i]; + 
if(b > 32 && b < 126) { + sb.append((char)b); + } else { + sb.append(String.format("x%02x", new Object[]{Byte.valueOf(b)})); + } + } + + return sb.toString(); + } + + public void close(Reporter reporter) throws IOException { + LOG.debug("mutations written: " + this.mutCount + ", values written: " + this.valCount); + if(!this.simulate) { + try { + this.mtbw.close(); + } catch (MutationsRejectedException var7) { + if(var7.getAuthorizationFailuresMap().size() >= 0) { + Map tables = new HashMap(); + + Map.Entry ke; + Object secCodes; + for(Iterator itr = var7.getAuthorizationFailuresMap().entrySet().iterator(); + itr.hasNext(); ((Set)secCodes).addAll((Collection)ke.getValue())) { + ke = (Map.Entry)itr.next(); + secCodes = (Set)tables.get(((KeyExtent)ke.getKey()).getTableId().toString()); + if(secCodes == null) { + secCodes = new HashSet(); + tables.put(((KeyExtent)ke.getKey()).getTableId().toString(), secCodes); + } + } + + LOG.error("Not authorized to write to tables : " + tables); + } + + if(var7.getConstraintViolationSummaries().size() > 0) { + LOG.error("Constraint violations : " + var7.getConstraintViolationSummaries().size()); + } + throw new IOException(var7); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java index 3ae5431..bfa764a 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.accumulo.mr; import java.io.IOException; @@ -27,8 +29,11 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; @@ -42,7 +47,7 @@ import com.google.common.base.Preconditions; /** * */ -public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { +public class HiveAccumuloTableOutputFormat extends AccumuloIndexedOutputFormat { protected final HiveAccumuloHelper helper = new HiveAccumuloHelper(); @@ -54,7 +59,8 @@ public 
class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { } @Override - public RecordWriter<Text,Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + public RecordWriter<Text, Mutation> getRecordWriter(FileSystem ignored, JobConf job, String name, + Progressable progress) throws IOException { configureAccumuloOutputFormat(job); return super.getRecordWriter(ignored, job, name, progress); @@ -117,6 +123,16 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { // Set the table where we're writing this data setDefaultAccumuloTableName(job, tableName); + + // Set the index table information + final String indexTableName = job.get(AccumuloIndexParameters.INDEXTABLE_NAME); + final String indexedColumns = job.get(AccumuloIndexParameters.INDEXED_COLUMNS); + final String columnTypes = job.get(serdeConstants.LIST_COLUMN_TYPES); + final boolean binaryEncoding = ColumnEncoding.BINARY.getName() + .equalsIgnoreCase(job.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)); + setAccumuloIndexTableName(job, indexTableName); + setAccumuloIndexColumns(job, indexedColumns); + setAccumuloStringEncoding(job, !binaryEncoding); } catch (AccumuloSecurityException e) { log.error("Could not connect to Accumulo with provided credentials", e); throw new IOException(e); @@ -125,10 +141,10 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { // Non-static methods to wrap the static AccumuloOutputFormat methods to enable testing - protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, AuthenticationToken token) - throws AccumuloSecurityException { + protected void setConnectorInfoWithErrorChecking(JobConf conf, String username, + AuthenticationToken token) throws AccumuloSecurityException { try { - AccumuloOutputFormat.setConnectorInfo(conf, username, token); + AccumuloIndexedOutputFormat.setConnectorInfo(conf, username, token); } catch 
(IllegalStateException e) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. log.debug("Ignoring exception setting Accumulo Connector instance for user " + username, e); @@ -136,8 +152,8 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { } @SuppressWarnings("deprecation") - protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, String zookeepers, - boolean isSasl) throws IOException { + protected void setZooKeeperInstanceWithErrorChecking(JobConf conf, String instanceName, + String zookeepers, boolean isSasl) throws IOException { try { if (isSasl) { // Reflection to support Accumulo 1.5. Remove when Accumulo 1.5 support is dropped @@ -146,7 +162,7 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { getHelper().setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName, isSasl); } else { - AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + AccumuloIndexedOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); } } catch (IllegalStateException ise) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. @@ -157,7 +173,7 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { protected void setMockInstanceWithErrorChecking(JobConf conf, String instanceName) { try { - AccumuloOutputFormat.setMockInstance(conf, instanceName); + AccumuloIndexedOutputFormat.setMockInstance(conf, instanceName); } catch (IllegalStateException e) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. 
log.debug("Ignoring exception setting mock instance of " + instanceName, e); @@ -165,7 +181,19 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { } protected void setDefaultAccumuloTableName(JobConf conf, String tableName) { - AccumuloOutputFormat.setDefaultTableName(conf, tableName); + AccumuloIndexedOutputFormat.setDefaultTableName(conf, tableName); + } + + protected void setAccumuloIndexTableName(JobConf conf, String indexTableName) { + AccumuloIndexedOutputFormat.setIndexTableName(conf, indexTableName); + } + + protected void setAccumuloIndexColumns(JobConf conf, String indexColumns) { + AccumuloIndexedOutputFormat.setIndexColumns(conf, indexColumns); + } + + protected void setAccumuloStringEncoding(JobConf conf, Boolean isStringEncoded) { + AccumuloIndexedOutputFormat.setStringEncoding(conf, isStringEncoded); } HiveAccumuloHelper getHelper() { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java new file mode 100644 index 0000000..98294bb --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/IndexOutputConfigurator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.mr; + +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.hadoop.conf.Configuration; + +
/**
 * Extension of {@link OutputConfigurator} that stores the index-related output settings in a
 * {@link Configuration}: the index table name, the indexed columns and the encoding flag.
 */
public class IndexOutputConfigurator extends OutputConfigurator {
  /**
   * Accumulo write options. Each constant is combined with an implementing class by
   * {@code enumToConfKey} to form a configuration key; renaming a constant would presumably
   * change its key and orphan existing settings.
   */
  public static enum WriteOpts {
    DEFAULT_TABLE_NAME,
    INDEX_TABLE_NAME,
    INDEX_COLUMNS,
    COLUMN_TYPES,
    BINARY_ENCODING,
    BATCH_WRITER_CONFIG
  }

  /**
   * Stores the index table name. A {@code null} name leaves the configuration unchanged.
   */
  public static void setIndexTableName(Class<?> implementingClass, Configuration conf,
      String tableName) {
    if (tableName != null) {
      conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME), tableName);
    }
  }

  /** Returns the stored index table name, or {@code null} if none was set. */
  public static String getIndexTableName(Class<?> implementingClass, Configuration conf) {
    return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_TABLE_NAME));
  }

  /**
   * Stores the indexed-column specification (the string later handed to
   * AccumuloIndexDefinition#setColumnTuples). A {@code null} value leaves the configuration
   * unchanged.
   */
  public static void setIndexColumns(Class<?> implementingClass, Configuration conf,
      String indexColumns) {
    if (indexColumns != null) {
      conf.set(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS), indexColumns);
    }
  }

  /** Returns the stored indexed-column specification, or {@code null} if none was set. */
  public static String getIndexColumns(Class<?> implementingClass, Configuration conf) {
    return conf.get(enumToConfKey(implementingClass, WriteOpts.INDEX_COLUMNS));
  }

  /**
   * Stores the record encoding flag under the {@code BINARY_ENCODING} key.
   *
   * <p>NOTE: despite the key name, AccumuloIndexedOutputFormat writes its "is string encoded"
   * flag here and reads it back with that meaning; the key name is kept for configuration
   * compatibility. A {@code null} flag leaves the configuration unchanged.
   */
  public static void setRecordEncoding(Class<?> implementingClass, Configuration conf,
      Boolean isStringEncoded) {
    if (isStringEncoded != null) {
      conf.set(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING),
          isStringEncoded.toString());
    }
  }

  /**
   * Returns the stored record encoding flag; {@code false} when the key is unset, since
   * {@code Boolean.valueOf(null)} is {@code false}.
   */
  public static Boolean getRecordEncoding(Class<?> implementingClass, Configuration conf) {
    return Boolean.valueOf(conf.get(enumToConfKey(implementingClass, WriteOpts.BINARY_ENCODING)));
  }

}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java new file mode 100644 index 0000000..599b1ea --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/package-info.java @@ -0,0 +1,4 @@ +/** + * map reduce and supporting classes + */ +package org.apache.hadoop.hive.accumulo.mr; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java index a7ec7c5..718a5c5 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -29,6 +30,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.data.Range; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.columns.ColumnEncoding; import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; @@ -86,13 +88,13 @@ public class AccumuloPredicateHandler { private static final List<Range> TOTAL_RANGE = Collections.singletonList(new Range()); private static AccumuloPredicateHandler handler = new AccumuloPredicateHandler(); - private static Map<String,Class<? extends CompareOp>> compareOps = Maps.newHashMap(); - private static Map<String,Class<? extends PrimitiveComparison>> pComparisons = Maps.newHashMap(); + private static Map<String, Class<? extends CompareOp>> compareOps = Maps.newHashMap(); + private static Map<String, Class<? 
extends PrimitiveComparison>> pComparisons = Maps.newHashMap(); // Want to start sufficiently "high" enough in the iterator stack private static int iteratorCount = 50; - private static final Logger log = LoggerFactory.getLogger(AccumuloPredicateHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AccumuloPredicateHandler.class); static { compareOps.put(GenericUDFOPEqual.class.getName(), Equal.class); compareOps.put(GenericUDFOPNotEqual.class.getName(), NotEqual.class); @@ -136,8 +138,9 @@ public class AccumuloPredicateHandler { */ public Class<? extends CompareOp> getCompareOpClass(String udfType) throws NoSuchCompareOpException { - if (!compareOps.containsKey(udfType)) + if (!compareOps.containsKey(udfType)) { throw new NoSuchCompareOpException("Null compare op for specified key: " + udfType); + } return compareOps.get(udfType); } @@ -167,9 +170,10 @@ public class AccumuloPredicateHandler { */ public Class<? extends PrimitiveComparison> getPrimitiveComparisonClass(String type) throws NoSuchPrimitiveComparisonException { - if (!pComparisons.containsKey(type)) + if (!pComparisons.containsKey(type)) { throw new NoSuchPrimitiveComparisonException("Null primitive comparison for specified key: " + type); + } return pComparisons.get(type); } @@ -196,7 +200,8 @@ public class AccumuloPredicateHandler { /** * Loop through search conditions and build ranges for predicates involving rowID column, if any. 
*/ - public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) throws SerDeException { + public List<Range> getRanges(Configuration conf, ColumnMapper columnMapper) + throws SerDeException { if (!columnMapper.hasRowIdMapping()) { return TOTAL_RANGE; } @@ -218,16 +223,16 @@ public class AccumuloPredicateHandler { return TOTAL_RANGE; } - Object result = generateRanges(columnMapper, hiveRowIdColumnName, root); + Object result = generateRanges(conf, columnMapper, hiveRowIdColumnName, root); if (null == result) { - log.info("Calculated null set of ranges, scanning full table"); + LOG.info("Calculated null set of ranges, scanning full table"); return TOTAL_RANGE; } else if (result instanceof Range) { - log.info("Computed a single Range for the query: " + result); + LOG.info("Computed a single Range for the query: " + result); return Collections.singletonList((Range) result); } else if (result instanceof List) { - log.info("Computed a collection of Ranges for the query: " + result); + LOG.info("Computed a collection of Ranges for the query: " + result); @SuppressWarnings("unchecked") List<Range> ranges = (List<Range>) result; return ranges; @@ -237,9 +242,11 @@ public class AccumuloPredicateHandler { } /** - * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo - * Ranges using expressions involving the Accumulo rowid-mapped Hive column + * Encapsulates the traversal over some {@link ExprNodeDesc} tree for the generation of Accumuluo. + * Ranges using expressions involving the Accumulo rowid-mapped Hive column. 
* + * @param conf + * Hadoop configuration * @param columnMapper * Mapping of Hive to Accumulo columns for the query * @param hiveRowIdColumnName @@ -249,15 +256,16 @@ public class AccumuloPredicateHandler { * @return An object representing the result from the ExprNodeDesc tree traversal using the * AccumuloRangeGenerator */ - protected Object generateRanges(ColumnMapper columnMapper, String hiveRowIdColumnName, ExprNodeDesc root) { - AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(handler, + protected Object generateRanges(Configuration conf, ColumnMapper columnMapper, + String hiveRowIdColumnName, ExprNodeDesc root) { + AccumuloRangeGenerator rangeGenerator = new AccumuloRangeGenerator(conf, handler, columnMapper.getRowIdMapping(), hiveRowIdColumnName); Dispatcher disp = new DefaultRuleDispatcher(rangeGenerator, - Collections.<Rule,NodeProcessor> emptyMap(), null); + Collections.<Rule, NodeProcessor> emptyMap(), null); GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList<Node> roots = new ArrayList<Node>(); + List<Node> roots = new ArrayList<Node>(); roots.add(root); - HashMap<Node,Object> nodeOutput = new HashMap<Node,Object>(); + HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>(); try { ogw.startWalking(roots, nodeOutput); @@ -282,10 +290,13 @@ public class AccumuloPredicateHandler { boolean shouldPushdown = conf.getBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, AccumuloSerDeParameters.ITERATOR_PUSHDOWN_DEFAULT); if (!shouldPushdown) { - log.info("Iterator pushdown is disabled for this table"); + LOG.info("Iterator pushdown is disabled for this table"); return itrs; } + boolean binaryEncodedRow = ColumnEncoding.BINARY.getName(). 
+ equalsIgnoreCase(conf.get(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)); + int rowIdOffset = columnMapper.getRowIdOffset(); String[] hiveColumnNamesArr = conf.getStrings(serdeConstants.LIST_COLUMNS); @@ -306,11 +317,12 @@ public class AccumuloPredicateHandler { if (hiveRowIdColumnName == null || !hiveRowIdColumnName.equals(col)) { HiveAccumuloColumnMapping mapping = (HiveAccumuloColumnMapping) columnMapper .getColumnMappingForHiveColumn(hiveColumnNames, col); - itrs.add(toSetting(mapping, sc)); + itrs.add(toSetting(mapping, sc, binaryEncodedRow)); } } - if (log.isInfoEnabled()) - log.info("num iterators = " + itrs.size()); + if (LOG.isInfoEnabled()) { + LOG.info("num iterators = " + itrs.size()); + } return itrs; } @@ -322,15 +334,19 @@ public class AccumuloPredicateHandler { * ColumnMapping to filter * @param sc * IndexSearchCondition + * @param binaryEncodedValues + * flag for binary encodedValues * @return IteratorSetting * @throws SerDeException */ public IteratorSetting toSetting(HiveAccumuloColumnMapping accumuloColumnMapping, - IndexSearchCondition sc) throws SerDeException { + IndexSearchCondition sc, boolean binaryEncodedValues) throws SerDeException { iteratorCount++; final IteratorSetting is = new IteratorSetting(iteratorCount, - PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, PrimitiveComparisonFilter.class); - final String type = sc.getColumnDesc().getTypeString(); + PrimitiveComparisonFilter.FILTER_PREFIX + iteratorCount, + PrimitiveComparisonFilter.class); + final String type = binaryEncodedValues ? 
sc.getColumnDesc().getTypeString() + : ColumnEncoding.STRING.getName(); final String comparisonOpStr = sc.getComparisonOp(); PushdownTuple tuple; @@ -355,8 +371,9 @@ public class AccumuloPredicateHandler { public ExprNodeDesc getExpression(Configuration conf) { String filteredExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filteredExprSerialized == null) + if (filteredExprSerialized == null) { return null; + } return SerializationUtilities.deserializeExpression(filteredExprSerialized); } @@ -375,8 +392,9 @@ public class AccumuloPredicateHandler { } IndexPredicateAnalyzer analyzer = newAnalyzer(conf); ExprNodeDesc residual = analyzer.analyzePredicate(filterExpr, sConditions); - if (residual != null) + if (residual != null) { throw new RuntimeException("Unexpected residual predicate: " + residual.getExprString()); + } return sConditions; } @@ -394,8 +412,7 @@ public class AccumuloPredicateHandler { ExprNodeDesc residualPredicate = analyzer.analyzePredicate(desc, sConditions); if (sConditions.size() == 0) { - if (log.isInfoEnabled()) - log.info("nothing to decompose. Returning"); + LOG.info("nothing to decompose. Returning"); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java index 21392d1..90607ed 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloRangeGenerator.java @@ -1,10 +1,11 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.accumulo.predicate; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; +package org.apache.hadoop.hive.accumulo.predicate; import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; +import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner; +import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException; +import org.apache.hadoop.hive.accumulo.AccumuloIndexLexicoder; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloRowIdColumnMapping; import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; import org.apache.hadoop.hive.accumulo.predicate.compare.Equal; @@ -43,17 +44,19 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.UTF8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +import static java.nio.charset.StandardCharsets.UTF_8; + /** * */ @@ -63,12 +66,27 @@ public class AccumuloRangeGenerator implements NodeProcessor { private final AccumuloPredicateHandler predicateHandler; private final HiveAccumuloRowIdColumnMapping rowIdMapping; private final String hiveRowIdColumnName; + private AccumuloIndexScanner indexScanner; - public AccumuloRangeGenerator(AccumuloPredicateHandler predicateHandler, + public AccumuloRangeGenerator(Configuration conf, AccumuloPredicateHandler predicateHandler, HiveAccumuloRowIdColumnMapping rowIdMapping, String hiveRowIdColumnName) { this.predicateHandler = predicateHandler; this.rowIdMapping = rowIdMapping; this.hiveRowIdColumnName = hiveRowIdColumnName; + try { + this.indexScanner = new AccumuloIndexParameters(conf).createScanner(); + } catch (AccumuloIndexScannerException e) { + LOG.error(e.getLocalizedMessage(), e); + this.indexScanner = null; + } + } + + public 
AccumuloIndexScanner getIndexScanner() { + return indexScanner; + } + + public void setIndexScanner(AccumuloIndexScanner indexScanner) { + this.indexScanner = indexScanner; } @Override @@ -234,13 +252,39 @@ public class AccumuloRangeGenerator implements NodeProcessor { return null; } - // Reject any clauses that are against a column that isn't the rowId mapping + ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + + // Reject any clauses that are against a column that isn't the rowId mapping or indexed if (!this.hiveRowIdColumnName.equals(columnDesc.getColumn())) { + if (this.indexScanner != null && this.indexScanner.isIndexed(columnDesc.getColumn())) { + return getIndexedRowIds(genericUdf, leftHandNode, columnDesc.getColumn(), objInspector); + } return null; } - ConstantObjectInspector objInspector = constantDesc.getWritableObjectInspector(); + Text constText = getConstantText(objInspector); + + return getRange(genericUdf, leftHandNode, constText); + } + + private Range getRange(GenericUDF genericUdf, ExprNodeDesc leftHandNode, Text constText) { + Class<? 
extends CompareOp> opClz; + try { + opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName()); + } catch (NoSuchCompareOpException e) { + throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName()); + } + + if (leftHandNode instanceof ExprNodeConstantDesc) { + return getConstantOpColumnRange(opClz, constText); + } else if (leftHandNode instanceof ExprNodeColumnDesc) { + return getColumnOpConstantRange(opClz, constText); + } else { + throw new IllegalStateException("Expected column or constant on LHS of expression"); + } + } + private Text getConstantText(ConstantObjectInspector objInspector) throws SemanticException { Text constText; switch (rowIdMapping.getEncoding()) { case STRING: @@ -257,21 +301,7 @@ public class AccumuloRangeGenerator implements NodeProcessor { throw new SemanticException("Unable to parse unknown encoding: " + rowIdMapping.getEncoding()); } - - Class<? extends CompareOp> opClz; - try { - opClz = predicateHandler.getCompareOpClass(genericUdf.getUdfName()); - } catch (NoSuchCompareOpException e) { - throw new IllegalArgumentException("Unhandled UDF class: " + genericUdf.getUdfName()); - } - - if (leftHandNode instanceof ExprNodeConstantDesc) { - return getConstantOpColumnRange(opClz, constText); - } else if (leftHandNode instanceof ExprNodeColumnDesc) { - return getColumnOpConstantRange(opClz, constText); - } else { - throw new IllegalStateException("Expected column or constant on LHS of expression"); - } + return constText; } protected Range getConstantOpColumnRange(Class<? 
extends CompareOp> opClz, Text constText) { @@ -311,6 +341,21 @@ public class AccumuloRangeGenerator implements NodeProcessor { } } + + protected Object getIndexedRowIds(GenericUDF genericUdf, ExprNodeDesc leftHandNode, + String columnName, ConstantObjectInspector objInspector) + throws SemanticException { + Text constText = getConstantText(objInspector); + byte[] value = constText.toString().getBytes(UTF_8); + byte[] encoded = AccumuloIndexLexicoder.encodeValue(value, leftHandNode.getTypeString(), true); + Range range = getRange(genericUdf, leftHandNode, new Text(encoded)); + if (indexScanner != null) { + return indexScanner.getIndexRowRanges(columnName, range); + } + return null; + } + + protected Text getUtf8Value(ConstantObjectInspector objInspector) { // TODO is there a more correct way to get the literal value for the Object? return new Text(objInspector.getWritableConstantValue().toString()); @@ -327,7 +372,7 @@ public class AccumuloRangeGenerator implements NodeProcessor { ByteArrayOutputStream out = new ByteArrayOutputStream(); if (objInspector instanceof PrimitiveObjectInspector) { LazyUtils.writePrimitive(out, objInspector.getWritableConstantValue(), - (PrimitiveObjectInspector) objInspector); + (PrimitiveObjectInspector) objInspector); } else { return getUtf8Value(objInspector); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java index 17d5529..5121ea3 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/PrimitiveComparisonFilter.java @@ 
-1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; import org.apache.hadoop.hive.accumulo.predicate.compare.CompareOp; import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ import com.google.common.collect.Lists; */ public class PrimitiveComparisonFilter extends WholeRowIterator { @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(PrimitiveComparisonFilter.class); + private static final Logger LOG = LoggerFactory.getLogger(PrimitiveComparisonFilter.class); public static final String FILTER_PREFIX = "accumulo.filter.compare.iterator."; public static final String P_COMPARE_CLASS = "accumulo.filter.iterator.p.compare.class"; @@ -68,7 +67,7 @@ public class PrimitiveComparisonFilter extends WholeRowIterator { @Override protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { - SortedMap<Key,Value> items; + SortedMap<Key, Value> items; boolean allow; try { // if key doesn't contain CF, it's an encoded value from a previous iterator. 
while (keys.get(0).getColumnFamily().getBytes().length == 0) { @@ -103,11 +102,11 @@ public class PrimitiveComparisonFilter extends WholeRowIterator { } @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); String serializedColumnMapping = options.get(COLUMN); - Entry<String,String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping); + Entry<String, String> pair = ColumnMappingFactory.parseMapping(serializedColumnMapping); // The ColumnEncoding, column name and type are all irrelevant at this point, just need the // cf:[cq] @@ -135,7 +134,7 @@ public class PrimitiveComparisonFilter extends WholeRowIterator { } } - protected byte[] getConstant(Map<String,String> options) { + protected byte[] getConstant(Map<String, String> options) { String b64Const = options.get(CONST_VAL); return Base64.decodeBase64(b64Const.getBytes()); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java new file mode 100644 index 0000000..d295c7b --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloIndexParameters.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo.serde; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner; +import org.apache.hadoop.hive.accumulo.AccumuloIndexScanner; +import org.apache.hadoop.hive.accumulo.AccumuloIndexScannerException; + +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Set; + + +/** + * Accumulo Index Parameters for Hive tables. 
+ */
+public class AccumuloIndexParameters {
+  public static final int DEFAULT_MAX_ROWIDS = 20000;
+  public static final String INDEX_SCANNER = "accumulo.index.scanner";
+  public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max";
+  public static final String INDEXED_COLUMNS = "accumulo.indexed.columns";
+  public static final String INDEXTABLE_NAME = "accumulo.indextable.name";
+  private final Configuration conf;
+
+  /**
+   * @param conf configuration from which every index setting is read
+   */
+  public AccumuloIndexParameters(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the value of {@value #INDEXTABLE_NAME}, or {@code null} when it is not set
+   */
+  public String getIndexTable() {
+    return this.conf.get(INDEXTABLE_NAME);
+  }
+
+  /**
+   * @return the value of {@value #MAX_INDEX_ROWS}, or {@value #DEFAULT_MAX_ROWIDS} when unset
+   */
+  public int getMaxIndexRows() {
+    return this.conf.getInt(MAX_INDEX_ROWS, DEFAULT_MAX_ROWIDS);
+  }
+
+  /**
+   * Parses the comma-separated {@value #INDEXED_COLUMNS} property.
+   * Each entry is trimmed and returned verbatim; this method does not interpret
+   * wildcards such as {@code *}.
+   *
+   * @return a new, caller-owned set of column names; empty when the property is unset
+   */
+  public final Set<String> getIndexColumns() {
+    String colmap = conf.get(INDEXED_COLUMNS);
+    // Always hand back a fresh set: a shared static instance could be mutated by
+    // one caller and silently affect every other table's index configuration.
+    Set<String> cols = new HashSet<String>();
+    if (colmap != null) {
+      for (String col : colmap.split(",")) {
+        cols.add(col.trim());
+      }
+    }
+    return cols;
+  }
+
+  /**
+   * @return the authorizations configured under {@code AccumuloSerDeParameters.AUTHORIZATIONS_KEY},
+   *         or an empty {@link Authorizations} when that property is unset or empty
+   */
+  public final Authorizations getTableAuths() {
+    String auths = conf.get(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
+    if (auths != null && !auths.isEmpty()) {
+      return new Authorizations(auths.trim().getBytes(StandardCharsets.UTF_8));
+    }
+    return new Authorizations();
+  }
+
+  /**
+   * @return the configuration this instance reads from
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Instantiates and initializes the index scanner named by {@value #INDEX_SCANNER},
+   * falling back to {@link AccumuloDefaultIndexScanner} when the property is unset.
+   * The scanner's {@code init} is called with this instance's configuration.
+   *
+   * @return an initialized, non-null index scanner
+   * @throws AccumuloIndexScannerException if the configured class cannot be loaded,
+   *         does not implement {@link AccumuloIndexScanner}, or cannot be constructed
+   *         through a no-argument constructor (including when that constructor throws)
+   */
+  public final AccumuloIndexScanner createScanner() throws AccumuloIndexScannerException {
+    AccumuloIndexScanner handler;
+
+    String classname = conf.get(INDEX_SCANNER);
+    if (classname == null) {
+      handler = new AccumuloDefaultIndexScanner();
+    } else {
+      try {
+        // Class.newInstance() is deprecated and can leak the constructor's checked
+        // exceptions unwrapped; going through the Constructor wraps them instead.
+        handler = Class.forName(classname).asSubclass(AccumuloIndexScanner.class)
+            .getDeclaredConstructor().newInstance();
+      } catch (ClassCastException | ReflectiveOperationException e) {
+        throw new AccumuloIndexScannerException("Cannot use index scanner class: " + classname, e);
+      }
+    }
+    handler.init(conf);
+    return handler;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java index 09c5f24..ef454f0 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDeParameters.java @@ -17,9 +17,11 @@ package org.apache.hadoop.hive.accumulo.serde; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.NoSuchElementException; import java.util.Properties; +import java.util.Set; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; @@ -58,12 +60,21 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters { public static final String COMPOSITE_ROWID_FACTORY = "accumulo.composite.rowid.factory"; public static final String COMPOSITE_ROWID_CLASS = "accumulo.composite.rowid"; + public static final int DEFAULT_MAX_ROWIDS = 20000; + public static final String INDEX_SCANNER = "accumulo.index.scanner"; + public static final String MAX_INDEX_ROWS = "accumulo.index.rows.max"; + public static final String INDEXED_COLUMNS = "accumulo.indexed.columns"; + public static final String INDEXTABLE_NAME = "accumulo.indextable.name"; + private static final Set<String> EMPTY_SET = new HashSet<String>(); + + protected final ColumnMapper columnMapper; private Properties tableProperties; private String serdeName; private LazySerDeParameters lazySerDeParameters; + private AccumuloIndexParameters indexParams; private AccumuloRowIdFactory rowIdFactory; public AccumuloSerDeParameters(Configuration conf, Properties 
tableProperties, String serdeName) @@ -73,6 +84,7 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters { this.serdeName = serdeName; lazySerDeParameters = new LazySerDeParameters(conf, tableProperties, serdeName); + indexParams = new AccumuloIndexParameters(conf); // The default encoding for this table when not otherwise specified String defaultStorage = tableProperties.getProperty(DEFAULT_STORAGE_TYPE); @@ -135,10 +147,17 @@ public class AccumuloSerDeParameters extends AccumuloConnectionParameters { return new DefaultAccumuloRowIdFactory(); } + public AccumuloIndexParameters getIndexParams() { + return indexParams; + } + public LazySerDeParameters getSerDeParameters() { + return lazySerDeParameters; } + + public Properties getTableProperties() { return tableProperties; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java new file mode 100644 index 0000000..7311e87 --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/package-info.java @@ -0,0 +1,4 @@ +/** + * accumulo serde classes + */ +package org.apache.hadoop.hive.accumulo.serde; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java new file mode 100644 index 0000000..7d6cc0e --- /dev/null +++ 
b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloDefaultIndexScanner.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters; +import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; +import org.apache.hadoop.hive.serde.serdeConstants; +import 
org.apache.hadoop.io.Text; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestAccumuloDefaultIndexScanner { + private static final Logger LOG = LoggerFactory.getLogger(TestAccumuloDefaultIndexScanner.class); + private static final Value EMPTY_VALUE = new Value(); + + private static void addRow(BatchWriter writer, String rowId, String cf, String cq) throws MutationsRejectedException { + Mutation mut = new Mutation(rowId); + mut.put(new Text(cf), new Text(cq), EMPTY_VALUE); + writer.addMutation(mut); + } + + private static void addRow(BatchWriter writer, Integer rowId, String cf, String cq) throws MutationsRejectedException { + Mutation mut = new Mutation(AccumuloIndexLexicoder.encodeValue(String.valueOf(rowId).getBytes(), "int", true)); + mut.put(new Text(cf), new Text(cq), EMPTY_VALUE); + writer.addMutation(mut); + } + + private static void addRow(BatchWriter writer, boolean rowId, String cf, String cq) throws MutationsRejectedException { + Mutation mut = new Mutation(String.valueOf(rowId)); + mut.put(new Text(cf), new Text(cq), EMPTY_VALUE); + writer.addMutation(mut); + } + + public static AccumuloDefaultIndexScanner buildMockHandler(int maxMatches) { + try { + String table = "table"; + Text emptyText = new Text(""); + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, table); + conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, maxMatches); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*"); + conf.set(serdeConstants.LIST_COLUMNS, "rid,name,age,cars,mgr"); + conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, ":rowId,name:name,age:age,cars:cars,mgr:mgr"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + 
handler.init(conf); + + MockInstance inst = new MockInstance("test_instance"); + Connector conn = inst.getConnector("root", new PasswordToken("")); + if (!conn.tableOperations().exists(table)) { + conn.tableOperations().create(table); + BatchWriterConfig batchConfig = new BatchWriterConfig(); + BatchWriter writer = conn.createBatchWriter(table, batchConfig); + addRow(writer, "fred", "name_name", "row1"); + addRow(writer, "25", "age_age", "row1"); + addRow(writer, 5, "cars_cars", "row1"); + addRow(writer, true, "mgr_mgr", "row1"); + addRow(writer, "bill", "name_name", "row2"); + addRow(writer, "20", "age_age", "row2"); + addRow(writer, 2, "cars_cars", "row2"); + addRow(writer, false, "mgr_mgr", "row2"); + addRow(writer, "sally", "name_name", "row3"); + addRow(writer, "23", "age_age", "row3"); + addRow(writer, 6, "cars_cars", "row3"); + addRow(writer, true, "mgr_mgr", "row3"); + addRow(writer, "rob", "name_name", "row4"); + addRow(writer, "60", "age_age", "row4"); + addRow(writer, 1, "cars_cars", "row4"); + addRow(writer, false, "mgr_mgr", "row4"); + writer.close(); + } + AccumuloConnectionParameters connectionParams = Mockito + .mock(AccumuloConnectionParameters.class); + AccumuloStorageHandler storageHandler = Mockito.mock(AccumuloStorageHandler.class); + + Mockito.when(connectionParams.getConnector()).thenReturn(conn); + handler.setConnectParams(connectionParams); + return handler; + } catch (AccumuloSecurityException | AccumuloException | TableExistsException | TableNotFoundException e) { + LOG.error(e.getLocalizedMessage(), e); + } + return null; + } + + @Test + public void testMatchNone() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List<Range> ranges = handler.getIndexRowRanges("name", new Range("mike")); + assertEquals(0, ranges.size()); + } + + @Test + public void testMatchRange() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50")); + 
assertEquals(3, ranges.size()); + assertTrue("does not contain row1", ranges.contains(new Range("row1"))); + assertTrue("does not contain row2", ranges.contains(new Range("row2"))); + assertTrue("does not contain row3", ranges.contains(new Range("row3"))); + } + + public void testTooManyMatches() { + AccumuloDefaultIndexScanner handler = buildMockHandler(2); + List<Range> ranges = handler.getIndexRowRanges("age", new Range("10", "50")); + assertNull("ranges should be null", ranges); + } + + @Test + public void testMatchExact() { + AccumuloDefaultIndexScanner handler = buildMockHandler(10); + List<Range> ranges = handler.getIndexRowRanges("age", new Range("20")); + assertEquals(1, ranges.size()); + assertTrue("does not contain row2", ranges.contains(new Range("row2"))); + } + + @Test + public void testValidIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email"); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + assertTrue("age is not identified as an index", handler.isIndexed("age")); + assertTrue("phone is not identified as an index", handler.isIndexed("phone")); + assertTrue("email is not identified as an index", handler.isIndexed("email")); + } + + @Test + public void testInvalidIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email"); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertFalse("mobile is identified as an index", handler.isIndexed("mobile")); + assertFalse("mail is identified as an index", handler.isIndexed("mail")); + } + + + @Test + public void testMissingTable() { + Configuration conf = new 
Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "name,age,phone,email"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertFalse("name is identified as an index", handler.isIndexed("name")); + assertFalse("age is identified as an index", handler.isIndexed("age")); + } + + @Test + public void testWildcardIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "*"); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + assertTrue("age is not identified as an index", handler.isIndexed("age")); + } + + @Test + public void testNullIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertTrue("name is not identified as an index", handler.isIndexed("name")); + } + + @Test + public void testEmptyIndex() { + Configuration conf = new Configuration(); + conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, ""); + conf.set(AccumuloIndexParameters.INDEXTABLE_NAME, "contact"); + AccumuloDefaultIndexScanner handler = new AccumuloDefaultIndexScanner(); + handler.init(conf); + assertFalse("name is identified as an index", handler.isIndexed("name")); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java new file mode 100644 index 0000000..b19f10e --- 
/dev/null
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexLexicoder.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.client.lexicoder.BigIntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.DoubleLexicoder;
+import org.apache.accumulo.core.client.lexicoder.IntegerLexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AccumuloIndexLexicoder#encodeValue}, covering Hive column types with
+ * string-encoded ({@code true}) and binary ({@code false}) cell values.
+ */
+public class TestAccumuloIndexLexicoder {
+
+  @Test
+  public void testBooleanString() {
+    byte[] value = Boolean.TRUE.toString().getBytes(UTF_8);
+    assertArrayEquals(value,
+        AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME, true));
+  }
+
+  @Test
+  public void testBooleanBinary() {
+    // The single byte 1 is expected to be indexed as the text "true".
+    byte[] value = new byte[] { 1 };
+    assertArrayEquals(Boolean.TRUE.toString().getBytes(UTF_8),
+        AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BOOLEAN_TYPE_NAME, false));
+  }
+
+  @Test
+  public void testIntString() {
+    byte[] value = "10".getBytes(UTF_8);
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+
+    // smallint and tinyint are expected to share the int encoding.
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testIntBinary() {
+    byte[] value = ByteBuffer.allocate(4).putInt(10).array();
+    byte[] encoded = new IntegerLexicoder().encode(10);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.INT_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+
+    // Narrower types are supplied at their natural width (2 bytes, then 1 byte).
+    value = ByteBuffer.allocate(2).putShort((short) 10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.SMALLINT_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+
+    value = ByteBuffer.allocate(1).put((byte) 10).array();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TINYINT_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testFloatBinary() {
+    byte[] value = ByteBuffer.allocate(4).putFloat(10.55f).array();
+    // The float is expected to be widened to double before lexicoding, hence (double) 10.55f
+    // rather than the double literal 10.55.
+    byte[] encoded = new DoubleLexicoder().encode((double) 10.55f);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+
+    value = ByteBuffer.allocate(8).putDouble(10.55).array();
+    encoded = new DoubleLexicoder().encode(10.55);
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testFloatString() {
+    byte[] value = "10.55".getBytes(UTF_8);
+    byte[] encoded = new DoubleLexicoder().encode(10.55);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.FLOAT_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DOUBLE_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testBigIntBinary() {
+    BigInteger number = new BigInteger("1232322323", 10);
+    byte[] encoded = new BigIntegerLexicoder().encode(number);
+
+    // String-encoded input.
+    byte[] value = "1232322323".getBytes(UTF_8);
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BIGINT_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+
+    // Binary input: the two's-complement bytes of the same number.
+    value = number.toByteArray();
+    lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.BIGINT_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testDecimalString() {
+    String strVal = "12323232233434";
+    byte[] value = strVal.getBytes(UTF_8);
+    // Decimals are expected to be indexed as their unmodified text.
+    byte[] encoded = strVal.getBytes(UTF_8);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, true);
+    assertArrayEquals(encoded, lex);
+
+    // A parameterized, upper-case type name is expected to produce the same bytes.
+    lex = AccumuloIndexLexicoder.encodeValue(value, "DECIMAL (10,3)", true);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testDecimalBinary() {
+    byte[] value = new BigInteger("12323232233434", 10).toString().getBytes(UTF_8);
+    // Even with the binary flag, the decimal's text is expected to pass through unchanged.
+    byte[] encoded = "12323232233434".getBytes(UTF_8);
+
+    byte[] lex = AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DECIMAL_TYPE_NAME, false);
+    assertArrayEquals(encoded, lex);
+  }
+
+  @Test
+  public void testDateString() {
+    String date = "2016-02-22";
+    byte[] value = date.getBytes(UTF_8);
+    assertArrayEquals(value,
+        AccumuloIndexLexicoder.encodeValue(value, serdeConstants.DATE_TYPE_NAME, true));
+  }
+
+  @Test
+  public void testDateTimeString() {
+    String timestamp = "2016-02-22 12:12:06.000000005";
+    byte[] value = timestamp.getBytes(UTF_8);
+    assertArrayEquals(value,
+        AccumuloIndexLexicoder.encodeValue(value, serdeConstants.TIMESTAMP_TYPE_NAME, true));
+  }
+
+  @Test
+  public void testString() {
+    String strVal = "The quick brown fox";
+    byte[] value = strVal.getBytes(UTF_8);
+    assertArrayEquals(value,
+        AccumuloIndexLexicoder.encodeValue(value, serdeConstants.STRING_TYPE_NAME, true));
+    // varchar/char type names (mixed case, with a length) are expected to pass through as well.
+    assertArrayEquals(value, AccumuloIndexLexicoder.encodeValue(value, "varChar(20)", true));
+    assertArrayEquals(value, AccumuloIndexLexicoder.encodeValue(value, "CHAR (20)", true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
new file mode 100644
index 0000000..976fd27
--- /dev/null
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloIndexParameters.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.accumulo;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
+import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@link AccumuloIndexParameters} scanner creation and index-related settings. */
+public class TestAccumuloIndexParameters {
+
+  /** Stub scanner that reports no indexed columns; used to test custom scanner loading. */
+  public static class MockAccumuloIndexScanner implements AccumuloIndexScanner {
+
+    @Override
+    public void init(Configuration conf) {
+      // Nothing to configure.
+    }
+
+    @Override
+    public boolean isIndexed(String columnName) {
+      return false;
+    }
+
+    @Override
+    public List<Range> getIndexRowRanges(String column, Range indexRange) {
+      return null;
+    }
+  }
+
+  /** A fresh Configuration (no INDEX_SCANNER set) yields the default index scanner. */
+  @Test
+  public void testDefaultScanner() throws AccumuloIndexScannerException {
+    AccumuloIndexScanner scanner = new AccumuloIndexParameters(new Configuration()).createScanner();
+    assertTrue(scanner instanceof AccumuloDefaultIndexScanner);
+  }
+
+  /** The class named by INDEX_SCANNER is the one instantiated. */
+  @Test
+  public void testUserHandler() throws AccumuloIndexScannerException {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEX_SCANNER, MockAccumuloIndexScanner.class.getName());
+    AccumuloIndexScanner scanner = new AccumuloIndexParameters(conf).createScanner();
+    assertTrue(scanner instanceof MockAccumuloIndexScanner);
+  }
+
+  /** A nonexistent INDEX_SCANNER class makes createScanner throw AccumuloIndexScannerException. */
+  @Test(expected = AccumuloIndexScannerException.class)
+  public void testBadHandler() throws AccumuloIndexScannerException {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEX_SCANNER, "a.class.does.not.exist.IndexHandler");
+    new AccumuloIndexParameters(conf).createScanner();
+  }
+
+  /** A comma-separated INDEXED_COLUMNS value is split into a set of column names. */
+  @Test
+  public void getIndexColumns() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloIndexParameters.INDEXED_COLUMNS, "a,b,c");
+    Set<String> cols = new AccumuloIndexParameters(conf).getIndexColumns();
+    assertEquals(3, cols.size());
+    assertTrue("Missing column a", cols.contains("a"));
+    assertTrue("Missing column b", cols.contains("b"));
+    assertTrue("Missing column c", cols.contains("c"));
+  }
+
+  /** An int stored under MAX_INDEX_ROWS is returned by getMaxIndexRows. */
+  @Test
+  public void getMaxIndexRows() {
+    Configuration conf = new Configuration();
+    conf.setInt(AccumuloIndexParameters.MAX_INDEX_ROWS, 10);
+    int maxRows = new AccumuloIndexParameters(conf).getMaxIndexRows();
+    assertEquals(10, maxRows);
+  }
+
+  /** A comma-separated AUTHORIZATIONS_KEY value yields one authorization per entry. */
+  @Test
+  public void getAuths() {
+    Configuration conf = new Configuration();
+    conf.set(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, "public,open");
+    Authorizations auths = new AccumuloIndexParameters(conf).getTableAuths();
+    assertEquals(2, auths.size());
+    assertTrue("Missing auth public", auths.contains("public"));
+    assertTrue("Missing auth open", auths.contains("open"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
index 0aaa782..8d195ee 100644
--- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
+++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloStorageHandler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import
org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -59,6 +60,8 @@ public class TestAccumuloStorageHandler { Map<String,String> jobProperties = new HashMap<String,String>(); props.setProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS, "cf:cq1,cf:cq2,cf:cq3"); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string:int:string"); + props.setProperty(serdeConstants.LIST_COLUMNS, "name,age,email"); props.setProperty(AccumuloSerDeParameters.TABLE_NAME, "table"); props.setProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY, "foo"); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java index 88e4530..0bb50e8 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java @@ -488,6 +488,7 @@ public class TestAccumuloPredicateHandler { TypeInfoFactory.intTypeInfo, TypeInfoFactory.stringTypeInfo); conf.set(serdeConstants.LIST_COLUMNS, Joiner.on(',').join(columnNames)); conf.set(serdeConstants.LIST_COLUMN_TYPES, "string,int,string"); + conf.set(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, ColumnEncoding.BINARY.getName()); String columnMappingStr = "cf:f1,cf:f2,:rowID"; conf.set(AccumuloSerDeParameters.COLUMN_MAPPINGS, columnMappingStr); columnMapper = new ColumnMapper(columnMappingStr, ColumnEncoding.STRING.getName(), columnNames, @@ -758,7 +759,7 @@ public class TestAccumuloPredicateHandler { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - 
Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(null); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(null); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -776,7 +777,8 @@ public class TestAccumuloPredicateHandler { String hiveRowIdColumnName = "rid"; Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Collections.emptyList()); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Collections.emptyList()); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -795,7 +797,7 @@ public class TestAccumuloPredicateHandler { Range r = new Range("a"); Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(r); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)).thenReturn(r); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges @@ -814,7 +816,8 @@ public class TestAccumuloPredicateHandler { Range r1 = new Range("a"), r2 = new Range("z"); Mockito.when(mockHandler.getRanges(conf, columnMapper)).thenCallRealMethod(); - Mockito.when(mockHandler.generateRanges(columnMapper, hiveRowIdColumnName, root)).thenReturn(Arrays.asList(r1, r2)); + Mockito.when(mockHandler.generateRanges(conf, columnMapper, hiveRowIdColumnName, root)) + .thenReturn(Arrays.asList(r1, r2)); Mockito.when(mockHandler.getExpression(conf)).thenReturn(root); // A null result from AccumuloRangeGenerator is all ranges
