mmiklavc commented on a change in pull request #1506: METRON-2188 Upgrade to HBase 2.0.2 URL: https://github.com/apache/metron/pull/1506#discussion_r323355605
########## File path: metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java ########## @@ -44,651 +35,757 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; + /** * MockHTable. * - * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217 + * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217 and + * https://github.com/rayokota/hgraphdb/blob/07c551f39a92b7ee2c8b48edcc7c0b314f6c3e33/src/main/java/org/apache/hadoop/hbase/client/mock/MockHTable.java. 
*/ public class MockHTable implements Table { + private final TableName tableName; + private final List<String> columnFamilies = new ArrayList<>(); + private Configuration config; + private List<Put> putLog; + private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data + = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + + private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { + return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); + } + + private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { + List<Cell> ret = new ArrayList<>(); + for (byte[] family : rowdata.keySet()) { + for (byte[] qualifier : rowdata.get(family).keySet()) { + int versionsAdded = 0; + for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) { + if (versionsAdded++ == maxVersions) + break; + Long timestamp = tsToVal.getKey(); + if (timestamp < timestampStart) + continue; + if (timestamp > timestampEnd) + continue; + byte[] value = tsToVal.getValue(); + ret.add(new KeyValue(row, family, qualifier, timestamp, value)); + } + } + } + return ret; + } + + @SuppressWarnings("WeakerAccess") + public MockHTable(TableName tableName) { + this.tableName = tableName; + this.putLog = new ArrayList<>(); + } + + public MockHTable(TableName tableName, String... 
columnFamilies) { + this.tableName = tableName; + this.columnFamilies.addAll(Arrays.asList(columnFamilies)); + this.putLog = new ArrayList<>(); + } + + @SuppressWarnings("WeakerAccess") + public MockHTable(TableName tableName, List<String> columnFamilies) { + this.tableName = tableName; + this.columnFamilies.addAll(columnFamilies); + this.putLog = new ArrayList<>(); + } + + public int size() { + return data.size(); + } + + public byte[] getTableName() { + return getName().getName(); + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public MockHTable setConfiguration(Configuration config) { + this.config = config; + return this; + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + HTableDescriptor table = new HTableDescriptor(tableName); + for (String columnFamily : columnFamilies) { + table.addFamily(new HColumnDescriptor(columnFamily)); + } + return table; + } + + @Override + public TableDescriptor getDescriptor() throws IOException { + return getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + Result result = get(get); + return result != null && !result.isEmpty(); + } - private final String tableName; - private final List<String> columnFamilies = new ArrayList<>(); - private HColumnDescriptor[] descriptors; - private final List<Put> putLog; - private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data - = new TreeMap<>(Bytes.BYTES_COMPARATOR); - - private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { - return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); - } - - private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long 
timestampEnd, int maxVersions) { - List<KeyValue> ret = new ArrayList<KeyValue>(); - for (byte[] family : rowdata.keySet()) - for (byte[] qualifier : rowdata.get(family).keySet()) { - int versionsAdded = 0; - for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) { - if (versionsAdded++ == maxVersions) - break; - Long timestamp = tsToVal.getKey(); - if (timestamp < timestampStart) - continue; - if (timestamp > timestampEnd) - continue; - byte[] value = tsToVal.getValue(); - ret.add(new KeyValue(row, family, qualifier, timestamp, value)); + /** + * Test for the existence of columns in the table, as specified by the Gets. + * + * <p>This will return an array of booleans. Each value will be true if the related Get matches + * one or more keys, false if not. + * + * <p>This is a server-side call so it prevents any data from being transferred to + * the client. + * + * @param gets the Gets + * @return Array of boolean. True if the specified Get matches one or more keys, false if not. + * @throws IOException e + */ + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + boolean[] ret = new boolean[gets.size()]; + int i = 0; + for(boolean b : exists(gets)) { + ret[i++] = b; + } + return ret; + } + + @Override + public boolean[] exists(List<Get> list) throws IOException { + boolean[] ret = new boolean[list.size()]; + int i = 0; + for(Get g : list) { + ret[i++] = exists(g); + } + return ret; + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + Object[] rows = batch(actions); + System.arraycopy(rows, 0, results, 0, rows.length); + } + + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + Object[] results = new Object[actions.size()]; // same size. 
+ for (int i = 0; i < actions.size(); i++) { + Row r = actions.get(i); + if (r instanceof Delete) { + delete((Delete) r); + results[i] = new Result(); + } + if (r instanceof Put) { + put((Put) r); + results[i] = new Result(); + } + if (r instanceof Get) { + Result result = get((Get) r); + results[i] = result; + } + if (r instanceof Increment) { + Result result = increment((Increment) r); + results[i] = result; + } + if (r instanceof Append) { + Result result = append((Append) r); + results[i] = result; + } + } + return results; + } + + @Deprecated + @Override + public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) throws IOException, InterruptedException { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public Result get(Get get) throws IOException { + if (!data.containsKey(get.getRow())) + return new Result(); + byte[] row = get.getRow(); + List<Cell> kvs = new ArrayList<>(); + Filter filter = get.getFilter(); + int maxResults = get.getMaxResultsPerColumnFamily(); + + if (!get.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); + if (filter != null) { + kvs = filter(filter, kvs); + } + if (maxResults >= 0 && kvs.size() > maxResults) { + kvs = kvs.subList(0, maxResults); + } + } else { + for (byte[] family : get.getFamilyMap().keySet()) { + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + List<Cell> familyKvs = new ArrayList<>(); + for (byte[] qualifier : qualifiers) { + if (qualifier == null) + qualifier = "".getBytes(StandardCharsets.UTF_8); + if (!data.get(row).containsKey(family) || + !data.get(row).get(family).containsKey(qualifier) || + data.get(row).get(family).get(qualifier).isEmpty()) + continue; + Map.Entry<Long, byte[]> 
timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); + familyKvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue())); + } + if (filter != null) { + familyKvs = filter(filter, familyKvs); + } + if (maxResults >= 0 && familyKvs.size() > maxResults) { + familyKvs = familyKvs.subList(0, maxResults); + } + kvs.addAll(familyKvs); + } + } + return Result.create(kvs); + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + List<Result> results = new ArrayList<>(); + for (Get g : gets) { + results.add(get(g)); + } + return results.toArray(new Result[results.size()]); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + final List<Result> ret = new ArrayList<>(); + byte[] st = scan.getStartRow(); + byte[] sp = scan.getStopRow(); + Filter filter = scan.getFilter(); + int maxResults = scan.getMaxResultsPerColumnFamily(); + + Set<byte[]> dataKeySet = scan.isReversed() ? data.descendingKeySet() : data.keySet(); + for (byte[] row : dataKeySet) { + // if row is equal to startRow emit it. When startRow (inclusive) and + // stopRow (exclusive) is the same, it should not be excluded which would + // happen w/o this control. 
+ if (st != null && st.length > 0 && + Bytes.BYTES_COMPARATOR.compare(st, row) != 0) { + if (scan.isReversed()) { + // if row is before startRow do not emit, pass to next row + //noinspection ConstantConditions + if (st != null && st.length > 0 && + Bytes.BYTES_COMPARATOR.compare(st, row) <= 0) + continue; + // if row is equal to stopRow or after it do not emit, stop iteration + if (sp != null && sp.length > 0 && + Bytes.BYTES_COMPARATOR.compare(sp, row) > 0) + break; + } else { + // if row is before startRow do not emit, pass to next row + //noinspection ConstantConditions + if (st != null && st.length > 0 && + Bytes.BYTES_COMPARATOR.compare(st, row) > 0) + continue; + // if row is equal to stopRow or after it do not emit, stop iteration + if (sp != null && sp.length > 0 && + Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0) + break; + } + } + + List<Cell> kvs; + if (!scan.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions()); + if (filter != null) { + kvs = filter(filter, kvs); + } + if (maxResults >= 0 && kvs.size() > maxResults) { + kvs = kvs.subList(0, maxResults); + } + } else { + kvs = new ArrayList<>(); + for (byte[] family : scan.getFamilyMap().keySet()) { + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + List<Cell> familyKvs = new ArrayList<>(); + for (byte[] qualifier : qualifiers) { + if (data.get(row).get(family).get(qualifier) == null) + continue; + List<KeyValue> tsKvs = new ArrayList<>(); + for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) { + if (timestamp < scan.getTimeRange().getMin()) + continue; + if (timestamp > scan.getTimeRange().getMax()) + continue; + byte[] value = data.get(row).get(family).get(qualifier).get(timestamp); + tsKvs.add(new 
KeyValue(row, family, qualifier, timestamp, value)); + if (tsKvs.size() == scan.getMaxVersions()) { + break; + } + } + familyKvs.addAll(tsKvs); + } + if (filter != null) { + familyKvs = filter(filter, familyKvs); + } + if (maxResults >= 0 && familyKvs.size() > maxResults) { + familyKvs = familyKvs.subList(0, maxResults); + } + kvs.addAll(familyKvs); + } + } + if (!kvs.isEmpty()) { + ret.add(Result.create(kvs)); + } + // Check for early out optimization + if (filter != null && filter.filterAllRemaining()) { + break; + } + } + + return new ResultScanner() { + private final Iterator<Result> iterator = ret.iterator(); + + @Override + public Iterator<Result> iterator() { + return iterator; + } + + @Override + public Result[] next(int nbRows) throws IOException { + ArrayList<Result> resultSets = new ArrayList<>(nbRows); + for (int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + + @Override + public Result next() throws IOException { + try { + return iterator().next(); + } catch (NoSuchElementException e) { + return null; + } + } + + @Override + public void close() { + } + + public ScanMetrics getScanMetrics() { + return null; + } + + public boolean renewLease() { + return false; + } + }; + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return getScanner(scan); + } + + public List<Put> getPutLog() { + synchronized (putLog) { + return ImmutableList.copyOf(putLog); + } + } + + public void addToPutLog(Put put) { + synchronized(putLog) { + putLog.add(put); + } + } + + public void clear() { + synchronized (putLog) { + putLog.clear(); + } + data.clear(); 
+ } + + @Override + public void put(Put put) throws IOException { + addToPutLog(put); + + byte[] row = put.getRow(); + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); + for (byte[] family : put.getFamilyCellMap().keySet()) { + if (!columnFamilies.contains(new String(family, StandardCharsets.UTF_8))) { + throw new RuntimeException("Not Exists columnFamily : " + new String(family, StandardCharsets.UTF_8)); + } + NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR)); + for (Cell kv : put.getFamilyCellMap().get(family)) { + long ts = put.getTimeStamp(); + if (ts == HConstants.LATEST_TIMESTAMP) ts = System.currentTimeMillis(); + CellUtil.updateLatestStamp(kv, ts); + byte[] qualifier = CellUtil.cloneQualifier(kv); + NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new ConcurrentSkipListMap<>()); + qualifierData.put(kv.getTimestamp(), CellUtil.cloneValue(kv)); + } + } + } + + /** + * Helper method to find a key in a map. 
If key is not found, newObject is + * added to map and returned + * + * @param map + * map to extract value from + * @param key + * key to look for + * @param newObject + * set key to this if not found + * @return found value or newObject if not found + */ + private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) { + V data = map.putIfAbsent(key, newObject); + if (data == null) { + data = newObject; + } + return data; + } + + @Override + public void put(List<Put> puts) throws IOException { + for (Put put : puts) { + put(put); + } + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + return checkAndPut(row, family, qualifier, CompareFilter.CompareOp.EQUAL, value, put); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. If the passed value is null, the check + * is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise + * @throws IOException e + */ + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { + if (check(row, family, qualifier, compareOp, value)) { + put(put); + return true; } - } - return ret; - } - public MockHTable(String tableName) { - this.tableName = tableName; - this.putLog = new ArrayList<>(); - } - - public MockHTable(String tableName, String... 
columnFamilies) { - this(tableName); - for(String cf : columnFamilies) { - addColumnFamily(cf); - } - } - - public int size() { - return data.size(); - } - - public void addColumnFamily(String columnFamily) { - this.columnFamilies.add(columnFamily); - descriptors = new HColumnDescriptor[columnFamilies.size()]; - int i = 0; - for(String cf : columnFamilies) { - descriptors[i++] = new HColumnDescriptor(cf); - } - } - - public byte[] getTableName() { - return Bytes.toBytes(tableName); - } - - @Override - public TableName getName() { - return TableName.valueOf(tableName); - } - - @Override - public Configuration getConfiguration() { - return HBaseConfiguration.create(); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor ret = new HTableDescriptor(tableName); - for(HColumnDescriptor c : descriptors) { - ret.addFamily(c); - } - return ret; - } - - @Override - public boolean exists(Get get) throws IOException { - if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) { - return data.containsKey(get.getRow()); - } else { - byte[] row = get.getRow(); - if(!data.containsKey(row)) { return false; - } - for(byte[] family : get.getFamilyMap().keySet()) { - if(!data.get(row).containsKey(family)) { - return false; + } + + @Override + public void delete(Delete delete) throws IOException { + byte[] row = delete.getRow(); + if (data.containsKey(row)) { Review comment: Original source versions had extra code for removing family cell maps and the like. Our original implementation also removed this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
